Chinaunix首页 | 论坛 | 博客
  • 博客访问: 162671
  • 博文数量: 115
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 10
  • 用 户 组: 普通用户
  • 注册时间: 2016-11-28 14:16
文章分类

全部博文(115)

文章存档

2017年(36)

2016年(79)

我的朋友

分类: LINUX

2016-11-29 11:33:43

原文地址:IPC--消息队列 作者:zhenhuaqin

 

1.什么是消息?

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。

消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。

“消息队列”是 Microsoft 的消息处理技术,它在任何安装了 Microsoft Windows 的计算机组合中,为任何应用程序提供消息处理和消息队列功能,无论这些计算机是否在同一个网络上或者是否同时联机。

“消息队列网络”是能够相互间来回发送消息的任何一组计算机。网络中的不同计算机在确保消息顺利处理的过程中扮演不同的角色。它们中有些提供路由信息以确定如何发送消息,有些保存整个网络的重要信息,而有些只是发送和接收消息。

“消息队列”安装期间,管理员确定哪些服务器可以互相通信,并设置特定服务器的特殊角色。构成此“消息队列”网络的计算机称为“站点”,它们之间通过“站点链接”相互连接。每个站点链接都有一个关联的“开销”,它由管理员确定,指示了经过此站点链接传递消息的频率。

“消息队列”管理员还在网络中设置一台或多台作为“路由服务器”的计算机。路由服务器查看各站点链接的开销,确定经过多个站点传递消息的最快和最有效的方法,以此决定如何传递消息。

2. 队列类型(Queue Type

有两种主要的队列类型:由您或网络中的其他用户创建的队列和系统队列。

2.1 用户创建的队列

“公共队列”在整个“消息队列”网络中复制,并且有可能由网络连接的所有站点访问。

“专用队列”不在整个网络中发布。相反,它们仅在所驻留的本地计算机上可用。专用队列只能由知道队列的完整路径名或标签的应用程序访问。

“管理队列”包含确认在给定“消息队列”网络中发送的消息回执的消息。指定希望 MessageQueue 组件使用的管理队列(如果有的话)。

“响应队列”包含目标应用程序接收到消息时返回给发送应用程序的响应消息。指定希望 MessageQueue 组件使用的响应队列(如果有的话)。

 

2.2 系统生成的队列

“日记队列”:可选地存储发送消息的副本和从队列中移除的消息副本。每个“消息队列”客户端上的单个日记队列存储从该计算机发送的消息副本。在服务器上为每个队列创建了一个单独的日记队列。此日记跟踪从该队列中移除的消息。

“死信队列”:存储无法传递或已过期的消息的副本。如果过期或无法传递的消息是事务性消息,则被存储在一种特殊的死信队列中,称为“事务性死信队列”。死信存储在过期消息所在的计算机上。有关超时期限和过期消息的更多信息,请参见默认消息属性。

“报告队列”: 包含指示消息到达目标所经过的路由的消息,还可以包含测试消息。每台计算机上只能有一个报告队列。

“专用系统队列”:是一系列存储系统执行消息处理操作所需的管理和通知消息的专用队列。

在应用程序中进行的大多数工作都涉及访问公共队列及其消息。但是,根据应用程序的日记记录、确认和其他特殊处理需要,在日常操作中很可能要使用几种不同的系统队列。

 

3. 同步和异步通信

队列通信天生就是异步的,因为将消息发送到队列和从队列中接收消息是在不同的进程中完成的。另外,可以异步执行接收操作,因为要接收消息的人可以对任何给定的队列调用 BeginReceive 方法,然后立即继续其他任务而不用等待答复。这与人们所了解的“同步通信”截然不同。

 在同步通信中,请求的发送方在执行其他任务前,必须等待来自预定接收方的响应。发送方等待的时间完全取决于接收方处理请求和发送响应所用的时间。

4.同消息队列交互(Interacting with Message Queues

消息处理和消息为基于服务器的应用程序组件之间的进程间通信提供了强大灵活的机制。同组件间的直接调用相比,它们具有若干优点,其中包括:

1) 稳定性 组件失败对消息的影响程度远远小于组件间的直接调用,因为消息存储在队列中并一直留在那里,直到被适当地处理。消息处理同事务处理相似,因为消息处理是有保证的。

2) 消息优先级 更紧急或更重要的消息可在相对不重要的消息之前接收,因此可以为关键的应用程序保证足够的响应时间。

