1、msgget()函数
该函数用来创建和获取一个消息地队列。原型如下:
-
#include <sys/types.h>
-
#include <sys/ipc.h>
-
#include <sys/msg.h>
-
-
int msgget(key_t key, int msgflg);
该函数的用法与信号量时一样的,都需要提供一个键值(一般使用ftok()函数生成)。msgflg是一个权限标志,表示消息队列的访问权限,与文件的访问权限是一样的。与信号量一样,也可以使用IPC_CREAT和IPC_EXECL作用于msgflag,达到类似的目的。
该函数成功返回一个非零整数的消息队列标识符,失败返回-1。
2、msgsnd()和msgrcv()函数
这两个系统调用被分别用来发送消息到队列和从队列中接收消息。调用进程对消息队列必须有写权限以发送消息,必须对消息队列有读权限以接收消息。
-
#include <sys/types.h>
-
#include <sys/ipc.h>
-
#include <sys/msg.h>
-
-
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
-
-
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
msqid是有msgget函数返回的消息队列标识符。
msgp是一个指向准备发送或接收消息的指针。消息的结构具有如下结构:
-
struct msgbuf {
-
long mtype;
-
char mtext[0];
-
};
mtext数组大小是通过msgsz制定,它是一个非负整数,允许为0,即只有消息类型。mtype域必须是正整数值。这个值被用于接收进程对消息的选择。
2.1 msgsnd()
msgsnd()系统调用将msgp指向的消息的副本附加到由
标识符msqid指定的消息队列中。
如果队列空间足够,该系统调用会立刻返回成功。队列的容量可以通过msg_qbytes进行控制。队列创建过程中,该域段被初始化成了MSGMNB字节,但该限制可通过msgct()进行修改。如果存在下面条件之一,消息队列会被视为满了:
1) 增加一个
新的消息到队列中,将会导致队列中总字节数超过队列的{BANNED}{BANNED}最佳佳大大小(msg_qbytes域)。
2) 增加一个
新的消息到队列中,将会导致
队列中消息总数超过队列的{BANNED}{BANNED}最佳佳大大小(msg_qbytes域)。该检验主要用于阻止放入消息队列中的零长度消息出现无限数量的问题。
如果队列不足于放置该消息时,该系统调用将会被阻塞直到空间可用。如果给msgflg设置IPC_NOWAIT标记,该系统调动将会返回失败并带有EAGAIN的error。如果队列被移除,该系统调用将会设置error为EIDRM失败返回。
一旦该系统调用执行成功,消息队列的数据结构中的msg_lspid会被设置成调用进程的进程id,msg_qnum加1,msg_stime会被设置成当前的时间。
2.2 msgrcv()
该系统调用用于从队列中取消息并把它放到msgp指向的buffer中。
变量msgsz指定了msgp变量指向的结构的mtext成员的{BANNED}{BANNED}最佳佳大字节数。如果消息的内容长度大于msgsz,会产生什么行为依赖于msgflg是否指定了MSG_NOERROR。
如果msgflg制定了该flag,消息将会被截断,剩下的部分会被丢失。如果MSG_NOERROR没有被指定,消息将不会从消息队列中移除,系统调用会失败(返回值为-1,errno设置成E2BIG。
如果msgflg没有指定MSG_COPY,msgtyp变量必须指定要求接收的消息类型:
(1) 如果msgtype是0,队列中的{BANNED}首选消息会被取出;
(2) 如果msgtype大于0,消息队列中的{BANNED}首选消息会被取出。如果msgflg没有指定了MSG_EXCEPT,如果消息队列中的{BANNED}首选消息的类型不是msgtype类型,则该消息不会被取出。
(3) 如果msgtype小于0,队列中{BANNED}首选小于或等于msgtype绝对值类型的消息将会被取出。
msgflg是一个bit掩码,可以由以下flag组合而成:
IPC_NOWAIT :如果队列中没有制定类型的消息,则立刻返回。
MSG_COPY: 非摧毁性的从队列中获取msgtype类型的消息副本,通常与IPC_NOWAIT结合起来使用。
MSG_EXCEPT: 与msgtyp大于0一起使用,用于读取队列中消息类型与msgtyp不同的首条消息。
MSG_NOERROR: 用于当消息长度大于buffer大小时截断消息。
调用该接口时,如果msgflg没有指定IPC_NOWAIT,而消息队列中无要求类型的消息时,调用进程如果没有以下条件发生将会一直阻塞:
(1) 期望的消息被放置到队列中。
(2) 消息队列从系统中移除。这时系统调用errno设置成EIDRM返回失败。
(3) 调用进程捕获到一个信号。
一旦消息被成功完成,消息队列的数据结构中的msg_lrpid会被设置成调用进程的进程id,msg_qnum数减一,msg_rtime设置成当前时间。
返回值:
以上两个系统调用失败时均返回-1。成功时msgsnd返回0,msgrcv返回实际拷贝到mtext数组的字节数。
三、消息队列实操
-
#include <stdio.h>
-
#include <stdlib.h>
-
#include <string.h>
-
#include <time.h>
-
#include <unistd.h>
-
#include <errno.h>
-
#include <sys/types.h>
-
#include <sys/ipc.h>
-
#include <sys/msg.h>
-
-
struct msgbuf {
-
long mtype;
-
char mtext[80];
-
};
-
-
static void
-
usage(char *prog_name, char *msg)
-
{
-
if (msg != NULL)
-
fputs(msg, stderr);
-
-
fprintf(stderr, "Usage: %s [options]\n", prog_name);
-
fprintf(stderr, "Options are:\n");
-
fprintf(stderr, "-s send message using msgsnd()\n");
-
fprintf(stderr, "-r read message using msgrcv()\n");
-
fprintf(stderr, "-t message type (default is 1)\n");
-
fprintf(stderr, "-k message queue key (default is 1234)\n");
-
exit(EXIT_FAILURE);
-
}
-
-
static void
-
send_msg(int qid)
-
{
-
struct msgbuf msg;
-
char buf[80];
-
time_t t;
-
-
while (1) {
-
scanf("%ld", &msg.mtype);
-
fgets(buf, sizeof(buf), stdin);
-
time(&t);
-
-
snprintf(msg.mtext, sizeof(msg.mtext), "mstype=%ld: a message %s at %s",
-
msg.mtype, buf, ctime(&t));
-
-
if (msgsnd(qid, (void *) &msg, sizeof(msg.mtext),
-
IPC_NOWAIT) == -1) {
-
perror("msgsnd error");
-
exit(EXIT_FAILURE);
-
}
-
printf("sent: %s\n", msg.mtext);
-
}
-
}
-
-
-
static void
-
get_msg(int qid, int msgtype)
-
{
-
struct msgbuf msg;
-
-
while (1) {
-
if (msgrcv(qid, (void *) &msg, sizeof(msg.mtext), msgtype,
-
MSG_NOERROR) == -1) {
-
if (errno != ENOMSG) {
-
perror("msgrcv");
-
exit(EXIT_FAILURE);
-
}
-
printf("No message available for msgrcv()\n");
-
} else
-
printf("message received: %s\n", msg.mtext);
-
}
-
}
-
-
int
-
main(int argc, char *argv[])
-
{
-
int qid, opt;
-
int mode = 0; /* 1 = send, 2 = receive */
-
int msgtype = 1;
-
int msgkey = 1234;
-
-
while ((opt = getopt(argc, argv, "srt:k:")) != -1) {
-
switch (opt) {
-
case 's':
-
mode = 1;
-
break;
-
case 'r':
-
mode = 2;
-
break;
-
case 't':
-
msgtype = atoi(optarg);
-
if (msgtype <= 0)
-
usage(argv[0], "-t option must be greater than 0\n");
-
break;
-
case 'k':
-
msgkey = atoi(optarg);
-
break;
-
default:
-
usage(argv[0], "Unrecognized option\n");
-
}
-
}
-
-
if (mode == 0)
-
usage(argv[0], "must use either -s or -r option\n");
-
-
qid = msgget(msgkey, IPC_CREAT | 0666);
-
-
if (qid == -1) {
-
perror("msgget");
-
exit(EXIT_FAILURE);
-
}
-
-
if (mode == 2)
-
get_msg(qid, msgtype);
-
else
-
send_msg(qid);
-
-
exit(EXIT_SUCCESS);
-
}
发送消息进程和两个接收消息的进程:
进程后跟'-s'参数表示是负责发送消息的进程,'-r'参数表示启动的是接收消息的进程。'-t'表示接收消息的类型,其他使用说明间帮助信息。
发送消息的进程发送消息输入含义:开头的10进制数为消息类型,后面为消息内容,进程循环发送消息。
两个接收进程分别循环等待接收消息类型为10和20的消息,并打印接收到的消息。
如上测试结果,发送进程发送的消息与特定接收消息类型的进程接收的消息完全一致。msgrcv()接收其指定的消息类型的消息。
通过消息队列,可以实现多个进程间消息的传递。