消息队列就是存放在内核中的一个消息链表,注意它是存放在内核中的,所以在应用消息队列时,普通用户是无法运用的,只有切换到超级用户才可以使
用。而且是如果创建了一个消息只有在操作系统重启时才会真正删除该消息队列。在许多方面看来,消息队列类似与有名管道,但是却没有与打开与关闭管道的复杂
关联,然而,使用消息队列并没有解决我们使用有名管道所遇到的麻烦,例如管道上的阻塞。所以与有名管道比起来,消息队列的优点在于独立与发送与接收进程,
这减少了在打开与关闭有名管道之间同步的困难。
消息队列提供了一种由一个进程向另一个进程发送块数据的方法。另外,每一个数据块被
看作有一个类型,而接收进程可以独立接收具有不同类型的数据块。消息队列的好处在于我们几乎可以完全避免同步问题,并且可以通过发送消息屏蔽有名管道的问
题。更好的是,我们可以使用某些紧急方式发送消息。坏处在于,与管道类似,在每一个数据块上有一个最大尺寸限制,同时在系统中所有消息队列上的块尺寸上也
有一个最大尺寸限制。
要操作消息队列就要熟悉一些数据结构:下面介绍几个重要的数据结构:
1、消息缓冲结构
向消息队列发送消息时,必须组成合理的数据结构。Linux系统定义了一个模版数据结构msgbuf:
#include
struct msgbuf{
long type;
char mtex[1];
}
其中type表示消息的类型,这是接收消息的一个重要标志。mtex并不一定就是char 类型,任意类型都可以的。
2、msqid_ds内核数据结构。
Linux内核中,每个消息队列都维护一个结构体,此结构体保存着消息队列当前状态信息,该结构体在头文件linux/msg.h中定义,这里就不一一例举了。
3、ipc_perm内核数据结构
结构体ipc_perm保存着消息队列的一些重要的信息,比如说消息队列关联的键值,消息队列的用户id组id等。它定义在头文件linux/ipc.h中。
消息队列的创建:
#include
#include
key_t ftok (char*pathname, char proj);
它返回与路径pathname相对应的一个键值。该函数不直接对消息队列操作,但在调用ipc(MSGGET,…)或msgget()来获得消息队列描述字前,往往要调用该函数。典型的调用代码是:
key=ftok(path_ptr, 'a');
ipc_id=ipc(MSGGET, (int)key, flags,0,NULL,0);
2.系统V消息队列API
系统V消息队列API共有四个,使用时需要包括几个头文件:
#include
#include
#include
1)int msgget(key_t key, int msgflg)
参数key是一个键值,由ftok获得;msgflg参数是一些标志位。该调用返回与健值key相对应的消息队列描述字。
在以下两种情况下,该调用将创建一个新的消息队列:
如果没有消息队列与健值key相对应,并且msgflg中包含了IPC_CREAT标志位;
key参数为IPC_PRIVATE;
参数msgflg可以为以下:IPC_CREAT、IPC_EXCL、IPC_NOWAIT或三者的或结果。
调用返回:成功返回消息队列描述字,否则返回-1。
注:参数key设置成常数IPC_PRIVATE并不意味着其他进程不能访问该消息队列,只意味着即将创建新的消息队列。
2)int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long msgtyp, int msgflg);
该系统调用从msgid代表的消息队列中读取一个消息,并把消息存储在msgp指向的msgbuf结构中。
msqid为消息队列描述字;消息返回后存储在msgp指向的地址,msgsz指定msgbuf的mtext成员的长度(即消息内容的长度),msgtyp为请求读取的消息类型;读消息标志msgflg可以为以下几个常值的或:
IPC_NOWAIT 如果没有满足条件的消息,调用立即返回,此时,errno=ENOMSG
IPC_EXCEPT 与msgtyp>0配合使用,返回队列中第一个类型不为msgtyp的消息
IPC_NOERROR 如果队列中满足条件的消息内容大于所请求的msgsz字节,则把该消息截断,截断部分将丢失。
msgrcv手册中详细给出了消息类型取不同值时(>0; <0; =0),调用将返回消息队列中的哪个消息。
msgrcv()解除阻塞的条件有三个:
消息队列中有了满足条件的消息;
msqid代表的消息队列被删除;
调用msgrcv()的进程被信号中断;
调用返回:成功返回读出消息的实际字节数,否则返回-1。
3)int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg);
向msgid代表的消息队列发送一个消息,即将发送的消息存储在msgp指向的msgbuf结构中,消息的大小由msgze指定。
对发送消息来说,有意义的msgflg标志为IPC_NOWAIT,指明在消息队列没有足够空间容纳要发送的消息时,msgsnd是否等待。造成msgsnd()等待的条件有两种:
当前消息的大小与当前消息队列中的字节数之和超过了消息队列的总容量;
当前消息队列的消息数(单位"个")不小于消息队列的总容量(单位"字节数"),此时,虽然消息队列中的消息数目很多,但基本上都只有一个字节。
msgsnd()解除阻塞的条件有三个:
不满足上述两个条件,即消息队列中有容纳该消息的空间;
msqid代表的消息队列被删除;
调用msgsnd()的进程被信号中断;
调用返回:成功返回0,否则返回-1。
4)int msgctl(int msqid, int cmd, struct msqid_ds *buf);
该系统调用对由msqid标识的消息队列执行cmd操作,共有三种cmd操作:IPC_STAT、IPC_SET 、IPC_RMID。
IPC_STAT:该命令用来获取消息队列信息,返回的信息存贮在buf指向的msqid结构中;
IPC_SET:该命令用来设置消息队列的属性,要设置的属性存储在buf指向的msqid结构中;可设置属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes,同时,也影响msg_ctime成员。
IPC_RMID:删除msqid标识的消息队列;
下面具体看一个利用消息的聊天程序:
-
#include<stdio.h>
-
#include<stdlib.h>
-
#include<sys/ipc.h>
-
#include<sys/msg.h>
-
#include<string.h>
-
#define BUF_SIZE 256
-
#define PROJ_ID 32
-
#define PATH_NAME "."
-
int main(void)
-
{
-
struct mymsgbuf{
-
-
long msgtype;
-
char ctrlstring[BUF_SIZE];
-
}msgbuffer;
-
int qid;
-
int msglen;
-
key_t msgkey;
-
if((msgkey=ftok(PATH_NAME,PROJ_ID))==-1){
-
-
perror("ftok error!");
-
exit(0);
-
}
-
if((qid=msgget(msgkey,IPC_CREAT))==-1){
-
-
perror("msgget error!\n");
-
exit(0);
-
}
-
msgbuffer.msgtype=3;
-
strcpy(msgbuffer.ctrlstring,"hello message queue");
-
msglen=sizeof(msgbuffer)-4;
-
if(msgsnd(qid,&msgbuffer,msglen,0)==-1){
-
-
perror("msgget error!\n");
-
exit(0);
-
}
-
exit(0);
-
}
-
#include<stdio.h>
-
#include<stdlib.h>
-
#include<sys/ipc.h>
-
#include<sys/msg.h>
-
#include<string.h>
-
#define BUF_SIZE 256
-
#define PROJ_ID 32
-
#define PATH_NAME "."
-
int main(void)
-
{
-
struct mymsgbuf{
-
-
long msgtype;
-
char ctrlstring[BUF_SIZE];
-
}msgbuffer;
-
int qid;
-
int msglen;
-
key_t msgkey;
-
if((msgkey=ftok(PATH_NAME,PROJ_ID))==-1){
-
-
perror("ftok error!");
-
exit(0);
-
}
-
if((qid=msgget(msgkey,IPC_CREAT))==-1){
-
-
perror("msgget error!\n");
-
exit(0);
-
}
-
-
msglen=sizeof(msgbuffer)-4;
-
if(msgrcv(qid,&msgbuffer,msglen,2,0)==-1){
-
-
perror("msgrcv error!\n");
-
exit(0);
-
}
-
printf("get message %s\n",msgbuffer.ctrlstring);
-
exit(0);
-
}
在看一个通信机制:信号量。信号量是一个计数器,常用于处理进程或线程的同步问题,尤其是对 临界资源的访问,临界资源可以简单地说是一段代码,一个变量或某种硬件资源等。信号量的值大于0表示可供并发进程使用的资源的实体数,小于0表示正在等待是用该种资源的进程数。
与消息队列类似,Linux内核也为每个信号集维护了一个semid_ds结构体,该结构体定义在头文件linux/sem.h中,具体含义我们可以在头文件中看源码,这里不再赘述。
下面看一下常用信号量的几个函数:
1)int semget(key_t key, int nsems, int semflg)
参
数key是一个键值,由ftok获得,唯一标识一个信号灯集,用法与msgget()中的key相同;参数nsems指定打开或者新创建的信号灯集中将包
含信号灯的数目;semflg参数是一些标志位。参数key和semflg的取值,以及何时打开已有信号灯集或者创建一个新的信号灯集与msgget()
中的对应部分相同,不再祥述。
该调用返回与健值key相对应的信号灯集描述字。
调用返回:成功返回信号灯集描述字,否则返回-1。
注:
如果key所代表的信号灯已经存在,且semget指定了IPC_CREAT|IPC_EXCL标志,那么即使参数nsems与原来信号灯的数目不等,返
回的也是EEXIST错误;如果semget只指定了IPC_CREAT标志,那么参数nsems必须与原来的值一致,在后面程序实例中还要进一步说明。
2)int semop(int semid, struct sembuf *sops, unsigned nsops);
semid是信号灯集ID,sops指向数组的每一个sembuf结构都刻画一个在特定信号灯上的操作。nsops为sops指向数组的大小。
sembuf结构如下:
struct sembuf {
unsigned short sem_num; /* semaphore index in array */
short sem_op; /* semaphore operation */
short sem_flg; /* operation flags */
};
sem_num
对应信号集中的信号灯,0对应第一个信号灯。sem_flg可取IPC_NOWAIT以及SEM_UNDO两个标志。如果设置了SEM_UNDO标志,那
么在进程结束时,相应的操作将被取消,这是比较重要的一个标志位。如果设置了该标志位,那么在进程没有释放共享资源就退出时,内核将代为释放。如果为一个
信号灯设置了该标志,内核都要分配一个sem_undo结构来记录它,为的是确保以后资源能够安全释放。事实上,如果进程退出了,那么它所占用就释放了,
但信号灯值却没有改变,此时,信号灯值反映的已经不是资源占有的实际情况,在这种情况下,问题的解决就靠内核来完成。这有点像僵尸进程,进程虽然退出了,
资源也都释放了,但内核进程表中仍然有它的记录,此时就需要父进程调用waitpid来解决问题了。
sem_op的值大于0,等于0以及小于0确定了对sem_num指定的信号灯进行的三种操作。具体请参考linux相应手册页。
这
里需要强调的是semop同时操作多个信号灯,在实际应用中,对应多种资源的申请或释放。semop保证操作的原子性,这一点尤为重要。尤其对于多种资源
的申请来说,要么一次性获得所有资源,要么放弃申请,要么在不占有任何资源情况下继续等待,这样,一方面避免了资源的浪费;另一方面,避免了进程之间由于
申请共享资源造成死锁。
也许从实际含义上更好理解这些操作:信号灯的当前值记录相应资源目前可用数目;sem_op>0对
应相应进程要释放sem_op数目的共享资源;sem_op=0可以用于对共享资源是否已用完的测试;sem_op<0相当于进程要申请
-sem_op个共享资源。再联想操作的原子性,更不难理解该系统调用何时正常返回,何时睡眠等待。
调用返回:成功返回0,否则返回-1。
3) int semctl(int semid,int semnum,int cmd,union semun arg)
该系统调用实现对信号灯的各种控制操作,参数semid指定信号灯集,参数cmd指定具体的操作类型;参数semnum指定对哪个信号灯操作,只对几个特殊的cmd操作有意义;arg用于设置或返回信号灯信息。
该系统调用详细信息请参见其手册页,这里只给出参数cmd所能指定的操作。
IPC_STAT
获取信号灯信息,信息由arg.buf返回;
IPC_SET
设置信号灯信息,待设置信息保存在arg.buf中(在manpage中给出了可以设置哪些信息);
GETALL
返回所有信号灯的值,结果保存在arg.array中,参数sennum被忽略;
GETNCNT
返回等待semnum所代表信号灯的值增加的进程数,相当于目前有多少进程在等待semnum代表的信号灯所代表的共享资源;
GETPID
返回最后一个对semnum所代表信号灯执行semop操作的进程ID;
GETVAL
返回semnum所代表信号灯的值;
GETZCNT
返回等待semnum所代表信号灯的值变成0的进程数;
SETALL
通过arg.array更新所有信号灯的值;同时,更新与本信号集相关的semid_ds结构的sem_ctime成员;
SETVAL
设置semnum所代表信号灯的值为arg.val;
调用返回:调用失败返回-1,成功返回与cmd相关:
竞争问题:
第
一个创建信号灯的进程同时也初始化信号灯,这样,系统调用semget包含了两个步骤:创建信号灯;初始化信号灯。由此可能导致一种竞争状态:第一个创建
信号灯的进程在初始化信号灯时,第二个进程又调用semget,并且发现信号灯已经存在,此时,第二个进程必须具有判断是否有进程正在对信号灯进行初始化
的能力。在参考文献[1]中,给出了绕过这种竞争状态的方法:当semget创建一个新的信号灯时,信号灯结构semid_ds的sem_otime成员
初始化后的值为0。因此,第二个进程在成功调用semget后,可再次以IPC_STAT命令调用semctl,等待sem_otime变为非0值,此时
可判断该信号灯已经初始化完毕。下图描述了竞争状态产生及解决方法:
实际上,这种解决方法也是基于这样一个假定:第一个创建信号灯的进程必须调用semop,这样sem_otime才能变为非零值。另外,因为第一个进程可能不调用semop,或者semop操作需要很长时间,第二个进程可能无限期等待下去,或者等待很长时间。
信号量的一个应用实例:
-
#include<stdio.h>
-
#include<sys/types.h>
-
#include<linux/sem.h>
-
#include<stdlib.h>
-
#define MAX_RESOURSE 5
-
int main(int argc,char **argv)
-
{
-
key_t key;
-
int semid;
-
struct sembuf buf={0,-1,IPC_NOWAIT};
-
union semun semopts;
-
if((key=ftok(".",'s'))==-1)
-
{
-
perror("ftok error!\n");
-
exit(1);
-
}
-
if((semid=semget(key,1,IPC_CREAT|0666))==-1)
-
{
-
perror("semget error!\n");
-
exit(1);
-
}
-
semopts.val=MAX_RESOURSE;
-
if(semctl(semid,0,SETVAL,semopts)==-1)
-
{
-
perror("semctl error!\n");
-
exit(1);
-
}
-
while(1)
-
{
-
if(semop(semid,&buf,1)==-1)
-
{
-
perror("semop error!\n");
-
exit(1);
-
}
-
sleep(3);
-
}
-
return 0;
-
}
-
#include<stdio.h>
-
#include<sys/types.h>
-
#include<linux/sem.h>
-
#include<stdlib.h>
-
-
int main(void)
-
{
-
key_t key;
-
int semid,semval;
-
-
union semun semopts;
-
if((key=ftok(".",'s'))==-1)
-
{
-
perror("ftok error!\n");
-
exit(0);
-
}
-
if((semid=semget(key,1,IPC_CREAT|0666))==-1)
-
{
-
perror("semget error!\n");
-
exit(1);
-
}
-
-
while(1)
-
{
-
if((semval=semctl(semid,0,GETVAL,0))==-1)
-
{
-
perror("semctl error!\n");
-
exit(0);
-
}
-
if(semval>0)
-
{
-
printf("still %d resourse can be used\n",semval);
-
}
-
else
-
{
-
printf("no more resouse can be used\n");
-
break;
-
}
-
-
sleep(3);
-
}
-
exit(0);
-
}