分类: C/C++
2010-03-11 18:02:10
ACE提供了一个类用于创建基本的线程,这个类就是ACE_Task_Base;它的使用方法为:
STEP1、创建ACE_Task_Base类的子类,并重新定义svc()虚方法的实现;svc()方法充当的是你的新线程的进入点,也就是说,你的线程将在svc()虚方法中开始执行,当svc()虚方法返回时结束,其方式与主线程类似;换句话说,你的新线程将开始于svc()虚方法的执行,结束于svc()虚方法的返回;
STEP2、创建ACE_Task_Base类的子类的对象,并在这个对象上调用active()方法以启动这个线程对象;
线程的同步原语:
使用互斥体的同步原语:
ACE提供了若干个互斥体类,用于线程的同步原语操作;ACE_Mutex可用作轻量级的线程同步原语,也可用作重量级的跨进程同步原语;ACE_Thread_Mutex类用于多线程访问共享资源时确保一致性约束;
使用守卫得同步原语:
ACE提供的守卫类和宏,可以帮助我们简化代码,防止死锁代码的发生,减少死锁的机会;守卫(guard)基于一种常见的C++惯用手法:把类的构造函数和析够函数分别用于资源获取和释放;在进入某个作用域的时候可以构造一个类的对象,而在离开这个作用域的时候,这个类的析够函数会被自动调用;这样就可以自动获取和释放资源;守卫类在构造时获取一个指定的锁,在销毁时自动释放这个锁.在栈上使用守卫,你总能够保证锁的释放,而不管你走的是什么样的非正常路径;
ACE提供的守卫类:
ACE_Guard
ACE_Read_Guard
ACE_Write_Guard
ACE_TSS_Guard
ACE_TSS_Read_Guard
ACE_TSS_Write_Guard
ACE提供的没有返回值的守卫宏:
ACE_GUARD(LockType, GuardName, LockObject);
ACE_WRITE_GUARD(LockType, GuardName, LockObject);
ACE_READ_GUARD(LockType, GuardName, LockObject);
ACE提供的带有出错返回值的守卫宏:
ACE_GUARD_RETURN(LockType, GuardName, LockObject, ReturnValue);
ACE_WRITE_GUARD_RETURN(LockType, GuardName, LockObject, ReturnValue);
ACE_READ_GUARD_RETURN(LockType, GuardName, LockObject, ReturnValue);
任务间通信:
一般而言,任务间通信可以划分为两个范畴:
A、状态变化或事件通知:
在这种情况下,只需要对事件的发生进行交流,而不需要在两个线程之间传递数据;
B、消息传递:
在这种情况下,两个线程之间需要相互传递数据,可能会形成这样一个工作链:第一个线程对数据进行处理,然后把它传递给下一个线程,以进行进一步的处理;
使用条件变量进行同步:
线程可以使用条件变量来把状态的变化、事件的到达或另外条件的满足传达给其它感兴趣的线程;条件变量总是要与互斥体协同使用.这些条件变量还有一个特殊的特征:你可以在其上进行定时的阻塞;所以,使用条件变量来管理简单的事件循环非常容易;
要使用条件变量,就必须先获取互斥体,检查系统是否处于所需状态中----所需条件为真,如果是这样,那么就执行所需的动作,然后释放互斥体.如果条件没有处于所需状态中,你就必须调用条件变量的wait()方法,等待系统状态发生变化.一旦系统状态发生了变化,那么造成该变化的线程会向条件变量发出信号,唤醒正阻塞在这个条件变量上的一个或多个线程;等待中的线程苏醒过来,再次检查系统状态,如果系统状态能够通过检查,就执行所需动作,否则,它们再次进行等待;
注意:阻塞的线程在调用wait()方法进入休眠时并没有释放互斥体,在苏醒时也没有尝试获取互斥体.其原因是:条件变量会确保在进入休眠的时候自动释放互斥体,在苏醒之前自动获取互斥体;也就是说,条件变量的等待是一个无阻塞等待;也就是说,wait()方法开始执行的时候会先释放互斥体,在返回的时候重新获取互斥体;
ACE提供的ACE_Condition
ACE_Thread_Mutex mtxThread;
ACE_Condition
消息传递:
消息常被用来在线程之间交流数据和时间的发生.发送者创建一个消息,放入消息队列,供接收者拾取.接收者或者阻塞,或者轮询队列,等待新数据到达;一旦数据入队,接收者就从队列中取出数据,使用它,然后再回去在队列上等待新数据;
队列充当的是两个线程之间的共享资源,因而入队和出队操作应该受到保护;如果队列能够支持阻塞式调用,在新数据到达时解除阻塞,那将会很方便;
ACE提供了一个区别于ACE_Task_Base类的另外一个设施,可以对派生自ACE_Task<>模板的线程间的消息进行排队;从ACE_Task类派生你的类,那么你就自动继承了一个ACE_Message_Queue类型的消息队列,你可以把它用在你的新类中;ACE_Message_Queue类是仿照System V STREAM提供的排队设施设计的;
消息队列提供了一个类型安全的接口,允许你把ACE_Message_Block的实例放入到队列中;
消息块:
ACE_Message_Block类是一种高效的数据容器,可以用来高效地存储和共享消息;你可以把它看成是一种高级的数据缓冲区,支持像引用计数和数据共享这样的特性;每个消息块都包含两个指针:rd_ptr()指向要读取的下一个字节; wr_ptr()指向下一个可用的空字节;
关于ACE_Task_Base类的例子:
#include "ace/Log_Msg.h"
#include "ace/OS.h"
#include "ace/Thread.h"
#include "ace/Task.h"
class CommandHandler: public ACE_Task_Base
{
private:
static int nThreadNum;
public:
virtual int svc(void)
{
int nThreadID = 0;
int i = 0;
nThreadNum++;
nThreadID = nThreadNum;
ACE_DEBUG( (LM_INFO, ACE_TEXT("thread[%t] %d start\n"), nThreadID) );
ACE_OS::sleep(1);
while(i <= 100)
{
ACE_DEBUG( (LM_INFO, ACE_TEXT("thread[%t] %d print %d\n"), nThreadID, i) );
i++;
ACE_OS::sleep(1);
}
return nThreadID;
}
};
int CommandHandler::nThreadNum = 0;
int ACE_TMAIN(int argc, ACE_TCHAR** argv)
{
ACE_DEBUG( (LM_INFO, ACE_TEXT("Test Thread from ACE_Task_Base is running ...\n")) );
int nReturn = 0;
CommandHandler cmd;
nReturn = cmd.activate(/*THR_NEW_LWP|THR_JOINABLE, 2*/);
if(nReturn != 0)
{
ACE_DEBUG( (LM_ERROR, ACE_TEXT("failed to active this thread[%d] %p\n"), nReturn, ACE_TEXT("active")) );
return -1;
}
ACE_OS::sleep(10);
ACE_DEBUG( (LM_INFO, ACE_TEXT("Suspend this thread ......\n")) );
cmd.suspend();
ACE_OS::sleep(10);
ACE_DEBUG( (LM_INFO, ACE_TEXT("Resume this thread ......\n")) );
cmd.resume();
cmd.wait();
return 0;
}
关于ACE_Task<>类模板的例子:
#include "ace/Log_Msg.h"
#include "ace/OS.h"
#include "ace/Thread.h"
#include "ace/Task.h"
class CommandHandler: public ACE_Task
{
private:
static int nThreadNum;
public:
virtual int svc(void)
{
int nThreadID = 0;
int i = 0;
nThreadNum++;
nThreadID = nThreadNum;
ACE_DEBUG( (LM_INFO, ACE_TEXT("thread[%t] %d start\n"), nThreadID) );
ACE_OS::sleep(1);
while(i <= 100)
{
ACE_DEBUG( (LM_INFO, ACE_TEXT("thread[%t] %d print %d\n"), nThreadID, i) );
i++;
ACE_OS::sleep(1);
}
return nThreadID;
}
};
int CommandHandler::nThreadNum = 0;
int ACE_TMAIN(int argc, ACE_TCHAR** argv)
{
ACE_DEBUG( (LM_INFO, ACE_TEXT("Test Thread from ACE_Task_Base is running ...\n")) );
int nReturn = 0;
CommandHandler cmd;
nReturn = cmd.activate(/*THR_NEW_LWP|THR_JOINABLE, 2*/);
if(nReturn != 0)
{
ACE_DEBUG( (LM_ERROR, ACE_TEXT("failed to active this thread[%d] %p\n"), nReturn, ACE_TEXT("active")) );
return -1;
}
ACE_OS::sleep(10);
ACE_DEBUG( (LM_INFO, ACE_TEXT("Suspend this thread ......\n")) );
cmd.suspend();
ACE_OS::sleep(10);
ACE_DEBUG( (LM_INFO, ACE_TEXT("Resume this thread ......\n")) );
cmd.resume();
cmd.wait();
return 0;
}