一、什么是消息队列
消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。
每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。
消息队列的最佳定义是:内核地址空间中的内部链表。消息可以顺序地发送到队列中,并以几种不同的方式从队列中获取。当然,每个消息队列都是由IPC标识符所唯一标识的。
二、消息队列的创建
#include
#include
#include
int msgget(key_t key, int msgflg)
key:键值,由ftok获得。
msgflg:标志位。
IPC_CREAT—如果在内核中不存在该队列,则创建它。
IPC_EXCL—当与IPC_CREAT一起使用时,如果队列早已存在则将出错。
返回值:与健值key相对应的消息队列描述字
三、读写消息队列
include
int msgsnd ( int msqid, const void *prt, size_t nbytes, int flags);
对于写入队列的每一个消息,都含有三个值,正长整型的类型字段、数据长度字段和实际数据字节。新的消息总是放在队列的尾部,函数中参数msqid指定要操作的队列,ptr指针指向一个msgbuf的结构,定义如下:
struct msgbuf{
long mtype;
char mbuf[];
};
这是一个模板的消息结构,其中成员mbuf是一个字符数组,长度是根据具体的消息来决定的,切忌消息不能以NULL结尾。成员mtype是消息的类型字段。
函数参数nbytes指定了消息的长度,参数flags指明函数的行为。函数成功返回0,失败返回–1并设置错误变量errno。errno可能出现的值有:EAGAIN、EACCES、EFAULT、EIDRM、EINTR、EINVAL和ENOMEM。当函数成功返回后会更新相应队列的msqid_ds结构。
使用函数msgrcv可以从队列中读取消息,函数原型如下:
#include
ssize_t msgrcv ( int msqid, void *ptr, size_t nbytes, long type , int flag);
函数中参数msqid为指定要读的队列,参数ptr为要接收数据的缓冲区,nbytes为要接收数据的长度,当队列中满足条件的消息长度大于nbytes的值时,则会参照行为参数flag的值决定如何操作:当flag中设置了MSG_NOERROR位时,则将消息截短到nbytes指定的长度后返回。如没有MSG_NOERROR位,则函数出错返回,并设置错误变量errno。设置type参数指定msgrcv函数所要读取的消息,tyre的取值及相应操作如表所示。
type值详解
等于0
|
返回队列最上面的消息(根据先进先出规则)
|
|
|
|
|
大于0
|
返回消息类型与type相等的第1条消息
|
|
|
|
|
小于0
|
返回消息类型小于等于type绝对值的最小值的第1条消息
|
|
|
|
|
参数flag定义函数的行为,如设置了IPC_NOWAIT位,则当队列中无符合条件的消息时,函数出错返回,errno的值为ENOMSG。如没有设置IPC_NOWAIT位,则进程阻塞直到出现满足条件的消息出现为止,然后函数读取消息返回。
下面实例演示了消息队列在进程间的通信。程序中创建了一个消息的模板结构体,并对声明变量做初始化。使用msgget函数创建了一个消息队列,使用msgsnd函数向该队列中发送了一条消息。
四、消息队列的控制
该函数用来控制消息队列,它与共享内存的shmctl函数相似,它的原型为:
intmsgctl(intmsgid,intcommand,structmsgid_ds *buf);
ommand是将要采取的动作,它可以取3个值,
IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。
IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值
IPC_RMID:删除消息队列
五、实例
#include
#include
#include
#include
#include
#include
#include
#include
struct my_msg_st
{
long int my_msg_type;
char some_text[BUFSIZ];
};
int main(void)
{
int running=1;
int msgid;
struct my_msg_st some_data;
long int msg_to_receive=0;
/*创建消息队列*/
msgid=msgget((key_t)1234,0666 | IPC_CREAT);
if(msgid==-1)
{
fprintf(stderr,"msgget failed with error: %d\n",errno);
exit(EXIT_FAILURE);
}
/*循环从消息队列中接收消息*/
while(running)
{
/*读取消息*/
if(msgrcv(msgid,(void *)&some_data,BUFSIZ,msg_to_receive,0)==-1)
{
fprintf(stderr,"msgrcv failed with error: %d\n",errno);
exit(EXIT_FAILURE);
}
printf("You wrote: %s",some_data.some_text);
/*接收到的消息为“end”时结束循环*/
if(strncmp(some_data.some_text,"end",3)==0)
{
running=0;
}
}
/*从系统内核中移走消息队列*/
if(msgctl(msgid,IPC_RMID,0)==-1)
{
fprintf(stderr,"msgctl(IPC_RMID) failed\n");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}
client
#include
#include
#include
#include
#include
#include
#include
#include
#define MAX_TEXT 512
struct my_msg_st
{
long int my_msg_type;
char some_text[MAX_TEXT];
};
int main(void)
{
int running=1;
struct my_msg_st some_data;
int msgid;
char buffer[BUFSIZ];
/*创建消息队列*/
msgid=msgget((key_t)1234,0666 | IPC_CREAT);
if(msgid==-1)
{
fprintf(stderr,"msgget failed with error:%d\n",errno);
exit(EXIT_FAILURE);
}
/*循环向消息队列中添加消息*/
while(running)
{
printf("Enter some text:");
fgets(buffer,BUFSIZ,stdin);
some_data.my_msg_type=1;
strcpy(some_data.some_text,buffer);
/*添加消息*/
if(msgsnd(msgid,(void *)&some_data,MAX_TEXT,0)==-1)
{
fprintf(stderr,"msgsed failed\n");
exit(EXIT_FAILURE);
}
/*用户输入的为“end”时结束循环*/
if(strncmp(buffer,"end",3)==0)
{
running=0;
}
}
exit(EXIT_SUCCESS);
}
a进程和b进程都正确获得了对方发送的数据