Chinaunix首页 | 论坛 | 博客
  • 博客访问: 32927
  • 博文数量: 6
  • 博客积分: 270
  • 博客等级: 二等列兵
  • 技术积分: 80
  • 用 户 组: 普通用户
  • 注册时间: 2007-10-29 14:48
文章分类

全部博文(6)

文章存档

2011年(1)

2008年(5)

我的朋友
最近访客

分类: C/C++

2008-06-03 18:22:12

    在已故巨匠Stevens先生中的<<网络编程卷2>>中有记载使用互斥锁和条件变量来解决生产者/消费者的方法,在多线程编程中,我们也常常需要解决这样的生产者消费者问题。在实际项目中,我见到过很多种解决类似问题的同步消息队列,有的复杂而优雅,有得简陋而实用。
    对于生产者,如果不考虑内存和队列的大小问题,只需要一直往消息队列里推消息就可以了。而对于消费者就要复杂一点了,在消息队列取空后,消费者可以循环轮询队列,直到取到新的消息,这种方式编码简单,但是浪费CPU时间片; 更加优雅的方法是当消息队列为空时,将消费者挂起,等到有消息可读时再唤醒。
    这里引用Stevens先生的方法,使用互斥锁和信号量实现了一个简单的同步消息队列模型。
 

#include <deque>
#include <sys/types.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>

using namespace std;

template<class DataType>
class Message_Queue
{
public:
    //构造函数,初始化2个互斥锁和1个条件变量

    Message_Queue():_nready(0)
    {
     pthread_mutex_init(&_mutex, NULL);
     pthread_mutex_init(&_ready_mutex, NULL);
     pthread_cond_init(&_cond, NULL);
    }

    //推消息

    int push_msg(DataType &d)
    {
        //加锁_mutex,向queue里推消息

        pthread_mutex_lock(&_mutex);
        _queue.push_back(d);
        pthread_mutex_unlock(&_mutex);

        //加锁_ready_mutex,判断是否需要唤醒挂起的消费者

        pthread_mutex_lock(&_ready_mutex);
        if (!_nready)
            pthread_cond_signal(&_cond); //唤醒阻塞在此消息队列上的消费者

        _nready++; //计数器++

        pthread_mutex_unlock(&_ready_mutex);
        
        return 0;        
    }
    //取消息

    int get_msg(DataType &d)
    {
        //加锁,查看计数器,看是否需要挂起

        pthread_mutex_lock(&_ready_mutex);
        while (_nready == 0) //计数器为0,队列为空,挂起
              pthread_cond_wait(&_cond, &_ready_mutex); //为空时,所有消费者会阻塞在此

        //当被生产者唤醒时,消费者重新加_ready_mutex锁,_nready > 0, 程序跳出while(_nready)循环继续运行

        _nready--;
        pthread_mutex_unlock(&_ready_mutex);
        
        //加锁,取队列操作

        pthread_mutex_lock(&_mutex);
        d = _queue.front();
        _queue.pop_front();
        pthread_mutex_unlock(&_mutex);

        return 0;
    }

private:
    pthread_cond_t     _cond;
    int                 _nready;
    pthread_mutex_t     _ready_mutex;
    pthread_mutex_t     _mutex;
    deque<DataType> _queue;
};


//测试程序,模拟N个生产者和M个消费者使用消息队列同时工作的情况。

void *consume(void *arg) //消费者线程

{
    Message_Queue<int> *queue = (Message_Queue<int> *)arg;
    int    i;
    for (; ; )
    {
        sleep(1);
        printf("[%u]ready to get_msg\n", pthread_self());
        queue->get_msg(i);
        printf("[%u]get msg = %d\n", pthread_self(), i);
    }

}
void *produce(void *arg) //生产者线程

{
    Message_Queue<int> *queue = (Message_Queue<int> *)arg;
    int    i;
    for (; ; )
    {
        sleep(1);
        printf("[%u]ready to push_msg\n", pthread_self());
        queue->push_msg(i);
        printf("[%u]push_msg finished\n", pthread_self());
        i++;
    }
}

int main()
{

    Message_Queue<int> msg_queue;

    int    n ;
    int c = 2;
    int p = 3;
    pthread_t    k;
    printf("create %d consume................\n", c);
    for (n=0; n< c; n++)
    {
        pthread_create(&k, NULL, consume, &msg_queue);
    }
    printf("create %d produce.............\n", p);
    for (n=0; n< p; n++)
    {
        pthread_create(&k, NULL, produce, &msg_queue);
    }

    pause();
}

    这样,我们对STL的deque 进行一个简单的封装,就获得了一个可用的同步消息队列。这里需要提及的是,一次pthread_cond_signal操作会唤醒所有阻塞在此条件变量上的消费者,而所有阻塞在pthread_cond_wait上的消费者被唤醒,这就是所谓“惊群”。而被唤醒的所有消费者都会去竞争_ready_mutex锁,这就是所谓“二次竞争”。这样的“惊群”和“二次竞争”在各个平台上是否存在,对性能的影响有多大,我没有验证过,只是一种猜测,不过相信这样的模型满足一般的应用程序应该没有问题。

阅读(1413) | 评论(2) | 转发(0) |
给主人留下些什么吧!~~

chinaunix网友2010-06-30 17:45:44

瞄了一眼,应该是个死锁程序。。。。

chinaunix网友2008-06-21 17:56:38

写的好....