MMORPG游戏服务端线程池类
线程池:
线程是一种比较昂贵的资源.有些系统为了重用线程.引入了线程池的机制.
线程池的工作原理如下:
首先.系统会启动一定数量的线程.这些线程就构成了一个线程池.当有任务要做的时候.系统就从线程池里面选一个空闲的线程.然后把这个线程标记为“正在运行”.然后把任务传给这个线程执行.线程执行任务完成之后.就把自己标记为"空闲".这个过程并不难以理解.难以理解的是.一般来说.线程执行完成之后.运行栈等系统资源就会释放.线程对象就被回收了.一个已经完成的线程.又如何能回到线程池的空闲线程队列中呢? 秘诀就在于.线程池里面的线程永远不会执行完成.线程池里面的线程都是一个无穷循环
ThreadStarter.h
#ifndef __THREADSTARTER_H__
#define __THREADSTARTER_H__
#include <windows.h>
//线程接口
class ThreadBase
{
public:
ThreadBase() {}
virtual ~ThreadBase() {}
virtual bool run() = 0;//线程函数
virtual void OnShutdown() {}
HANDLE THREAD_HANDLE;
};
#endif
Threads.h
#ifndef __CTHREADS_H__
#define __CTHREADS_H__
#include "ThreadStarter.h"
//线程的状态
enum CThreadState
{
THREADSTATE_TERMINATE = 0,//终止
THREADSTATE_PAUSED = 1,//暂停
THREADSTATE_SLEEPING = 2,//睡眠
THREADSTATE_BUSY = 3,//忙碌
THREADSTATE_AWAITING = 4,//等候
};
//线程基类
class CThread : public ThreadBase
{
public:
CThread();
~CThread();
virtual bool run();
virtual void OnShutdown();
//设置线程的状态
__forceinline void SetThreadState(CThreadState thread_state)
{
ThreadState = thread_state;
}
//返回线程的状态
__forceinline CThreadState GetThreadState()
{
return ThreadState;
}
//返回线程ID
int GetThreadId()
{
return ThreadId;
}
time_t GetStartTime()
{
return start_time;
}
protected:
CThreadState ThreadState;//线程的状态
time_t start_time;
int ThreadId;//线程ID
};
#endif
Threads.cpp
#include "stdafx.h"
#include "CThreads.h"
CThread::CThread() : ThreadBase()
{
//初试化线程的状态为等候
ThreadState = THREADSTATE_AWAITING;
start_time = 0;
}
CThread::~CThread()
{
}
bool CThread::run()
{
return false;
}
void CThread::OnShutdown()
{
SetThreadState(THREADSTATE_TERMINATE);
}
Mutex.h
#ifndef __MUTEX_H__
#define __MUTEX_H__
#include <windows.h>
//多个线程操作相同的数据时,一般是需要按顺序访问的,否则会引导数据错乱
//为解决这个问题,就需要引入互斥变量,让每个线程都按顺序地访问变量。
class Mutex
{
public:
Mutex();
~Mutex();
__forceinline void Acquire()
{
EnterCriticalSection(&cs);
}
__forceinline void Release()
{
LeaveCriticalSection(&cs);
}
/*
例如:
线程操作函数。
int AddCount(void)
{
EnterCriticalSection(&cs);
int nRet = m_nCount++;
LeaveCriticalSection(&cs);
return nRet;
}
在函数AddCount里调用EnterCriticalSection和LeaveCriticalSection来互斥访问变量m_nCount。
通过上面这种方法,就可以实现多线程按顺序地访问相同的变量
*/
__forceinline bool AttemptAcquire()
{
//一个线程也可以调用TryEnterCriticalSection函数来请求某个临界区的所有权,此时即
//使请求失败也不会被阻塞
return 0;//(TryEnterCriticalSection(&cs) == TRUE ? true : false);
}
protected:
CRITICAL_SECTION cs;//临界区是一种防止多个线程同时执行一个特定代码节的机制
};
#endif
Mutex.cpp
#include "stdafx.h"
#include "Mutex.h"
Mutex::Mutex()
{
//创建临界区对象
InitializeCriticalSection(&cs);
}
Mutex::~Mutex()
{
//删除临界区对象
DeleteCriticalSection(&cs);
}
ThreadPool.h
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include "ThreadStarter.h"
#include "Mutex.h"
#include <windows.h>
#include <assert.h>
#include <set>
typedef unsigned int uint32;
typedef signed __int32 int32;
//线程管理
class ThreadController
{
public:
HANDLE hThread;
uint32 thread_id;
void Setup(HANDLE h)
{
hThread = h;
}
void Suspend()
{
assert(GetCurrentThreadId() == thread_id);
//当线程做完任务或者现在想暂停线程运行,就需要使用SuspendThread来暂停线程的执行
SuspendThread(hThread);
}
//恢复线程的执行就是使用ResumeThread函数了
void Resume()
{
assert(GetCurrentThreadId() != thread_id);
if(!ResumeThread(hThread))
{
DWORD le = GetLastError();
printf("error: %u\n", le);
}
}
void Join()
{
//WaitForSingleObject函数用来检测hHandle事件的信号状态,当函数的执行时间超过dwMilliseconds就返回
WaitForSingleObject(hThread, INFINITE);
}
uint32 GetId()
{
return thread_id;
}
};
struct Thread
{
ThreadBase* ExecutionTarget;
ThreadController ControlInterface;
Mutex SetupMutex;//线程的互斥
bool DeleteAfterExit;
};
typedef std::set<Thread*> ThreadSet;
//线程池类
class CThreadPool
{
//uint32 _threadsRequestedSinceLastCheck;
//uint32 _threadsFreedSinceLastCheck;
//uint32 _threadsExitedSinceLastCheck;
uint32 _threadsToExit;
int32 _threadsEaten;//可用线程数量
Mutex _mutex;
ThreadSet m_activeThreads;//正在执行任务线程对列
ThreadSet m_freeThreads;//可用线程对列
public:
CThreadPool();
void IntegrityCheck();
//创建指定数量的线程并加到线程池
void Startup();
//销毁线程
void Shutdown();
bool ThreadExit(Thread * t);
Thread* StartThread(ThreadBase * ExecutionTarget);
//从线程池取得可用线程并执行任务
void ExecuteTask(ThreadBase * ExecutionTarget);
void ShowStats();
void KillFreeThreads(uint32 count);
//__forceinline void Gobble(){
// _threadsEaten=(int32)m_freeThreads.size();
//}
__forceinline uint32 GetActiveThreadCount(){
return (uint32)m_activeThreads.size();
}
__forceinline uint32 GetFreeThreadCount(){
return (uint32)m_freeThreads.size();
}
};
extern CThreadPool ThreadPool;//线程池
#endif
ThreadPool.cpp
#include "stdafx.h"
#include "ThreadPool.h"
#include <process.h>
CThreadPool ThreadPool;
CThreadPool::CThreadPool()
{
//_threadsExitedSinceLastCheck = 0;
//_threadsRequestedSinceLastCheck = 0;
_threadsEaten = 0;
//_threadsFreedSinceLastCheck = 0;
}
bool CThreadPool::ThreadExit(Thread * t)
{
_mutex.Acquire();
m_activeThreads.erase(t);
if(_threadsToExit > 0)
{
--_threadsToExit;
//++_threadsExitedSinceLastCheck;
if(t->DeleteAfterExit)
m_freeThreads.erase(t);
_mutex.Release();
delete t;
return false;
}
// enter the "suspended" pool
//++_threadsExitedSinceLastCheck;
++_threadsEaten;
std::set<Thread*>::iterator itr = m_freeThreads.find(t);
if(itr != m_freeThreads.end())
{
}
m_freeThreads.insert(t);
_mutex.Release();
return true;
}
void CThreadPool::ExecuteTask(ThreadBase * ExecutionTarget)
{
Thread * t;
_mutex.Acquire();
//++_threadsRequestedSinceLastCheck;
--_threadsEaten;
//从线程池夺取一个线程
if(m_freeThreads.size())//有可用线程
{
//得到一个可用线程
t = *m_freeThreads.begin();
//把它从可用线程对列里删掉
m_freeThreads.erase(m_freeThreads.begin());
//给这个线程一个任务
t->ExecutionTarget = ExecutionTarget;
//恢复线程的执行
t->ControlInterface.Resume();
}
else
{
//创建一个新的线程并执行任务
t = StartThread(ExecutionTarget);
}
//把线程加到执行任务线程对列
m_activeThreads.insert(t);
_mutex.Release();
}
void CThreadPool::Startup()
{
int i;
int tcount = 5;
for(i=0; i < tcount; ++i)
StartThread(NULL);
}
void CThreadPool::ShowStats()
{
_mutex.Acquire();
//在这里输出线程池的状态
//

_mutex.Release();
}
void CThreadPool::KillFreeThreads(uint32 count)
{
_mutex.Acquire();
Thread * t;
ThreadSet::iterator itr;
uint32 i;
for(i = 0, itr = m_freeThreads.begin(); i < count && itr != m_freeThreads.end(); ++i, ++itr)
{
t = *itr;
t->ExecutionTarget = NULL;
t->DeleteAfterExit = true;
++_threadsToExit;
t->ControlInterface.Resume();
}
_mutex.Release();
}
void CThreadPool::Shutdown()
{
_mutex.Acquire();
size_t tcount = m_activeThreads.size() + m_freeThreads.size();
KillFreeThreads((uint32)m_freeThreads.size());
_threadsToExit += (uint32)m_activeThreads.size();
for(ThreadSet::iterator itr = m_activeThreads.begin(); itr != m_activeThreads.end(); ++itr)
{
if((*itr)->ExecutionTarget)
(*itr)->ExecutionTarget->OnShutdown();
}
_mutex.Release();
for(;;)
{
_mutex.Acquire();
if(m_activeThreads.size() || m_freeThreads.size())
{
_mutex.Release();
Sleep(1000);
continue;
}
break;
}
}
static unsigned long WINAPI thread_proc(void* param)
{
Thread * t = (Thread*)param;
t->SetupMutex.Acquire();
uint32 tid = t->ControlInterface.GetId();
bool ht = (t->ExecutionTarget != NULL);
t->SetupMutex.Release();
for(;;)
{
if(t->ExecutionTarget != NULL)
{
if(t->ExecutionTarget->run())//执行任务,返回true表示任务完成
delete t->ExecutionTarget;
t->ExecutionTarget = NULL;
}
if(!ThreadPool.ThreadExit(t))
{
//Log.Debug("ThreadPool", "Thread %u exiting.", tid);
break;
}
else
{
//if(ht)
// printf("ThreadPool:线程%d正在等待新任务.", tid);
t->ControlInterface.Suspend();//暂停线程运行
}
}
ExitThread(0);
return 0;
}
Thread * CThreadPool::StartThread(ThreadBase * ExecutionTarget)
{
HANDLE h;
Thread * t = new Thread;
t->DeleteAfterExit = false;
t->ExecutionTarget = ExecutionTarget;
t->SetupMutex.Acquire();
/*
CreateThread(
lpThreadAttributes是线程的属性,
dwStackSize是线程的栈大小,
lpStartAddress是线程函数的开始地址,
lpParameter是传送给线程函数的参数,
dwCreationFlags是创建线程标志,比如挂起线程,
lpThreadId是标识这个线程的ID)
*/
h = CreateThread(NULL, 0, &thread_proc, (LPVOID)t, 0, (LPDWORD)&t->ControlInterface.thread_id);
t->ControlInterface.Setup(h);
t->SetupMutex.Release();
return t;
}
阅读(4249) | 评论(0) | 转发(0) |