Chinaunix首页 | 论坛 | 博客
  • 博客访问: 879486
  • 博文数量: 254
  • 博客积分: 5350
  • 博客等级: 大校
  • 技术积分: 2045
  • 用 户 组: 普通用户
  • 注册时间: 2008-06-27 13:27
文章分类

全部博文(254)

文章存档

2015年(1)

2014年(9)

2013年(17)

2012年(30)

2011年(150)

2010年(17)

2009年(28)

2008年(2)

分类: C/C++

2012-02-27 09:39:44

消息队列

一个或多个进程可向消息队列写入消息,而一个或多个进程可从消息队列中读取消息,这种进程间通讯机制通常使用在客户/服务器模型中,客户向服务器发送请求消息,服务器读取消息并执行相应请求。在许多微内核结构的操作系统中,内核和各组件之间的基本通讯方式就是消息队列。例如,在 MINIX 操作系统中,内核、I/O任务、服务器进程和用户进程之间就是通过消息队列实现通讯的。

Linux中的消息可以被描述成在内核地址空间的一个内部链表,每一个消息队列由一个IPC的标识号唯一的标识。Linux 为系统中所有的消息队列维护一个 msgque 链表,该链表中的每个指针指向一个 msgid_ds 结构,该结构完整描述一个消息队列。

1. 数据结构

(1)消息缓冲区(msgbuf)

我们在这里要介绍的第一个数据结构是msgbuf结构,可以把这个特殊的数据结构看成一个存放消息数据的模板,它在include/linux/msg.h中声明,描述如下:

/* msgsnd msgrcv 系统调用使用的消息缓冲区*/

struct msgbuf {

    long mtype;         /* 消息的类型,必须为正数 */

    char mtext[1];      /* 消息正文 */

};

注意:对于消息数据元素(mtext),不要受其描述的限制。实际上,这个域(mtext)不仅能保存字符数组,而且能保存任何形式的任何数据。这个域本身是任意的,因为这个结构本身可以由应用程序员重新定义:

struct my_msgbuf {

      long    mtype;          /* 消息类型 */

        long    request_id;     /* 请求识别号 */

        struct client info;    /* 客户消息结构 */

};

我们看到,消息的类型还是和前面一样,但是结构的剩余部分由两个其它的元素代替,而且有一个是结构。这就是消息队列的优美之处,内核根本不管传送的是什么样的数据,任何信息都可以传送。

但是,消息的长度还是有限制的,在Linux中,给定消息的最大长度在include/linux/msg.h中定义如下:

#define MSGMAX 8192    /* max size of message (bytes) */

消息总的长度不能超过8192字节,包括mtype域,它是4字节长。

(2)消息结构(msg)

内核把每一条消息存储在以msg结构为框架的队列中,它在include/ linux/msg.h中定义如下:

struct msg {

    struct msg *msg_next;   /* 队列上的下一条消息 */

    long msg_type;          /*消息类型*/

    char *msg_spot;         /* 消息正文的地址 */

    short msg_ts;           /* 消息正文的大小 */

};

注意:msg_next是指向下一条消息的指针,它们在内核地址空间形成一个单链表。

(3)消息队列结构(msgid_ds)

当在系统中创建每一个消息队列时,内核创建、存储及维护这个结构的一个实例。

/* 在系统中的每一个消息队列对应一个msqid_ds 结构 */

struct msqid_ds {

    struct ipc_perm msg_perm;

    struct msg *msg_first;    /* 队列上第一条消息,即链表头*/

    struct msg *msg_last;    /* 队列中的最后一条消息,即链表尾 */

    time_t msg_stime;        /* 发送给队列的最后一条消息的时间 */

    time_t msg_rtime;            /* 从消息队列接收到的最后一条消息的时间 */

    time_t msg_ctime;             /* 最后修改队列的时间*/

    ushort msg_cbytes;          /*队列上所有消息总的字节数 */

    ushort msg_qnum;          /*在当前队列上消息的个数 */

    ushort msg_qbytes;        /* 队列最大的字节数 */

    ushort msg_lspid;           /* 发送最后一条消息的进程的pid */

    ushort msg_lrpid;           /* 接收最后一条消息的进程的pid */

};

2. 系统调用: msgget()

为了创建一个新的消息队列,或存取一个已经存在的队列,要使用msgget()系统调用。                                               

原型: int msgget ( key_t key, int msgflg );                                            

返回: 成功,则返回消息队列识别号,失败,则返回-1

