Chinaunix首页 | 论坛 | 博客
  • 博客访问: 121886
  • 博文数量: 83
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 50
  • 用 户 组: 普通用户
  • 注册时间: 2013-10-31 11:07
个人简介

弃我去者,昨日之日不可留; 乱我心者,今日之日多烦忧。

文章分类

分类: LINUX

2014-07-31 16:19:34

实际上,消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。

消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如WWW中使用的HTTP协议是同步的,因为客户端在发出请求后必须等待服务器回应。然而,很多情况下我们需要异步的通信协议。比如,一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。

和信号相比,消息队列能够传递更多的信息。与管道相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。但消息队列仍然有大小限制。

包含文件
1、msg.c
2、msg.h
3、thread.c

源文件1 msg.c

#include    
#include    
#include    
#include    
  
#define __DEBUG   
#ifdef __DEBUG   
#define DBG(fmt,args...) fprintf(stdout,  fmt,  ##args)   
#else   
#define DBG(fmt,args...)   
#endif   
#define ERR(fmt,args...) fprintf(stderr,  fmt,  ##args)   
  
/* 
消息队列初始化 
msgKey:消息队列键值 
qid:返回值,消息队列id 
*/  
int Msg_Init( int msgKey )  
{  
    int qid;  
    key_t key = msgKey;  
    /* 
    消息队列并非私有,因此此键值的消息队列很可能在其他进程已经被创建 
    所以这里尝试打开已经被创建的消息队列 
    */  
    qid = msgget(key,0);  
    if(qid < 0){  
        /* 
        打开不成功,表明未被创建 
        现在可以按照标准方式创建消息队列 
        */  
        qid = msgget(key,IPC_CREAT|0666);  
        DBG("Create msg queue id:%d\n",qid);  
    }  
    DBG("msg queue id:%d\n",qid);  
    return qid;  
}  
/* 
杀死消息队列 
qid:消息队列id 
*/  
int Msg_Kill(int qid)  
{  
    msgctl(qid, IPC_RMID, NULL);  
    DBG("Kill queue id:%d\n",qid);  
    return 0;  


源文件2 msg.h
#ifndef _M_MSG_H_   
#define _M_MSG_H_   
  
  
/*消息队列键值*/  
#define MSG_KEY         0x12345   
  
/* 
消息类型定义 
通过消息类型接收特定组的消息 
而不是消息队列上的所以消息都接收 
*/  
enum {  
    MSG_TYPE_START = 0,  
  
    MSG_TYPE_MSG1,  
    MSG_TYPE_MSG2,  
    MSG_TYPE_MSG3,  
    MSG_TYPE_MSG4,  
    MSG_TYPE_MSG5,  
  
    MSG_TYPE_END  
};  
      
/* 
用于发送接收的消息buf 
*/  
typedef struct _MSG_BUF{  
    long Des;   //消息buf结构体必须要有的成员,且必须大于0   
    int  Src;   //自定义,发送者id   
    int  cmd;   //自定义,数据区,根据需要定义   
}MSG_BUF;  
  
int Msg_Init(int msgKey);  
int Msg_Kill(int qid);  
  
#endif 


源文件3 thread.c 根据上一篇文章修改
/* 
消息队列示例,一般来说,消息对用术语ipc中的一种, 
既然能用用于多进程通信,当然也可以用在单进程中的多线程中, 
而且,当成语复杂度提升后,多线程中使用消息队列也非常的方便 
*/  
#include    
#include    
#include    
#include    
#include    
#include    
  
#include "msg.h"   
  
#define __DEBUG   
#ifdef __DEBUG   
#define DBG(fmt,args...) fprintf(stdout,  fmt,  ##args)   
#else   
#define DBG(fmt,args...)   
#endif   
#define ERR(fmt,args...) fprintf(stderr,  fmt,  ##args)   
  
static int isThreadQuit = 0;  
int gQid;//消息队列id   
int gMyProcId = MSG_TYPE_MSG1;  
/* 
某设备写操作,不同同时访问,所以所以需要线程锁保护 
1、将函数DeviceWrite中加锁 
2、在访问DeviceWrite的线程中加锁 
以上两种方法跟据需要选择其一。 
本例中在访问的线程中加锁 
*/  
void DeviceWrite(char *str)  
{  
    /*SemWait(gHndlSem);*/  
    DBG("Device Write: %s\n",str);  
    /*SemRelease(gHndlSem);*/  
}  
void SetXxThreadQuit()  
{     
    /*quit*/  
    isThreadQuit = 1;  
}  
void *XxManageThread(void *arg)  
{  
    char *cmd = (char*)arg;  
    DBG("arg value=%s\n",cmd);  
    static int i=0;  
    while(isThreadQuit==0){  
  
        MSG_BUF msg;  
        memset(&msg,0,sizeof(MSG_BUF));  
        msg.Des = MSG_TYPE_MSG1;  
        msg.Src = gMyProcId;  
        msg.cmd = i++;  
        /* 
        参数1是队列id, 
        参数2是消息buf, 
        参数3是消息buf长度但不包含long的长度, 
        参数是4是消息发送的一些设置*/  
        msgsnd(gQid,(void*)&msg,sizeof(MSG_BUF)-sizeof(long),0);  
        DBG("msgsnd cmd = %d\n",msg.cmd);  
        sleep(1);  
          
    }  
    /*arg是将指针带进来,cmd则相反,或者设置 NULL*/  
    pthread_exit(cmd);  
    //pthread_exit(NULL);   
}  
void *XxManageThreadMutex(void *arg)  
{  
    char *cmd = (char*)arg;  
    DBG("arg value=%s\n",cmd);  
    while(isThreadQuit==0){  
          
        MSG_BUF msg;  
        memset(&msg,0,sizeof(MSG_BUF));  
        /* 
        参数1参数2参数3参数5与msgsnd相同, 
        参数4是指定消息接收的范围,只有指定的消息type才会接收 
        */  
        int ret = msgrcv(gQid,(void*)&msg,sizeof(MSG_BUF)-sizeof(long),gMyProcId,0);  
        if(ret < 0){  
            ERR("Receive msg fail!!\n");  
            break;  
        }  
        DBG("msgrcv cmd = %d\n",msg.cmd);  
        sleep(1);  
          
    }  
    /*arg是将指针带进来,cmd则相反,或者设置 NULL*/  
    pthread_exit(cmd);  
    //pthread_exit(NULL);   
}  
  
int XxManageThreadInit()  
{  
    pthread_t tManageThread;  
    pthread_t tManageThreadMutex;  
      
    char *any="any value";  
    char *retn;  
    int ret;  
    /* 
      第二个参数是设置线程属性,一般很少用到(设置优先级等),第四个参数为传递到线程的指针, 
      可以为任何类型 
    */  
    ret = pthread_create(&tManageThread,NULL,XxManageThread,"1 thread");  
    if(ret == -1){  
        /*成功返回0.失败返回-1*/  
        ERR("Ctreate Thread ERROR\n");  
        return -1;  
    }  
  
    ret = pthread_create(&tManageThreadMutex,NULL,XxManageThreadMutex,"2 thread");  
    if(ret == -1){  
        /*成功返回0.失败返回-1*/  
        ERR("Ctreate Thread ERROR\n");  
        return -1;  
    }  
      
    /* 
      设置线程退出时资源的清理方式,如果是detach,退出时会自动清理 
      如果是join,则要等待pthread_join调用时才会清理 
    */  
    pthread_detach(tManageThread);  
    pthread_detach(tManageThreadMutex);  
    //pthread_join(tManageThread,retn);   
    //DBG("retn value=%s\n",retn);   
    return 0;  
}  
  
#define TEST_MAIN   
#ifdef TEST_MAIN   
int main()  
{  
    printf("hello liuyu\n");  
    int count=3;  
  
    /*初始化消息队列*/  
    gQid = Msg_Init(MSG_KEY);  
    if(XxManageThreadInit()==-1){  
        exit(1);  
    }  
      
    while(count--){  
        DBG("[0] main running\n");  
        sleep(2);  
    }  
      
    SetXxThreadQuit();  
    /*等待线程结束*/  
    sleep(1);  
  
    /*清理消息队列*/  
    Msg_Kill(gQid);  
    DBG("waitting thread exit...\n");  
    return 0;  
}  
#endif 


运行结果:

[root@localhost src]# ./a.out   
hello liuyu  
Create msg queue id:65536  
msg queue id:65536  
[0] main running  
arg value=1 thread  
msgsnd cmd = 0  
arg value=2 thread  
msgrcv cmd = 0  
msgsnd cmd = 1  
msgrcv cmd = 1  
[0] main running  
msgsnd cmd = 2  
msgrcv cmd = 2  
msgsnd cmd = 3  
msgrcv cmd = 3  
[0] main running  
msgsnd cmd = 4  
msgrcv cmd = 4  
msgsnd cmd = 5  
msgrcv cmd = 5  
Kill queue id:65536  
waitting thread exit... 

 

 

 

阅读(832) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~