3) 脱机能力 发送消息时,它们可被发送到临时队列中并一直留在那里,直到被成功地传递。当因任何原因对所需队列的访问不可用时,用户可以继续执行操作。同时,其他操作可以继续进行,如同消息已经得到了处理一样,这是因为网络连接恢复时消息传递是有保证的。

4) 事务性消息处理 将多个相关消息耦合为单个事务,确保消息按顺序传递、只传递一次并且可以从它们的目标队列中被成功地检索。如果出现任何错误,将取消整个事务。

5) 安全性 MessageQueue 组件基于的消息队列技术使用 Windows 安全来保护访问控制,提供审核,并对组件发送和接收的消息进行加密和验证。

5.有关函数说明:

1 创建消息队列

       #include <sys/msg.h>     

   #include <sys/types.h>
       #include <sys/ipc.h>    

  int msgget(key_t key, int msgflg);成功返回队列ID,失败返回-1

 

参数

说明

key

创建/打开队列key,ftok产生,可以直接给常量

msgflg

创建/打开方式IPC_CREATIPC_EXCLIPC_NOWAIT

  通常是msgflg =IPC_CREAT| IPC_EXCL|0666,意思是若不存在key值的队列则创建,否则如果存在则打开队列,0666意思与一般文件权限一样,XXX-本用户,同组用户,其他用户的读写执行的权限。

 

 key_t ftok(const char *pathname, int proj_id);

//获取pathname相对应的一个键值, pathname必须是存在并且可读取的文件,proj_id表示序号,用来区别同时的存在文件。成功返回key值,失败返回-1

          

2) 队列读写

a)   读取数据――阻塞读取消息队列,直到解除阻塞。

ssize_t msgrcv(int msqid, struct msgbuf *msgp, size_t msgsz, long msgtyp, int msgflg);

 

 

参数

说明

msqid

已打开的消息队列id

msgp

接收存放的消息队列缓存结构

msgsz

消息数据长度

msgtyp

消息类型。=0 读取队列中第一个数据。

msgflg

读取标志通常使用IPC_NOWAIT:即没有满足条件的消息,立即返回,此时,错误代码errno=ENOMSG

IPC_EXCEPT:与msgtyp>0配合使用,返回队列中第一个类型不为msgtyp的消息

MSG_NOERROR:截断超长数据

 

缓冲内容结构如下:

struct msgbuf {

long mtype;    /* 消息类型 must be > 0 */

char mtext[1]; /* 消息数据 这里只是一个数组的首地址,并非是只有一个字符 */

            };

 

msgrcv()解除阻塞的条件三个条件:

1  消息队列中有了满足条件的消息(或使用了);

2  msqid代表的消息队列被删除;

3  调用msgrcv()的进程被信号中断;

b) 发送数据

int msgsnd(int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg);

 

参数

说明

msqid

已打开的消息队列id

msgp

发送存放的消息队列缓存结构

msgsz

消息数据长度

msgflg

消息类型。=0 读取队列中第一个数据。

 

3) 消息队列控制

          int msgctl(int msqid, int cmd, struct msqid_ds *buf);

成功返回0,失败返回-1

 

参数

说明

msqid

已打开的消息队列id

cmd

控制类型选项

IPC_STAT:取得队列状态

IPC_SET:设置队列属性

IPC_RMID:删除消息队列

buf

存放队列的属性结构

 

 队列属性如下:

struct msqid_ds

{

