进程间通信是一个很重要的主题,因为它能够是许许多多相互通信的异步进程构成系统。而消息队列就提供了进程之间相互通信的方法。
要利用消息队列,首先要创建自己的消息结构体,其格式如下:
struct msgbuf
{
long type;
char msg[LEN];
};
其中结构体中的第一个成员类型必须是表示消息类型的long型变量。
消息队列相关函数:
1.创建消息队列
int msgget(key_t key, int msgflg)
key是一个键值,可由ftok函数获得,也可以自己指定
msgflg参数是一些标志位。该调用返回与健值key相对应的消息队列描述字。
在以下两种情况下,该调用将创建一个新的消息队列:
- 如果没有消息队列与健值key相对应,并且msgflg中包含了IPC_CREAT标志位;
- key参数为IPC_PRIVATE;
参数msgflg可以为以下:IPC_CREAT、IPC_EXCL、IPC_NOWAIT或三者的或结果。
调用返回:成功返回消息队列描述字,否则返回-1。
key_t ftok(const char *pathname, int proj_id);
产生与文件pathname相关联的键值,文件pathname必须存在。
2.发送消息
int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg);
向msgid代表的消息队列发送一个消息,即将发送的消息存储在msgp指向的msgbuf结构中,消息的大小由msgze指定。
对发送消息来说,有意义的msgflg标志为IPC_NOWAIT,指明在消息队列没有足够空间容纳要发送的消息时,msgsnd是否等待。造成msgsnd()等待的条件有两种:
- 当前消息的大小与当前消息队列中的字节数之和超过了消息队列的总容量;
- 当前消息队列的消息数(单位"个")不小于消息队列的总容量(单位"字节数"),此时,虽然消息队列中的消息数目很多,但基本上都只有一个字节。
msgsnd()解除阻塞的条件有三个:
- 不满足上述两个条件,即消息队列中有容纳该消息的空间;
- msqid代表的消息队列被删除;
- 调用msgsnd()的进程被信号中断;
调用返回:成功返回0,否则返回-1。
3.接收消息
int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long msgtyp, int msgflg);
该系统调用从msgid代表的消息队列中读取一个消息,并把消息存储在msgp指向的msgbuf结构中。
msqid为消息队列描述字;消息返回后存储在msgp指向的地址,msgsz指定msgbuf的mtext成员的长度(即消息内容的长度),msgtyp为请求读取的消息类型;读消息标志msgflg可以为以下几个常值的或:
- IPC_NOWAIT 如果没有满足条件的消息,调用立即返回,此时,errno=ENOMSG
- IPC_EXCEPT 与msgtyp>0配合使用,返回队列中第一个类型不为msgtyp的消息
- IPC_NOERROR 如果队列中满足条件的消息内容大于所请求的msgsz字节,则把该消息截断,截断部分将丢失。
msgrcv手册中详细给出了消息类型取不同值时(>0; <0; =0),调用将返回消息队列中的哪个消息。
msgrcv()解除阻塞的条件有三个:
- 消息队列中有了满足条件的消息;
- msqid代表的消息队列被删除;
- 调用msgrcv()的进程被信号中断;
调用返回:成功返回读出消息的实际字节数,否则返回-1。
4.配置消息队列
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
该系统调用对由msqid标识的消息队列执行cmd操作,共有三种cmd操作:IPC_STAT、IPC_SET 、IPC_RMID。
- IPC_STAT:该命令用来获取消息队列信息,返回的信息存贮在buf指向的msqid结构中;
- IPC_SET:该命令用来设置消息队列的属性,要设置的属性存储在buf指向的msqid结构中;可设置属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes,同时,也影响msg_ctime成员。
- IPC_RMID:删除msqid标识的消息队列;
调用返回:成功返回0,否则返回-1。
结构msqid_ds用来设置或返回消息队列的信息,存在于用户空间
struct msqid_ds {
struct ipc_perm msg_perm;
struct msg *msg_first; /* first message on queue,unused */
struct msg *msg_last; /* last message in queue,unused */
__kernel_time_t msg_stime; /* last msgsnd time */
__kernel_time_t msg_rtime; /* last msgrcv time */
__kernel_time_t msg_ctime; /* last change time */
unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
unsigned long msg_lqbytes; /* ditto */
unsigned short msg_cbytes; /* current number of bytes on queue */
unsigned short msg_qnum; /* number of messages in queue */
unsigned short msg_qbytes; /* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};
示例:
mqsend.c
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#define MAX_LINE 80
#define FILENAME "/root/c/communicate/common.h"
typedef struct
{
long type;
char msg[MAX_LINE];
}MY_TYPE_T;
int main()
{
MY_TYPE_T myobject;
int qid, ret;
key_t key;
key = ftok(FILENAME, 0);
qid = msgget(key, IPC_CREAT|666);
if(qid >= 0)
{
myobject.type = 1L;
strncpy(myobject.msg, "this is a test!\n", MAX_LINE);
ret = msgsnd(qid, (struct msgbuf*)&myobject, sizeof(MY_TYPE_T),0);
if(ret != -1)
printf("message successfully sent to queue\n");
}
else
printf("error:%s", strerror(errno));
return 0;
}
|
mqrecv.c
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#define MAX_LINE 80
#define FILENAME "/root/c/communicate/common.h"
typedef struct
{
long type;
char msg[MAX_LINE];
}MY_TYPE_T;
int main()
{
MY_TYPE_T myobject;
int qid, ret;
key_t key;
key = ftok(FILENAME, 0);
qid = msgget(key, 0);
if(qid >= 0)
{
/*每次取队列中的第一个消息*/
ret = msgrcv(qid, (struct msgbuf*)&myobject, sizeof(MY_TYPE_T),0,0);
if(ret != -1)
{
printf("message type:%d\n", myobject.type);
printf("message :%s\n", myobject.msg);
printf("message successfully received from queue\n");
}
}
else
printf("error:%s", strerror(errno));
return 0;
}
|
阅读(889) | 评论(0) | 转发(1) |