当前位置:编程学习 > JAVA >>

并行计算框架的Java实现--系列一

最近的工作需要统计一些复杂的报表,为了提高效率,想用多线程去实现,但要在所有线程完成统计任务后,将结果汇总。所以在思考有没有什么办法解决,之所以是“系列一”是因为我想记录下我的思考过程。
1、首先设计一个Executer,负责任务的执行和汇总:
Java代码 
public class Executer { 
    //计算已经派发的任务数(条件谓词) 
    public static int THREAD_COUNT = 0; 
    //线程池 
    private Executor pool = null; 
    public Executer() { 
        this(1); 
    } 
    public Executer(int threadPoolSize) { 
        pool = Executors.newFixedThreadPool(threadPoolSize); 
    } 
    /**
     * 任务派发
     * @param job
     */ 
    public void fork(Job job){ 
        //将任务派发给线程池去执行 
        pool.execute(job); 
        THREAD_COUNT++; 
    } 
    /**
     * 统计任务结果
     */ 
    public void join(){ 
        while(THREAD_COUNT > 0){ 
            System.out.println("threadCount: "+THREAD_COUNT); 
            try { 
                wait();//如果任务没有全部完成,则挂起 
            } catch (Exception e) {}//这里总是抛异常,不知道为什么,好吧!先不管它 
        } 
    } 

 2、写一个抽象的Job类,负责执行具体的任务
Java代码 
public abstract class Job implements Runnable { 
 
    @Override 
    public void run() { 
        this.execute();//执行子类具体任务 
        Executer.THREAD_COUNT--; 
        try{ 
            notifyAll();//这里总是抛异常,不知道为什么,好吧!先不管它 
        }catch(Exception e){} 
    } 
    /**
     * 业务处理函数
     */ 
    public abstract void execute(); 
 

 
3、测试,先来一个具体的任务实现。
Java代码 
public class MyJob extends Job { 
 
    @Override 
    public void execute() { 
        //模拟业务需要处理1秒. 
        try {Thread.sleep(1000);} catch (InterruptedException e) {} 
        System.out.println("running thread id = "+Thread.currentThread().getId()); 
    } 
 

 
4、测试。
Java代码 
public class Test { 
    public static void main(String[] args) { 
        //初始化任务池 
        Executer exe = new Executer(5); 
        //初始化任务 
        long time = System.currentTimeMillis(); 
        for (int i = 0; i < 10; i++) { 
            MyJob job = new MyJob(); 
            exe.fork(job);//派发任务 
        } 
        //汇总任务结果 
        exe.join(); 
        System.out.println("time: "+(System.currentTimeMillis() - time)); 
    } 
 

 
 5、好吧,看一下结果
 
Java代码 
threadCount: 10 
......(表示有N多个) 
threadCount: 10 
running thread id = 8 
running thread id = 9 
running thread id = 11 
running thread id = 10 
running thread id = 12 
threadCount: 5 
......(表示有N多个) 
threadCount: 5 
running thread id = 9 
running thread id = 10 
running thread id = 12 
running thread id = 8 
running thread id = 11 
threadCount: 3 
time: 2032 
 哈哈,看来是可以了,最后汇总任务的处理时间是2032毫秒,看来是比单个任务顺序执行来的快。但是有几个问题:
1)如果没有catch那个超级Exception的话,就会抛下面的异常:
Java代码 
java.lang.IllegalMonitorStateException 
    at java.lang.Object.wait(Native Method) 
    at java.lang.Object.wait(Object.java:485) 
    at com.one.Executer.join(Executer.java:38) 
    at com.test.Test.main(Test.java:21) 
 
2)为啥会打印N多个同样值threadCount呢?
于是和同事(河东)沟通,他说wait要放在synchronized里面才行,好吧,试一下,改进一下Executer和Job
 
Java代码 
public class Executer { 
    //计算已经派发的任务数(条件谓词) 
    public static int THREAD_COUNT = 0; 
    //条件队列锁 
    public static final Object LOCK = new Object(); 
    //线程池 
    private Executor pool = null; 
    public Executer() { 
        this(1); 
    } 
    public Executer(int threadPoolSize) { 
        pool = Executors.newFixedThreadPool(threadPoolSize); 
    } 
    /**
     * 任务派发
     * @param job
     */ 
    public void fork(Job job){ 
        //将任务派发给线程池去执行 
        pool.execute(job); 
        //增加线程数 
        synchronized (LOCK) { 
            THREAD_COUNT++; 
        } 
    } 
    /**
     * 统计任务结果
     */ 
    public void join(){ 
        synchronized (LOC
补充:软件开发 , Java ,
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,