Chinaunix首页 | 论坛 | 博客
  • 博客访问: 85606
  • 博文数量: 9
  • 博客积分: 1529
  • 博客等级: 上尉
  • 技术积分: 113
  • 用 户 组: 普通用户
  • 注册时间: 2010-05-16 02:14
文章分类

全部博文(9)

文章存档

2011年(5)

2010年(4)

我的朋友

分类: LINUX

2010-05-24 08:06:35

文件: threadpool.tar.gz
大小: 2KB
下载: 下载
发一个Linux下的线程池
    大二学习VC++时侯就在考虑写个线程池,由于当时的水平有限,一直没写出来,最近闲着没事就想到了线程池,就下定决心写一个简单的线程池实现以前的夙愿。当然如今我是Linux fans,早就将Microsoft那些东西抛弃了,于是就在Linux下写了。核心思想是通过一个条件变量来触发线程的执行,当给线程设置了任务后就触发条件变量,线程就会从阻塞于条件变量的状态被唤醒,并开始执行新任务。
    
 1. 定义一个任务数据结构,当然不是Linux内核的任务数据结构哦,呵呵。
/**
 * 定义一个表示任务的数据结构
 * task是一个函数指针,函数原型为 int function(void* data)
 * data是传递给task函数的参数
 */
typedef struct _TASK
{
     int (*task)(void*);
     void* data;
}TASK;

2. 定义一个表示线程的类Thread:
#include
/**
 * 自定义线程
 *
 *
 */
class Thread
{
    public:
        Thread();
~Thread();

int  init();    // 初始化函数,成功返回0,失败返回非0
void setTask(TASK* task); // 设置任务
void start(); // 开始处理任务
int isIdel(); // 线程是否空闲,空闲放回1,忙返回0
void destroy(); // 销毁函数,做一些清理工作

    private:
int idel; // 线程空闲标记,空闲1,忙0
pthread_rwlock_t   idel_rwlock; // Read Write Lock,保护变量idel
pthread_t    thread; // 指向pthread库线程
pthread_mutex_t mutex; // 互斥量,保护条件变量condition
pthread_cond_t condition; // 条件变量,控制pthread库线程
pthread_mutex_t task_mutex; // 互斥量,保护变量task
TASK* task; // 任务
static void* proccess(void* ptr); // pthread库线程函数
};

3. 下面是类Thread的实现
#include
#include
#include "thread.h"

Thread::Thread()
{
}

Thread::~Thread()
{
destroy();
}

int Thread::init()
{
    int success = 0;
    idel = 1;

    success = pthread_mutex_init(&(this->task_mutex), NULL);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create mutex failed."
                 <
//destroy(); // No need
return success;
    }
    success = pthread_mutex_init(&(this->mutex), NULL);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create mutex failed."
                 <
//destroy();
return success;
    }

    success = pthread_cond_init(&(this->condition), NULL);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create condition failed."
                 <
//destroy();
return success;
    }

    success = pthread_rwlock_init(&(this->idel_rwlock), NULL);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create rwlock failed."
                 <
//destroy();
return success;
    }

    success = pthread_create(&(this->thread), NULL, proccess, this);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create thread failed."
 <
//destroy();
return success;
    }
    std::cout<<"[INFO] thread init ok"
     <
    return success;
}

void Thread:: destroy()
{
    pthread_rwlock_destroy(&(this->idel_rwlock));
    pthread_mutex_destroy(&(this->mutex));
    pthread_mutex_destroy(&(this->task_mutex));
    pthread_cond_destroy(&(this->condition));
}

void Thread::setTask(TASK* task)
{
    assert(task != NULL);
    pthread_mutex_lock(&(this->task_mutex));
    this->task = task;
}

void Thread::start()
{
    pthread_rwlock_wrlock(&(this->idel_rwlock));
    this->idel = 0;
    pthread_rwlock_unlock(&(this->idel_rwlock));

    pthread_mutex_lock(&(this->mutex));
    pthread_cond_signal(&(this->condition));
    pthread_mutex_unlock(&(this->mutex));
}

int Thread::isIdel()
{
    int idel = 0;
    pthread_rwlock_rdlock(&(this->idel_rwlock));
    idel = this->idel;
    pthread_rwlock_unlock(&(this->idel_rwlock));
    return idel;
}

void* Thread::proccess(void* ptr)
{
    Thread *p = (Thread*)ptr;

    for(;;)
    {
pthread_cond_wait(&(p->condition),&(p->mutex));
std::cout<<"[INFO] in thread"
                 <
        // 执行任务
p->task->task(p->task->data);
p->task = NULL;
pthread_mutex_unlock(&(p->task_mutex)); // must here unlock
pthread_rwlock_wrlock(&(p->idel_rwlock));
        p->idel = 1;
        pthread_rwlock_unlock(&(p->idel_rwlock));

    }

    return 0;
}

