Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1501237
  • 博文数量: 218
  • 博客积分: 6394
  • 博客等级: 准将
  • 技术积分: 2563
  • 用 户 组: 普通用户
  • 注册时间: 2008-02-08 15:33
个人简介

持之以恒

文章分类

全部博文(218)

文章存档

2013年(8)

2012年(2)

2011年(21)

2010年(55)

2009年(116)

2008年(16)

分类: 网络与安全

2010-01-10 20:03:11

1.线程安全的消息排队机制

该例子演示ACE Condition包装和ACE Mutex包装的使用。代码从Message_Queue类中摘录,该类包含在Task类中。Message_Queue可被同步策略类型参数化,以获取所期望的并发控制级。缺省地,并发控制级是线程安全,如ACE Synch.h文件中的MT_Synch类所定义的:

class MT_Synch
{
public:
typedef Condition<Mutex> CONDITION;
typedef Mutex MUTEX;
};

如果MT_Synch被用于实例化Message_Queue,所有的公共方法都将是线程安全的,但同时也带来相应的开销。相反,如果Null_Synch类用于实例化Message_Queue,所有公共方法都不是线程安全的,同时也就没有额外的开销。Null_Synch也在Synch.h中定义,如下所示:

class Null_Synch
{
public:
typedef Null_Condition<Null_Mutex> CONDITION;
typedef Null_Mutex MUTEX;
};

ACE Message_Queue由一或多个通过prev_next_指针链接在一起的Message_Block组成。这样的结构可以高效地操作任意大的消息,而不会导致巨大的内存拷贝开销。 

Message_Queue是一种线程安全的消息排队机制。注意C++“traits”习语的使用将ConditionMutex类型合并进了单个模板参数。

template <class SYNCH = MT_Synch>
class Message_Queue
{
public:
    // Default high and low water marks.
    enum
    {
        // 0 is the low water mark.
        DEFAULT_LWM = 0,
        // 1 K is the high water mark.
        DEFAULT_HWM = 4096,
         // Message queue was active before activate() or deactivate().
        WAS_ACTIVE = 1,
        // Message queue was inactive before activate() or deactivate().
        WAS_INACTIVE = 2
    };


    // Initialize a Message_Queue.(构造函数)
    Message_Queue (size_t hwm = DEFAULT_HWM, size_t lwm = DEFAULT_LWM);
    // Destroy a Message_Queue.
    ~Message_Queue (void);(析构函数)


    /* Checks if queue is full/empty. */
    int is_full (void) const;
    int is_empty (void) const;


    // Enqueue and dequeue a Message_Block *.
    int enqueue_tail (Message_Block *new_item,Time_Value *tv = 0);
    int enqueue_head (Message_Block *new_item,Time_Value *tv = 0);
    int dequeue_head (Message_Block *&first_item, Time_Value *tv = 0);


    // Deactivate the queue and wakeup all threads waiting on the queue so they can continue.
    int deactivate (void);
    // Reactivate the queue so that threads can enqueue and dequeue messages again.
    int activate (void);


private:
    // Routines that actually do the enqueueing and dequeueing (assumes locks are held).
    int enqueue_tail_i (Message_Block *);
    int enqueue_head_i (Message_Block *);
    int enqueue_head_i (Message_Block *&);

    // Check the boundary conditions.
    int is_empty_i (void) const;
    int is_full_i (void) const;
 
    // Implement activate() and deactivate() methods (assumes locks are held).
    int deactivate_i (void);
    int activate_i (void);


    //数据成员

    // Pointer to head of Message_Block list.
    Message_Block *head_;
 
    // Pointer to tail of Message_Block list.
    Message_Block *tail_;
 
    // Lowest number before unblocking occurs.
    int low_water_mark_;
 
    // Greatest number of bytes before blocking.
    int high_water_mark_;
 
    // Current number of bytes in the queue.
    int cur_bytes_;
 
    // Current number of messages in the queue.
    int cur_count_;
 
    // Indicates that the queue is inactive.
    int deactivated_;
 
    // C++ wrapper synchronization primitives for controlling concurrent access.

    SYNCH::MUTEX lock_;
    SYNCH::CONDITION notempty_;
    SYNCH::CONDITION notfull_;
};


Message_Queue类的实现显示如下。Message_Queue的构造器创建一个空的消息列表,并初始化Condition对象。注意Mutexlock_被它的缺省构造器自动创建

template <class SYNCH>
Message_Queue::Message_Queue (size_t hwm,size_t lwm)
: notfull_ (lock_), notempty_ (lock_)
{
// ...
}