  struct ipc_perm msg_perm; /* structure describing operation permission */

  __time_t msg_stime; /*最后一次发送消息的时间 */

  unsigned long int __unused1; /*保留*/

  __time_t msg_rtime; /* 最后一次接收数据时间 */

  unsigned long int __unused2;     /*保留*/

  __time_t msg_ctime; /* 最后修改时间 */

  unsigned long int __unused3; /*保留*/

  unsigned long int __msg_cbytes; /* 当前队列字节数 */

  msgqnum_t msg_qnum; /* 当前队列的消息数 */

  msglen_t msg_qbytes; /* 队列中容量 */

  __pid_t msg_lspid; /* 最后发送消息的进程号 */

  __pid_t msg_lrpid; /* 最后接收队列的进程号*/

  unsigned long int __unused4; /*保留*/

  unsigned long int __unused5; /*保留*/

};

 

文件msg为空文件,可以为任何内容,这里只是为了ftok函数使用。程序通过建立消息队列,完成进程间通信,注意msgrcv的第四个参数为消息类型,他定义了从队列中取消息的类型。

6.实例:

下面是msgLucy.c,是建立消息队列的

#include

#include

#include

#include

#include

#include

#include

#include

#include

#define PROJID 0xFF

#define LUCY 1

#define PETER 2

int mqid;

void terminate_handler(int signo)

{

 msgctl(mqid,IPC_RMID,NULL);

 exit(0);

}

int main()

{

 char filenm[] = "msg";

 key_t mqkey;

 struct msgbuf

 {

      long mtype;      /* message type, must be > 0 */

    char mtext[256];  /* message data */

   }msg;

 int ret;

 mqkey = ftok(filenm,PROJID);

 if(mqkey == -1)

 {

  perror("ftok error: ");

  exit(-1);

 }

 mqid = msgget(mqkey,IPC_CREAT | IPC_EXCL | 0666);

 if(mqid == -1)

 {

  perror("msgget error: ");

  exit(-1);

 }

 signal(SIGINT, terminate_handler);

 signal(SIGTERM, terminate_handler);

 while(1)

 {

  printf("Lucy: ");

  fgets(msg.mtext, 256, stdin);

  if (strncmp("quit", msg.mtext, 4) == 0)

  {

   msgctl(mqid,IPC_RMID,NULL);

   exit(0);

  }

  msg.mtext[strlen(msg.mtext)-1] = '\0';

  msg.mtype = LUCY;

  msgsnd(mqid,&msg,strlen(msg.mtext) + 1,0);

  msgrcv(mqid,&msg,256,PETER,0);

  printf("Peter: %s\n", msg.mtext); 

 } 

}

下面的是msgPeter,是和Lucy通信的,程序先运行Lucy,再运行Peter

#include

#include

#include

#include

#include

#include

#include

#include

#include

#define PROJID 0xFF

#define LUCY 1

#define PETER 2

int main()

{

 char filenm[] = "msg";

 int mqid;

 key_t mqkey;

 struct msgbuf

 {

         long mtype;      /* message type, must be > 0 */

         char mtext[256];  /* message data */

   }msg;

 int ret;

 mqkey = ftok(filenm, PROJID);

 if(mqkey == -1)

 {

  perror("ftok error: ");

  exit(-1);

 }

 mqid = msgget(mqkey, 0);

 if(mqid == -1)

 {

  perror("msgget error: ");

  exit(-1);

 }

 while(1)

 {/

  msgrcv(mqid,&msg,256,LUCY,0);

  printf("Lucy: %s\n",msg.mtext);

  printf("Peter: ");

  fgets(msg.mtext,256,stdin);

  if(strncmp("quit", msg.mtext, 4) == 0)

  {

   exit(0);

  }

  msg.mtext[strlen(msg.mtext)-1] = '\0';

  msg.mtype = PETER;

  msgsnd(mqid,&msg,strlen(msg.mtext) + 1,0);

 }

}

阅读(632) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~