4. 下面定义一个表示线程池的类ThreadPool:
#include
#include "thread.h"

class ThreadPool
{
    int size; // 线程池大小
    Thread* threads; // 线程数组
    pthread_mutex_t mutex; // 互斥量,保护threads

    public:
        ThreadPool();
~ThreadPool();

int init(int size); // 初始化函数,按给定的size初始化线程池
void destroy(); // 销毁函数,主要是销毁线程中的所有线程
Thread* getIdelThread(); // 获得线程池中第一个空闲线程,没用空闲线程返回NULL
};

5. 下面是ThreadPool的实现:
#include
#include "threadpool.h"

ThreadPool::ThreadPool()
{
    size    = 0;
    threads = NULL;
}

ThreadPool::~ThreadPool()
{
    destroy();
}

int ThreadPool::init(int size)
{
    assert(size > 0);
    this->size = size;
    this->threads = new Thread[size];
    // 初始化互斥量
    int success = pthread_mutex_init(&(this->mutex), NULL);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create mutex failed."
                 <
return success;
    }

    for(int i = 0; i < size; i++)
    {
if(threads[i].init())
{
    destroy();
    return -1;
}
    }
    return 0;
}

void ThreadPool::destroy()
{
   pthread_mutex_destroy(&(this->mutex));
    delete [] threads;
}

Thread* ThreadPool::getIdelThread()
{
    Thread* thread = NULL;
    pthread_mutex_lock(&(this->mutex));
    for(int i = 0; i < size; i++)
    {
        if(threads[i].isIdel())
{
    thread = &(threads[i]);
    break;
}
    }
    pthread_mutex_unlock(&(this->mutex));
    return thread;
}

6. 下面写一个测试程序test.cc,为了是其简单,使用了禁止使用的GOTO语句
#include

#include "thread.h"
#include "threadpool.h"

/**
 * 各任务的处理函数
 *
 */
int function1(void* data);
int function2(void* data);
int function3(void* data);
int function4(void* data);
int function5(void* data);
int function6(void* data);
int function7(void* data);
int function8(void* data);

int main(int argc, char** argv)
{
    ThreadPool threadPool;
    if(threadPool.init(4))
    {
std::cout<<"[ERROR] threadPool.init failed"
 << std::endl;
return -1;
    }
    // 循环次数
    int n = 1000000000;

    // Run task 1
    TASK task1;
    task1.task = &function1;
    task1.data = (void*)(&n);
L1: Thread* pThread = threadPool.getIdelThread();
    if(pThread != NULL)
    {
pThread->setTask(&task1);
pThread->start();
    }
    else
    {
//std::cout<<"[INFO] there is no idel thread in the pool"
// << std::endl;
// 再次尝试获得空闲的Thread,这里是不可能的,这时肯定有四个空闲的Thread
goto L1;
    }

    // Run task 2
    TASK task2;
    task2.task = &function2;
    task2.data = (void*)(&n);
L2: pThread = threadPool.getIdelThread();
    if(pThread != NULL)
    {
        pThread->setTask(&task2);
        pThread->start();
    }
    else
    {
        //std::cout<<"[INFO] there is no idel thread in the pool"
        //         << std::endl;
        // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有三个空闲的Thread
goto L2;
    }

    // Run task 3
    TASK task3;
    task3.task = &function3;
    task3.data = (void*)(&n);
L3: pThread = threadPool.getIdelThread();
    if(pThread != NULL)
    {
        pThread->setTask(&task3);
        pThread->start();
    }
    else
    {
        //std::cout<<"[INFO] there is no idel thread in the pool"
        //         << std::endl;
// 再次尝试获得空闲的Thread,这里是不可能的,这时至少有两个空闲的Thread
goto L3;
    }
    
    // Run task 4
    TASK task4;
    task4.task = &function4;
    task4.data = (void*)(&n);
L4: pThread = threadPool.getIdelThread();
    if(pThread != NULL)
    {
        pThread->setTask(&task4);
        pThread->start();
    }
    else
    {
        //std::cout<<"[INFO] there is no idel thread in the pool"
        //         << std::endl;
        // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有一个空闲的Thread
goto L4;
    }

    // Run task 5
    TASK task5;
    task5.task = &function5;
    task5.data = (void*)(&n);
L5: pThread = threadPool.getIdelThread();
    if(pThread != NULL)
    {
        pThread->setTask(&task5);
        pThread->start();
    }
    else
    {
        //std::cout<<"[INFO] there is no idel thread in the pool"
        //         << std::endl;
        // 再次尝试获得空闲的Thread,这里是可能的
goto L5;
    }

    // Run task 6
    TASK task6;
    task6.task = &function6;
    task6.data = (void*)(&n);
L6: pThread = threadPool.getIdelThread();
    if(pThread != NULL)
    {
        pThread->setTask(&task6);
        pThread->start();
    }
    else
    {
        //std::cout<<"[INFO] there is no idel thread in the pool"
        //         << std::endl;
        // 再次尝试获得空闲的Thread,这里是可能的
goto L6;
    }

    // Run task 7
    TASK task7;
    task7.task = &function7;
    task7.data = (void*)(&n);
L7: pThread = threadPool.getIdelThread();
    if(pThread != NULL)
    {
        pThread->setTask(&task7);
        pThread->start();
    }
    else
    {
        //std::cout<<"[INFO] there is no idel thread in the pool"
        //         << std::endl;
// 再次尝试获得空闲的Thread,这里是可能的
goto L7;
    }
    
    // Run task 8
    TASK task8;
    task8.task = &function8;
    task8.data = (void*)(&n);
L8: pThread = threadPool.getIdelThread();
    if(pThread != NULL)
    {
        pThread->setTask(&task8);
        pThread->start();
    }
    else
    {
        //std::cout<<"[INFO] there is no idel thread in the pool"
        //         << std::endl;
        // 再次尝试获得空闲的Thread,这里是可能的
goto L8;
    }

   // 等待所有任务完成
   while(1)
   {
   }

   return 0;
}