下面的代码检查队列是否为(也就是,没有消息在其中)或(也就是,在其中的字节的数目多于high_water_mark)。公共方法藉此来获取锁,而私有方法假定锁已被持有

template <class SYNCH> int
Message_Queue<SYNCH>::is_empty_i (void) const
{
    return cur_bytes_ <= 0 && cur_count_ <= 0;
}
 
template <class SYNCH> int
Message_Queue<SYNCH>::is_full_i (void) const
{
    return cur_bytes_ > high_water_mark_;
}
 
template <class SYNCH> int
Message_Queue<SYNCH>::is_empty (void) const
{
    Guard monitor (lock_);
    return is_empty_i ();
}
 
template <class SYNCH> int
Message_Queue<SYNCH>::is_full (void) const
{
    Guard monitor (lock_);
    return is_full_i();
}

下面的方法用于启用和停用Message_Queuedeactivate方法停用队列,并唤醒所有等待在该队列上的线程,以使它们继续。没有消息被从队列中移除。其他任何被调用的方法都立即返回-1,且errno == ESHUTDOWN,直到队列被再次激活。这些信息允许调用者检测状态的变化。

template <class SYNCH> int
Message_Queue<SYNCH>::deactivate (void)
{
    Guard m (lock_);
    return deactivate_i ();
}
 
template <class SYNCH> int
Message_Queue<SYNCH>::deactivate_i (void)
{
    int current_status = deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
     // Wake up all the waiters.==>使他们不在等待,直接放弃,呵呵
    notempty_.broadcast();
    notfull_.broadcast();
    deactivated_ = 1;
    return current_status;
}

activate方法重新启用队列,以使线程能够再次让消息入队或出队。如果队列在调用前是不活动的,该方法返回WAS_INACTIVE;如果队列在调用前是活动的,就返回WAS_ACTIVE

template <class SYNCH> int
Message_Queue<SYNCH>::activate (void)
{
    Guard m (lock_);
    return activate_i ();
}
template <class SYNCH> int
Message_Queue<SYNCH>::activate_i (void)
{
    int current_status = deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
    deactivated_ = 0;
    return current_status;
}

enqueue_head方法在队列的前面插入一个新条目。像其他的入队和出队方法一样,如果tv参数为NULL,调用者将阻塞直到可以继续执行。否则,调用者将阻塞,等待*tv所指定的数量的时间。但是当队列被关闭、有信号发生,或tv中指定的时间过去了,阻塞的调用都会返回,且errno == EWOULDBLOCK

template <class SYNCH> int
Message_Queue<SYNCH>::enqueue_head
(Message_Block *new_item, Time_Value *tv)
{
    Guard monitor(lock_);
   if (deactivated_)//消息队列处在没有激活的状态
    {
        errno = ESHUTDOWN;
        return -1;
    }
   // Wait while the queue is full.
    while (is_full_i ())
    {
    // Release the lock_ and wait for timeout, signal, or space becoming available in the list.

        if (notfull_.wait (tv) == -1)
        {
            if (errno == ETIME)
                errno = EWOULDBLOCK;
            return -1;
        }
        if (deactivated_)
        {
            errno = ESHUTDOWN;
            return -1;
        }
    }
    // Actually enqueue the message at the head of the list.
    enqueue_head_i (new_item);
    // Tell any blocked threads that the queue has a new item!
    notempty_.signal ();
    return 0;
}

enqueue_tail方法在队列的末尾插入新条目。它返回的是队列中条目的数量。

template <class SYNCH> int
Message_Queue<SYNCH>::enqueue_tail
(Message_Block *new_item, Time_Value *tv)
{
    Guard<SYNCH::MUTEX> monitor (lock_);
    if (deactivated_)
    {
        errno = ESHUTDOWN;
        return -1;
    }
   // Wait while the queue is full.
    while (is_full_i ())
    {
        // Release the lock_ and wait for timeout, signal, or space becoming available in the list.
        if (notfull_.wait (tv) == -1)
        {
            if (errno == ETIME)
                errno = EWOULDBLOCK;
            return -1;
        }
        if (deactivated_)
        {
            errno = ESHUTDOWN;
            return -1;
        }
    }
    // Actually enqueue the message at the end of the list.

    enqueue_tail_i (new_item);
    // Tell any blocked threads that the queue has a new item!
    notempty_.signal ();
    return 0;
}

dequeue_head方法移除队列中最前面的条目,并将它传回给调用者。该方法返回队列中所余条目的数目。

