Chinaunix首页 | 论坛 | 博客
  • 博客访问: 917780
  • 博文数量: 299
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2493
  • 用 户 组: 普通用户
  • 注册时间: 2014-03-21 10:07
个人简介

Linux后台服务器编程。

文章分类

全部博文(299)

文章存档

2015年(2)

2014年(297)

分类: C/C++

2014-08-02 09:00:07

我设计这个线程池的初衷是为了与socket对接的。线程池的实现千变万化,我得这个并不一定是最好的,但却是否和我心目中需求模型的。现把部分设计思路和代码贴出,以期抛砖引玉。个人比较喜欢搞开源,所以大家如果觉得有什么需要改善的地方,欢迎给予评论。思前想后,也没啥设计图能表达出设计思想,就把类图贴出来吧。

类图设计如下:

Command类是我们的业务类。这个类里只能存放简单的内置类型,这样方便与socket的直接传输。我定义了一个cmd_成员用于存放命令字,arg_用于存放业务的参数。这个参数可以使用分隔符来分隔各个参数。我设计的只是简单实现,如果有序列化操作了,完全不需要使用我这种方法啦。

ThreadProcess就是业务处理类,这里边定义了各个方法用于进行业务处理,它将在ThreadPool中的Process函数中调用。

ThreadPool就是我们的线程池类。其中的成员变量都是静态变量,Process就是线程处理函数。

#define MAX_THREAD_NUM 50 // 该值目前需要设定为初始线程数的整数倍
#define ADD_FACTOR 40 // 该值表示一个线程可以处理的最大任务数
#define THREAD_NUM 10 // 初始线程数

bshutdown_:用于线程退出。

command_:用于存放任务队列

command_cond_:条件变量

command_mutex_:互斥锁

icurr_thread_num_:当前线程池中的线程数

thread_id_map_:这个map用于存放线程对应的其它信息,我只存放了线程的状态,0为正常,1为退出。还可以定义其它的结构来存放更多的信息,例如存放套接字。

InitializeThreads:用于初始化线程池,先创建THREAD_NUM个线程。后期扩容也需要这个函数。

Process:线程处理函数,这里边会调用AddThread和DeleteThread在进行线程池的伸缩。

AddWork:往队列中添加一个任务。

ThreadDestroy:线程销毁函数。

AddThread:扩容THREAD_NUM个线程

DeleteThread:如果任务队列为空,则将原来的线程池恢复到THREAD_NUM个。这里可以根据需要进行修改。

 

以下贴出代码以供大家参考。

command.h

复制代码
#ifndef COMMAND_H_ #define COMMAND_H_ class Command
{ public: int get_cmd(); char* get_arg(); void set_cmd(int cmd); void set_arg(char* arg); private: int cmd_; char arg_[65];
}; #endif /* COMMAND_H_ */
复制代码

command.cpp

复制代码
#include <string.h> #include "command.h" int Command::get_cmd()
{ return cmd_;
} char* Command::get_arg()
{ return arg_;
} void Command::set_cmd(int cmd)
{
    cmd_ = cmd;
} void Command::set_arg(char* arg)
{ if(NULL == arg)
    { return;
    }
    strncpy(arg_,arg,64);
    arg_[64] = '\0';
}
复制代码

thread_process.h

复制代码
#ifndef THREAD_PROCESS_H_ #define THREAD_PROCESS_H_ class ThreadProcess
{ public: void Process0(void* arg); void Process1(void* arg); void Process2(void* arg);
}; #endif /* THREAD_PROCESS_H_ */
复制代码

thread_process.cpp

复制代码
#include  #include  #include  #include "thread_process.h" void ThreadProcess::Process0(void* arg)
{
    printf("thread %u is starting process %s\n",pthread_self(),arg);
    usleep(100*1000);
} void ThreadProcess::Process1(void* arg)
{
    printf("thread %u is starting process %s\n",pthread_self(),arg);
    usleep(100*1000);
} void ThreadProcess::Process2(void* arg)
{
    printf("thread %u is starting process %s\n",pthread_self(),arg);
    usleep(100*1000);
}
复制代码

thread_pool.h

复制代码
#ifndef THREAD_POOL_H_ #define THREAD_POOL_H_ #include  #include  #include "command.h" #define MAX_THREAD_NUM 50 // 该值目前需要设定为初始线程数的整数倍 #define ADD_FACTOR 40 // 该值表示一个线程可以处理的最大任务数 #define THREAD_NUM 10 // 初始线程数 class ThreadPool
{ public:
    ThreadPool() {}; static void InitializeThreads(); void AddWork(Command command); void ThreadDestroy(int iwait = 2); private: static void* Process(void* arg); static void AddThread(); static void DeleteThread(); static bool bshutdown_; static int icurr_thread_num_; static std::mapint> thread_id_map_; static std::vector command_; static pthread_mutex_t command_mutex_; static pthread_cond_t command_cond_;
}; #endif /* THREAD_POOL_H_ */
复制代码