/**
 * 任务1的处理函数,只是空循环浪费CPU时间
 *
 */
int function1(void* data)
{
int n = *((int*)(data));
std::cout<<"[INFO] task1 running"
<
for(int i = 0; i < n; i++)
{
}
std::cout<<"[INFO] task1 end"
                 <
return 0;
}

/**
 * 任务2的处理函数,只是空循环浪费CPU时间
 *
 */
int function2(void* data)
{
        int n = *((int*)(data));
        std::cout<<"[INFO] task2 running"
                 <
        for(int i = 0; i < n; i++)
        {
        }
        std::cout<<"[INFO] task2 end"
                 <
return 0;
}

/**
 * 任务3的处理函数,只是空循环浪费CPU时间
 *
 */
int function3(void* data)
{
        int n = *((int*)(data));
        std::cout<<"[INFO] task3 running"
                 <
        for(int i = 0; i < n; i++)
        {
        }
        std::cout<<"[INFO] task3 end"
                 <
return 0;
}

/**
 * 任务4的处理函数,只是空循环浪费CPU时间
 *
 */
int function4(void* data)
{

        int n = *((int*)(data));
        std::cout<<"[INFO] task4 running"
                 <
        for(int i = 0; i < n; i++)
        {
        }
        std::cout<<"[INFO] task4 end"
                 <
return 0;
}

/**
 * 任务5的处理函数,只是空循环浪费CPU时间
 *
 */
int function5(void* data)
{
        int n = *((int*)(data));
        std::cout<<"[INFO] task5 running"
                 <
        for(int i = 0; i < n; i++)
        {
        }
        std::cout<<"[INFO] task5 end"
                 <
return 0;
}

/**
 * 任务6的处理函数,只是空循环浪费CPU时间
 *
 */
int function6(void* data)
{

        int n = *((int*)(data));
        std::cout<<"[INFO] task6 running"
                 <
        for(int i = 0; i < n; i++)
        {
        }
        std::cout<<"[INFO] task6 end"
                 <
return 0;
}

/**
 * 任务7的处理函数,只是空循环浪费CPU时间
 *
 */
int function7(void* data)
{
        int n = *((int*)(data));
        std::cout<<"[INFO] task7 running"
                 <
        for(int i = 0; i < n; i++)
        {
        }
        std::cout<<"[INFO] task7 end"
                 <
return 0;
}

/**
 * 任务8的处理函数,只是空循环浪费CPU时间
 *
 */
int function8(void* data)
{

        int n = *((int*)(data));
        std::cout<<"[INFO] task8 running"
                 <
        for(int i = 0; i < n; i++)
        {
        }
        std::cout<<"[INFO] task8 end"
                 <
return 0;
}


7. 呵呵,就是这么简单,估计肯定有BUG,希望你能在发现了BUG邮件我,我的邮件iacrqq@gmail.com,欢迎大家指出不足之处,也欢迎大家和我交流。

这个程序代码文件和Makefile,在我上传的文件,有意请下载。
阅读(1214) | 评论(1) | 转发(0) |
给主人留下些什么吧!~~

chinaunix网友2010-08-21 16:25:12

暴强! 可惜我不懂c++,不然肯定download下来研究了。