当前位置:编程学习 > JAVA >>

并行计算框架的Java实现--系列三

优化锁,之前的锁是采用一个static的Object实现的,这要就会有一个问题,如果我创建了多个Executer,那么所有Job都会持有一把锁,既影响性能,也容易出现死锁的情况。所以,改成每个Executer持有一把锁。
Executer代码如下:
 
Java代码 
public class Executer { 
    //计算已经派发的任务数(条件谓词) 
    public static int THREAD_COUNT = 0; 
    //存储任务的执行结果 
    private List<Future<Object>> futres = new ArrayList<Future<Object>>();  
    //条件队列锁 
    public final Object lock = new Object(); 
    //线程池 
    private ExecutorService pool = null; 
    public Executer() { 
        this(1); 
    } 
    public Executer(int threadPoolSize) { 
        pool = Executors.newFixedThreadPool(threadPoolSize); 
    } 
    /**
     * 任务派发
     * @param job
     */ 
    public void fork(Job job){ 
        //设置同步锁 
        job.setLock(lock); 
        //将任务派发给线程池去执行 
        futres.add(pool.submit(job)); 
        //增加线程数 
        synchronized (lock) { 
            THREAD_COUNT++; 
        } 
    } 
    /**
     * 统计任务结果
     */ 
    public List<Object> join(){ 
        synchronized (lock) { 
            while(THREAD_COUNT > 0){//检查线程数,如果为0,则表示所有任务处理完成 
                System.out.println("threadCount: "+THREAD_COUNT); 
                try { 
                    lock.wait();//如果任务没有全部完成,则挂起。等待完成的任务给予通知 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
        } 
        List<Object> list = new ArrayList<Object>(); 
        //取出每个任务的处理结果,汇总后返回 
        for (Future<Object> future : futres) { 
            try { 
                Object result = future.get();//因为任务都已经完成,这里直接get 
                list.add(result); 
            } catch (Exception e) { 
                e.printStackTrace(); 
            }  
        } 
        return list; 
    } 

 Job类:
 
Java代码 
public abstract class Job implements Callable<Object> { 
 
    //锁 
    private Object lock = null; 
 
    void setLock(Object lock) { 
        this.lock = lock; 
    } 
 
    @Override 
    public Object call() throws Exception { 
        Object result = this.execute();//执行子类具体任务 
        synchronized (lock) { 
            //处理完业务后,任务结束,递减线程数,同时唤醒主线程 
            Executer.THREAD_COUNT--; 
            lock.notifyAll(); 
        } 
        return result; 
    } 
    /**
     * 业务处理函数
     */ 
    public abstract Object execute(); 
     

 测试结果:
 
Java代码 
threadCount: 10 
running thread id = 8 
running thread id = 10 
running thread id = 9 
running thread id = 12 
running thread id = 11 
threadCount: 8 
threadCount: 7 
threadCount: 6 
threadCount: 5 
running thread id = 12 
running thread id = 8 
running thread id = 11 
threadCount: 2 
running thread id = 10 
threadCount: 1 
running thread id = 9 
ResultSize: 10 
time: 2001 
补充:软件开发 , Java ,
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,