thread_pool.cpp

复制代码
#include  #include  #include "thread_pool.h" #include "thread_process.h" #include "command.h" bool ThreadPool::bshutdown_ = false; int ThreadPool::icurr_thread_num_ = THREAD_NUM;
std::vector ThreadPool::command_;
std::mapint> ThreadPool::thread_id_map_;
pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER; void ThreadPool::InitializeThreads()
{ for (int i = 0; i < THREAD_NUM ; ++i)
    {
        pthread_t tempThread;
        pthread_create(&tempThread, NULL, ThreadPool::Process, NULL);
        thread_id_map_[tempThread] = 0;
    }
} void* ThreadPool::Process(void* arg)
{
    ThreadProcess threadprocess;
    Command command; while (true)
    {
        pthread_mutex_lock(&command_mutex_); // 如果线程需要退出,则此时退出 if (1 == thread_id_map_[pthread_self()])
        {
            pthread_mutex_unlock(&command_mutex_);
            printf("thread %u will exit\n", pthread_self());
            pthread_exit(NULL);
        } // 当线程不需要退出且没有需要处理的任务时,需要缩容的则缩容,不需要的则等待信号 if (0 == command_.size() && !bshutdown_)
        { if(icurr_thread_num_ >  THREAD_NUM)
            {
                DeleteThread(); if (1 == thread_id_map_[pthread_self()])
                {
                    pthread_mutex_unlock(&command_mutex_);
                    printf("thread %u will exit\n", pthread_self());
                    pthread_exit(NULL);
                }
            }
            pthread_cond_wait(&command_cond_,&command_mutex_);
        } // 线程池需要关闭,关闭已有的锁,线程退出 if(bshutdown_)
        {
            pthread_mutex_unlock (&command_mutex_);
            printf ("thread %u will exit\n", pthread_self ());
            pthread_exit (NULL);
        } // 如果线程池的最大线程数不等于初始线程数,则表明需要扩容 if(icurr_thread_num_ < command_.size()))
        {
            AddThread();
        } // 从容器中取出待办任务 std::vector::iterator iter = command_.begin();
        command.set_arg(iter->get_arg());
        command.set_cmd(iter->get_cmd());
        command_.erase(iter);
        pthread_mutex_unlock(&command_mutex_); // 开始业务处理 switch(command.get_cmd())
        { case 0:
            threadprocess.Process0(command.get_arg()); break; case 1:
            threadprocess.Process1(command.get_arg()); break; case 2:
            threadprocess.Process2(command.get_arg()); break; default: break;
        }
    } return NULL; // 完全为了消除警告(eclipse编写的代码,警告很烦人) } void ThreadPool::AddWork(Command command)
{ bool bsignal = false;
    pthread_mutex_lock(&command_mutex_); if (0 == command_.size())
    {
        bsignal = true;
    }
    command_.push_back(command);
    pthread_mutex_unlock(&command_mutex_); if (bsignal)
    {
        pthread_cond_signal(&command_cond_);
    }
} void ThreadPool::ThreadDestroy(int iwait)
{ while(0 != command_.size())
    {
        sleep(abs(iwait));
    }
    bshutdown_ = true;
    pthread_cond_broadcast(&command_cond_);
    std::mapint>::iterator iter = thread_id_map_.begin(); for (; iter!=thread_id_map_.end(); ++iter)
    {
        pthread_join(iter->first,NULL);
    }
    pthread_mutex_destroy(&command_mutex_);
    pthread_cond_destroy(&command_cond_);
} void ThreadPool::AddThread()
{ if(((icurr_thread_num_*ADD_FACTOR) < command_.size()) && (MAX_THREAD_NUM != icurr_thread_num_))
    {
        InitializeThreads();
        icurr_thread_num_ += THREAD_NUM;
    }
} void ThreadPool::DeleteThread()
{ int size = icurr_thread_num_ - THREAD_NUM;
    std::mapint>::iterator iter = thread_id_map_.begin(); for(int i=0; iiter)
    {
        iter->second = 1;
    }
}
复制代码

main.cpp

复制代码
#include "thread_pool.h" #include "command.h" int main()
{
    ThreadPool thread_pool;
    thread_pool.InitializeThreads();
    Command command; char arg[8] = {0}; for(int i=1; i<=1000; ++i)
    {
        command.set_cmd(i%3);
        sprintf(arg,"%d",i);
        command.set_arg(arg);
        thread_pool.AddWork(command);
    }
    sleep(10); // 用于测试线程池缩容  thread_pool.ThreadDestroy(); return 0;
}
复制代码

 

代码是按照google的开源c++编码规范编写。大家可以通过改变那几个宏的值来调整线程池。有问题大家一起讨论。

阅读(1810) | 评论(1) | 转发(0) |
给主人留下些什么吧!~~

tianyashuibin2014-08-02 09:00:22

http://www.cnblogs.com/osyun/archive/2012/08/31/2664938.html