template <class SYNCH> int
Message_Queue<SYNCH>::dequeue_head
(Message_Block *&first_item, Time_Value *tv)
{
    Guard monitor (lock);
    // Wait while the queue is empty.
    while (is_empty_i ())
    {
    // Release the lock_ and wait for timeout, signal, or a new message being placed in the list.
        if (notempty_.wait (tv) == -1)
        {
            if (errno == ETIME)
                errno = EWOULDBLOCK;
            return -1;
        }
        if (deactivated_)
        {
            errno = ESHUTDOWN;
            return -1;
        }
    }
    // Actually dequeue the first message.
    dequeue_head_i (first_item);
   // Tell any blocked threads that the queue is no longer full.
    notfull_.signal ();
   return 0;
}

2.使用消息队列(ACE_Message_Queue)实现的生产者和消费者模型

ACE 中的每个任务(ACE_Task)都有一个底层消息队列(ACE_Message_Queue)。这个消息队列被用作任务间通信的一种方法。当一个任务想要与另一任务谈话时,它创建一个消息,并将此消息放入它想要与之谈话的任务的消息队列。接收任务通常用 getq()从消息队列里获取消息。如果队列中没有数据可用,它就进入休眠状态。如果有其他任务将消息插入它的队列,它就会苏醒过来,从队列中拾取数据并处理它。因而,在这种情况下,接收任务将从发送任务那里接收消息,并以应用特定的方式作出反馈。(上面所说的前提是在ACE_MT_SYNCH)如果是ACE_Null_Synch就没有这种相应的唤醒机制了!!

经典的生产者-消费者问题的实现。生产者任务生成数据,将它发送给消费者任务。消费者任务随后消费这个数据。使用 ACE_Task 构造,我们可将生产者和消费者看作是不同的 ACE_Task 类型的对象。这两种任务使用底层消息队列进行通信。

#pragma once
#pragma comment(lib,"ACEd.lib")
#include "ace/OS.h"
#include "ace/Task.h"
#include "ace/Message_Block.h"
//The Consumer Task.
class Consumer: public ACE_Task<ACE_MT_SYNCH>
{
public:
    int open(void*)
    {
        ACE_DEBUG((LM_DEBUG, "(%t) Consumer Task opened \n"));
        //Activate the Task
        activate(THR_NEW_LWP,1);
        return 0;
    }
    //The Service Processing routine
    int svc(void)
    {
        //Get ready to receive message from Producer
        ACE_Message_Block * mb =0;
        do
        {
            if (mb != 0)
            {
                mb->release();//消费
                mb = 0;
            }
            //Get message from underlying queue
            getq(mb);
            ACE_DEBUG((LM_DEBUG,"(%t) Got message: %d from remote task\n",*mb->rd_ptr()));
        }while(*mb->rd_ptr()<5);
        if (mb != 0)
        {
            mb->release();//消费
            mb = 0;
        }
        return 0;
    }
    int close(u_long)
    {
        ACE_DEBUG((LM_DEBUG,"Consumer closes down \n"));
        return 0;
    }
};

class Producer:public ACE_Task<ACE_MT_SYNCH>
{
public:
    Producer(Consumer * consumer):consumer_(consumer), data_(0)
    {
        mb_= new ACE_Message_Block((char*)&data_,sizeof(data_));//生产
    }
    int open(void*)
    {
        ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));
        //Activate the Task
        activate(THR_NEW_LWP,1);
        return 0;
     }
     //The Service Processing routine
     int svc(void)
     {
         while(data_<6)
         {
             //Send message to consumer
             ACE_DEBUG((LM_DEBUG, "(%t)Sending message: %d to remote task\n",data_));
             consumer_->putq(mb_);
             //Go to sleep for a sec.
             ACE_OS::sleep(1);
             data_++;
             mb_= new ACE_Message_Block((char*)&data_,sizeof(data_));//生产
         }
         return 0;
     }
     int close(u_long)
     {
         ACE_DEBUG((LM_DEBUG,"Producer closes down \n"));
         return 0;
     }

private:
    char data_;
    Consumer * consumer_;
    ACE_Message_Block * mb_;
};

int main(int argc, char * argv[])
{
    Consumer *consumer = new Consumer();
    Producer * producer = new Producer(consumer);

    producer->open(0);
    consumer->open(0);
    //Wait for all the tasks to exit.
    ACE_Thread_Manager::instance()->wait();
    
    getchar();
    return 0;
}

程序的运行结果是:

参考文档:
《ACE自适配通信环境中文技术文档》

http://xujingli88.blog.163.com/blog/static/4117861920098212402275/

阅读(3681) | 评论(0) | 转发(0) |
0

上一篇:ACE线程封装组件

下一篇:主动对象模式

给主人留下些什么吧!~~