semget()中的第一个参数是键, 这个键值要与现有的键值进行比较,现有的键值指在内核中已存在的其它消息队列的键值。对消息队列的打开或存取操作依赖于msgflg参数的取值:

IPC_CREAT : 如果这个队列在内核中不存在,则创建它。

IPC_EXCL :当与IPC_CREAT一起使用时,如果这个队列已存在,则创建失败。

如果IPC_CREAT单独使用,semget()为一个新创建的消息队列返回标识号,或者返回具有相同键值的已存在队列的标识号。如果IPC_EXCLIPC_CREAT一起使用,要么创建一个新的队列,要么对已存在的队列返回-1IPC_EXCL单独是没有用的,当与IPC_CREAT结合起来使用时,可以保证新创建队列的打开和存取。

与文件系统的存取权限一样,每一个IPC对象也具有存取权限,因此,可以把一个八进制与掩码或,形成对消息队列的存取权限。

让我们来创建一个打开或创建消息队列的函数:

int open_queue( key_t keyval )

{

        int     qid;

        if((qid = msgget( keyval, IPC_CREAT | 0660 )) == -1)

                return(-1);

        return(qid);

}

注意,这个例子显式地用了0660权限。这个函数要么返回一个消息队列的标识号,要么返回-1而出错。键值作为唯一的参数必须传递给它。

3. 系统调用: msgsnd()

一旦我们有了队列识别号,我们就可以在这个队列上执行操作。要把一条消息传递给一个队列,你必须用msgsnd()系统调用。

原型:int msgsnd ( int msqid, struct msgbuf *msgp, int msgsz, int msgflg );

    返回:成功为0, 失败为-1

msgsnd()的第一个参数是队列识别号,由msgget()调用返回。第二个参数msgp是一个指针,指向我们重新声明和装载的消息缓冲区。msgsz参数包含了消息以字节为单位的长度,其中包括了消息类型的4个字节。

msgflg参数可以设置成0(忽略),或者:

IPC_NOWAIT :如果消息队列满,消息不写到队列中,并且控制权返回给调用进程(继续执行)。如果不指定IPC_NOWAIT,调用进程将挂起(阻塞)直到消息被写到队列中。

让我们来看一个发送消息的简单函数:

int send_message( int qid, struct mymsgbuf *qbuf )

{

        int     result, length;

        /* mymsgbuf结构的实际长度 */

        length = sizeof(struct ) - sizeof(long);       

        if((result = msgsnd( qid, qbuf, length, 0)) == -1)

                return(-1);

        return(result);

}

这个小函数试图把缓冲区qbuf中的消息,发送给队列识别号为qid的消息队列。

现在,我们在消息队列里有了一条消息,可以用ipcs命令来看你队列的状态。如何从消息队列检索消息,可以用msgrcv()系统调用。

链接转至Unix IPC 消息队列用法2

4.系统调用:msgrcv()                                                 

原型:int msgrcv ( int msqid, struct msgbuf *msgp, int msgsz, long mtype, int msgflg );

   返回值:成功,则为拷贝到消息缓冲区的字节数,失败为-1

很明显,第一个参数用来指定要检索的队列(必须由msgget()调用返回),第二个参数(msgp)是存放检索到消息的缓冲区的地址,第三个参数(msgsz)是消息缓冲区的大小,不包括消息类型mtype的长度。

第四个参数(mtype)指定了消息的类型。内核将搜索队列中相匹配类型的最早的消息,并且返回这个消息的一个拷贝,返回的消息放在由msgp参数指向的地址。这里存在一个特殊的情况,如果传递给mytype参数的值为0,就可以不管类型,只返回队列中最早的消息。

如果传递给参数msgflg的值为IPC_NOWAIT,并且没有可取的消息,那么给调用进程返回ENOMSG错误消息,否则,调用进程阻塞,直到一条消息到达队列并且满足msgrcv()的参数。如果一个客户正在等待消息,而队列被删除,则返回EIDRM。如果当进程正在阻塞,并且等待一条消息到达但捕获到了一个信号,则返回EINTR

让我们来看一个从我们已建的消息队列中检索消息的例子

int read_message( int qid, long type, struct mymsgbuf *qbuf )

{

        int     result, length;

        /* 计算mymsgbuf结构的实际大小*/

        length = sizeof(struct mymsgbuf) - sizeof(long);       

        if((result = msgrcv( qid, qbuf, length, type, 0)) == -1)

                return(-1);

        return(result);

}

当从队列中成功地检索到消息后,这个消息将从队列删除。
阅读(2929) | 评论(0) | 转发(3) |
给主人留下些什么吧!~~