zmq是一个开源的消息队列库,前几天针对中继spf的框架分析,发现可能存在进程间通信问题,方式可能是一对多。随意就想看看zmq如何实现进程间通信问题。
既然涉及到一对多,就涉及到一个客户端识别问题,分析了所有的zmq的实例代码,都是1对1 的方式。就是没有一对多的,最终功夫不负有心人,终于让我找到蛛丝马迹解决了这个问题。
这里用到了另一个zmq的库czmq,他是zmq的高层C封装。使用的通信方式是ZMQ_ROUTER。
这种通信方式的特点是server收到消息后,会在消息头部添加来源的标记,这样app得到消息是就可以通过识别消息头部判断 消息来源。
1 void *receiver = zsocket_new(ctx, ZMQ_ROUTER);
2 zsocket_bind(receiver, "ipc:///tmp/0");
3
4 ....
5
6 //recv message
7 zmsg_t *msg = zmsg_recv(receiver);
8 if (!msg){
9 //error handle..
10 return;
11 }
12 zframe_t *address = zmsg_unwrap (msg); //源地址
13 zframe_t *frame = zmsg_first (msg); //得到具体的消息
14 zmsg_destroy (&msg);
15
16 //do something, it make time some times...
17
18 //make response message
19 zmsg_t *message = zmsg_new();
20 zframe_t* body = zframe_new("world",sizeof("world"));
21
22 zmsg_push (message, body);
23 zmsg_wrap (message, address); //将目的地址封装进去
24
25 zmsg_send (&message, receiver);
26 zmsg_destroy(&message);
这里用到了两个关键的api:zmsg_unwrap 和 zmsg_wrap
struct _zmsg_t {
zlist_t *frames; // 数据帧列表
size_t content_size; // 消息总长度
};
// Opaque class structure
typedef struct _zmsg_t zmsg_t;
// --------------------------------------------------------------------------
// Push frame plus empty frame to front of message, before first frame.
// Message takes ownership of frame, will destroy it when message is sent.
void zmsg_wrap (zmsg_t *self, zframe_t *frame)
{
assert (self);
assert (frame);
if (zmsg_pushmem (self, "", 0) == 0) //首先在消息头添加一个空的帧
zmsg_push (self, frame); //谈后将指定的帧添加进去
}
// --------------------------------------------------------------------------
// Pop frame off front of message, caller now owns frame
// If next frame is empty, pops and destroys that empty frame.
zframe_t *zmsg_unwrap (zmsg_t *self)
{
assert (self);
zframe_t *frame = zmsg_pop (self); //pop第一个帧
zframe_t *empty = zmsg_first (self); //判断第一个(其实是第二个)是不是空的
if (empty && zframe_size (empty) == 0) //是空帧就pop
{
empty = zmsg_pop (self);
zframe_destroy (&empty);
}
return frame;
}
阅读(6268) | 评论(0) | 转发(0) |