当前位置:编程学习 > C/C++ >>

ZeroMQ指南-第1章-基础-分而治之

分而治之
作为最终示例(你肯定对生动的代码开始生厌并希望回头去钻研关于比较性、抽象性准则的语言学探讨),让我们来做一个小型超级计算。然后喝个咖啡。我们的超级计算程序是个非常典型的并行处理模型。我们有:
一个通风机(ventilator)来产生可以并行处理的任务
一组工人(worker)来处理任务
一个水槽(sink)来回收工人处理的结果
事实上,工人运行于超快的机子,没准是GPU(图形处理单元)来做困难运算。这是通风机代码,生成100个任务,每个任务都是一条消息告诉工人休眠(sleep)几毫秒。
taskvent: Parallel task ventilator in C
[cpp]  
//  
// Task ventilator  
// Binds PUSH socket to tcp://localhost:5557  
// Sends batch of tasks to workers via that socket  
//  
#include "zhelpers.h"  
  
int main (void)  
{  
    void *context = zmq_ctx_new ();  
  
    // Socket to send messages on  
    void *sender = zmq_socket (context, ZMQ_PUSH);  
    zmq_bind (sender, "tcp://*:5557");  
  
    // Socket to send start of batch message on  
    void *sink = zmq_socket (context, ZMQ_PUSH);  
    zmq_connect (sink, "tcp://localhost:5558");  
  
    printf ("Press Enter when the workers are ready: ");  
    getchar ();  
    printf ("Sending tasks to workers…\n");  
  
    // The first message is "0" and signals start of batch  
    s_send (sink, "0");  
  
    // Initialize random number generator  
    srandom ((unsigned) time (NULL));  
  
    // Send 100 tasks  
    int task_nbr;  
    int total_msec = 0; // Total expected cost in msecs  
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {  
        int workload;  
        // Random workload from 1 to 100msecs  
        workload = randof (100) + 1;  
        total_msec += workload;  
        char string [10];  
        sprintf (string, "%d", workload);  
        s_send (sender, string);  
    }   www.zzzyk.com
    printf ("Total expected cost: %d msec\n", total_msec);  
    sleep (1); // Give 0MQ time to deliver  
  
    zmq_close (sink);  
    zmq_close (sender);  
    zmq_ctx_destroy (context);  
    return 0;  
}  
图 5 - 并行管道
 
这是工人程序。接收消息,休眠指定的时间,然后表明自己完成任务:
taskwork: Parallel task worker in C
[cpp] 
//  
// Task worker  
// Connects PULL socket to tcp://localhost:5557  
// Collects workloads from ventilator via that socket  
// Connects PUSH socket to tcp://localhost:5558  
// Sends results to sink via that socket  
//  
#include "zhelpers.h"  
  
int main (void)  
{  
    void *context = zmq_ctx_new ();  
  
    // Socket to receive messages on  
    void *receiver = zmq_socket (context, ZMQ_PULL);  
    zmq_connect (receiver, "tcp://localhost:5557");  
  
    // Socket to send messages to  
    void *sender = zmq_socket (context, ZMQ_PUSH);  
    zmq_connect (sender, "tcp://localhost:5558");  
  
    // Process tasks forever  
    while (1) {  
        char *string = s_recv (receiver);  
        // Simple progress indicator for the viewer  
        fflush (stdout);  
        printf ("%s.", string);  
  
        // Do the work  
        s_sleep (atoi (string));  
        free (string);  
  
        // Send results to sink  
        s_send (sender, "");  
    }  
    zmq_close (receiver);  
    zmq_close (sender);  
    zmq_ctx_destroy (context);  
    return 0;  
}  
这是水槽程序。它收集这100个任务,然后计算整个处理消耗的时间,让我们能够证实如果有多个工人时他们真的是并行运转的:
tasksink: Parallel task sink in C
[cpp]  
//  
// Task sink  
// Binds PULL socket to tcp://localhost:5558  
// Collects results from workers via that socket  
//  
#include "zhelpers.h"  
  
int main (void)  
{  
    // Prepare our context and socket  
    void *context = zmq_ctx_new ();  
    void *receiver = zmq_socket (context, ZMQ_PULL);  
    zmq_bind (receiver, "tcp://*:5558");  
  
    // Wait for start of batch  
    char *string = s_recv (receiver);  
    free (string);  
  
    // Start our clock now  
    int64_t start_time = s_clock ();  
  
    // Process 100 confirmations  
    int task_nbr;  
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {  
        char *string = s_recv (receiver);  
        free (string);  
        if ((task_nbr / 10) * 10 == task_nbr)  
            printf (":");  
        else  
            printf (".");  
        fflush (stdout);  
    }  
    // Calculate and report duration of batch  
    printf ("Total elapsed time: %d msec\n",  
            (int) (s_clock () - start_time));  
  
    zmq_close (receiver);  
    zm
补充:软件开发 , C++ ,
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,