全部博文(177)
分类: C/C++
2008-07-13 19:43:06
异步多任务处理
在某些系统中,需要你自己来构建自己的异步多任务处理框架。通过这种多任务处理,可以自己实现事件驱动编程模型的一个小部分。当然,event driven is good for computer but not for human beings,人要是事件驱动,要么他是消防队员,要么他很失败,呵呵。另外,当存在典型的情况 “等待数据->处理数据->等待数据”循环时,用异步处理能够大大增加系统的性能,尤其是出现数据和处理互相依赖的关系时。如:任务A的输出数据是任务B的输入,而B的输出又是C的输入,而且B和C在进行数据处理之前需要花费大量时间来做初始化,线性处理显然会有性能瓶颈。采用异步处理能改善这种情况,当有多个处理器时性能将得到极大的提高。
本文描述的异步多任务是通过同一进程内的调度器来调度该进程内多个异步任务,这些异步任务的真正执行是通过工作线程来完成。其模型为:各异步的任务发送请求后阻塞;当其请求的任务完成后,调度器唤醒该任务并执行相应的处理(也许会再次发送一个请求)。这时一个典型的生产者/消费者的问题:在调度器中,生产者是任务,而调度器是消费者。一旦任务的请求完成,调度器则执行该任务。
由于scheduler是在一个进程中,因此需要先将scheduler安装到进程中,然后将各异步任务添加到scheduler,之后,各异步任务发送第一个请求等待完成。最后启动scheduler,处理各个已经完成请求。如下图所示:
每个任务都应当继承自同一个基类以使得调度器可以以相同的方式处理他们。每个派生的任务都应当包含(至少)一个异步操作(或所属对象),该操作可以执行1~n次。每次异步操作完成之后,其Run()函数将被调用以进行后续处理。因此,任务至少应当具有一个状态成员用于指示该任务所处的状态。当任务处理完毕之后,任务可以把自己从调度队列中移出。由于添加任务到调度队列的操作应当由调度器而不是任务本身来做,因此,不需要Add()操作。因此,任务类应当象这样:
其中:
StartWorker()、Dequeue()都是由TaskBase来实现的:
int TaskBase::Dequeue()
{
Scheduler* sched = SchedulerInstance::GetInstance();
int ret = ErrNone;
m_Mutex->Lock();
if (sched && m_Status != TaskPending)
sched->Remove(this);
else
ret = ErrTaskBusy;
m_Mutex->Unlock();
return ret;
}
int TaskBase::StartWorker()
{
Scheduler* sched = SchedulerInstance::GetInstance();
int ret = ErrNone;
m_Mutex->Lock();
if (sched && (m_Status != TaskPending || m_Status != NotAvailable))
{
m_Worker->RunThread(this);
}
else
ret = ErrTaskBusy;
m_Mutex->Unlock();
return ret;
}
void TaskBase::CancelWorker()
{
Scheduler* sched = SchedulerInstance::GetInstance();
m_Mutex->Lock();
if (sched && m_Status == TaskPending)
{
m_Worker->Cancel();
m_Status = TaskCancelled;
}
m_Mutex->Unlock();
}
由于使用了工作线程来进行真正的异步操作,因此,应当有一个静态函数来作为线程的入口函数。
其实现为:
void WorkerThread::ThreadFunction(void* aData)
{
TaskBase* task = static_cast
if (task)
task->IssueRequest();
// Notify the Scheduler
Scheduler* sched = SchedulerInstance::GetInstance();
if (sched)
sched->Notify();
}
由于工作线程的实现与平台相关,因此此处列出Linux平台的典型实现:
void LinuxThread::RunThread(void* aData)
{
pthread_create(&m_TID, NULL, WorkerThread::ThreadFunction, aData);
Scheduler* sched = SchedulerInstance::GetInstance();
if (sched)
sched->Notify();
}
void LinuxThread::Cancel()
{
pthread_cancel(m_TID);
Scheduler* sched = SchedulerInstance::GetInstance();
if (sched)
sched->Notify();
}
由于TaskBase是抽象类,因此需要从其派生出真正的Task类。例如:
首先,必须有一个(或多个)调度队列以存放所有的异步操作。为了能够使任务睡眠、异步操作完成后响应该异步时间并执行相应的操作,调度器应当拥有一个多值信号量。由于调度器与异步任务同处一个进程,且不会出现多进程/线程同时操作队列的情况。如果有这种竞态条件则需要对队列进行锁保护。此处无须锁保护。如下:
此处将其封装是为了方便平台间的移植。
调度器应当提供操作使得一个进程能够方便地添加/删除任务。当任务被添加到队列之后,进程就可以启动调度器;所有任务结束之后,或者应某任务要求而停止。然而,一旦调度器启动,则无法再向其中加入任务(在Run()中增加任务是可行的,但笔者没有对其带来的后果进行验证)。由于我们对任务进行了优先级的划分,因此需要有多个任务队列。如下:
其中,Start()是整个调度器调度任务的地方:
void Scheduler::Start()
{
for( ; ; )
{
WaitForEvent();
if (m_State == SchedStopped)
return;
for (int curPrio = 0; curPrio < MaxNumOfPrioQ; ++curPrio)
{
list
if (cur->size() > 0)
{
for
(list
{
(*iter)->RunTask();
// Task with high priority will be run first.
// Otherwise, priority policy makes no sense.
goto
}
}
}
}
}
为了停止调度器:
void Scheduler::Stop()
{
m_State = SchedStopped;
// One of the notification of task is borrowed to stop the scheduler
Notify();
}
由于每个进程只能有一个调度器,所以只能把下面的singleton暴露给API调用者:
好了,我们有了些基本的元素,可以开始构建了。
在实现了各类的抽象方法之后,典型的实现如下:
Scheduler* sched = SchedulerInstance::CreateInstance();
MyTask tasks[10];
for (int i = 0; i < 10; ++i)
{
sched->Add(&task[i]);
task[i].StartWorker();
}
sched->Start();
文中代码均为原型,不具备产品代码的健壮性。如直接拷贝代码导致问题,笔者概不负责。如果导致商业事故,笔者亦不负责并要追究其侵权行为。
由于笔者不太熟悉Windows平台的API,而且并未搭建Linux测试环境,因此,文中代码只包含平台无关部分。
StarUML和CodeBlocks让我可以得到很好的功能而无须使用盗版软件
Active Object模式
Factory Method模式
Singleton模式