Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1460759
  • 博文数量: 181
  • 博客积分: 3308
  • 博客等级: 中校
  • 技术积分: 2227
  • 用 户 组: 普通用户
  • 注册时间: 2010-10-03 12:03
个人简介

我是zoro

文章分类

全部博文(181)

文章存档

2015年(1)

2013年(35)

2012年(39)

2011年(50)

2010年(56)

分类: LINUX

2010-10-10 16:23:30

线程池代码

#include <limits.h>
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Number of worker threads the demo in main() asks the pool to create. */
#define NUM 3

/*
 * DB(...): debug trace.
 * When the file is compiled with -D D the arguments are forwarded to
 * printf() unchanged (format string first, then its values); otherwise the
 * macro expands to an empty statement.  The arguments must NOT be
 * stringified: doing so would hand printf() the source text of each
 * argument instead of its value.
 */
#ifdef D
#define DB(...) printf(__VA_ARGS__)
#else
#define DB(...) do {} while (0)
#endif

/* Scheduling state recorded in each pool slot. */
typedef enum {
    BUSY = 0,    /* the worker has picked up a task and is running it */
    IDLE = 1     /* the worker is waiting for a task */
} Thread_Status;

/* Signature of a unit of work: called by a worker thread with the opaque
 * argument that was supplied together with the task. */
typedef void (*task_t)(void *arg);

/*
 * One slot of the thread pool: a worker thread plus the synchronisation
 * objects and the task hand-off fields used to give it work.
 */
typedef struct {
    pthread_t tid;            // worker thread ID

    pthread_mutex_t mutex;        // mutex locked around accesses to this slot

    pthread_cond_t cond;        // condition variable the worker waits on for a task

    Thread_Status status;        // thread state: BUSY or IDLE

    task_t task;            // task function handed to the worker

    void *arg;                // argument passed to task
    int cnt;                // number of tasks the worker has picked up (reset to 0 on wrap)
} Thread_Poll_t;

int threadPoll_create_and_init(int num); // create and initialise the thread pool

int threadPoll_assign_work(task_t task, void *arg); // hand a task to the thread pool


static int g_thread_num = 0;    // number of threads in the pool

Thread_Poll_t *g_poll = NULL;     // address of the pool's slot array


void *thread_handler(void *arg)
{
    Thread_Poll_t *thread = (Thread_Poll_t *)arg;
    while (1)
    {
        pthread_mutex_lock(&thread->mutex);
        thread->status = IDLE;
        pthread_cond_wait(&thread->cond, &thread->mutex);
        if (thread->task == NULL)
        {
            pthread_mutex_unlock(&thread->mutex);
            continue;
        }
        thread->status = BUSY;    
        if ((thread->cnt += 1) < 0)
            thread->cnt = 0;
        pthread_mutex_unlock(&thread->mutex);
        (thread->task)(thread->arg);
    }
    DB("thread_handler success!\n");
    return NULL;
}

/*
 * Start one detached worker thread per slot.
 *
 * thread: array of `num` slots whose mutex/cond/fields are already
 *         initialised; each worker receives a pointer to its own slot.
 * num:    number of slots / threads to start (must be > 0).
 *
 * Returns 0 when every thread was started, -1 on invalid arguments or on
 * any pthread failure.  Workers that were started before a failure keep
 * running; they are detached and cannot be joined here.
 */
int thread_poll_create(Thread_Poll_t *thread, int num)
{
    pthread_attr_t attr;
    int i, err;
    int result = -1;

    if (!thread || num <= 0)
        return -1;

    /* Initialise the attribute object used for every worker. */
    err = pthread_attr_init(&attr);
    if (err != 0)
    {
        DB("error in thread_poll_create: pthread_attr_init error!\n");
        return -1;    /* -1, so callers testing for -1 see the failure */
    }

    /* Workers are started in the detached state. */
    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err != 0)
    {
        fprintf(stderr, "pthread_attr_setdetachstate : %s\n", strerror(err));
        goto out;
    }

    for (i = 0; i < num; i++)
    {
        err = pthread_create(&thread[i].tid, &attr, thread_handler, &thread[i]);
        if (err != 0)
        {
            fprintf(stderr, "pthread_create : %s\n", strerror(err));
            goto out;
        }
    }

    result = 0;
    DB("thread_poll_create success!\n");

out:
    /* Release the attribute object on every path, including failures. */
    pthread_attr_destroy(&attr);
    return result;
}

int threadPoll_create_and_init(int num)
{
    if (num <= 0)
        return -1;
    int i, ret;
    Thread_Poll_t *thread;

    thread = (Thread_Poll_t *)malloc(num * sizeof(Thread_Poll_t));
    if (thread == NULL)
    {
        perror("malloc");
        return -1;
    }
    g_poll = thread;    
    g_thread_num = num;    
    
    for (i = 0; i < num; i++)
    {
        pthread_mutex_init(&(thread[i].mutex), NULL);
        pthread_cond_init(&(thread[i].cond), NULL);
        thread[i].status = IDLE;
        thread[i].task = NULL;
        thread[i].arg = NULL;
        thread[i].cnt = 0;
    }
    ret = thread_poll_create(thread, num);
    if (ret == -1)
        return -1;
    DB("threadPoll_create_and_init success!\n");
    return 0;
}

void threadPoll_destroy()
{
    int i, num;    
    Thread_Poll_t *thread;

    num = g_thread_num;
    thread = g_poll;

    if ((num == 0) || (thread == NULL))
        return;
    for (i = 0; i < num; i++)
    {
        while (1)
        {
            pthread_mutex_lock(&(thread[i].mutex));
            if (thread[i].status == BUSY)
            {
                pthread_mutex_unlock(&(thread[i].mutex));
                usleep(1000);
            }
            else
            {
                pthread_mutex_unlock(&(thread[i].mutex));
                break;
            }
        }    
        pthread_cancel(thread[i].tid);
        pthread_cond_destroy(&(thread[i].cond));
        pthread_mutex_destroy(&(thread[i].mutex));
    }    
    if (!thread)
        free(thread);
    DB("threadPoll_destroy over!\n");
}

/*
 * Hand a task to the first idle worker of the pool.
 *
 * task: function to run; a NULL task is treated as "nothing to do".
 * arg:  opaque argument passed to task.
 *
 * Returns 0 when the task was handed to a worker (or task was NULL),
 * -1 when no pool is registered (a message is printed to stderr) or when
 * no worker is currently IDLE.
 */
int threadPoll_assign_work(task_t task, void *arg)
{
    Thread_Poll_t *thread = g_poll;
    int num = g_thread_num;
    int i;

    if ((thread == NULL) || (num == 0))
    {
        fprintf(stderr, "Thread poll has not been created!\n");
        return -1;
    }
    if (task == NULL)
        return 0;    /* a worker would ignore it; do not occupy a slot */

    for (i = 0; i < num; i++)
    {
        /* Inspect and claim the slot under one lock hold, so the state
         * cannot change between the check and the assignment. */
        pthread_mutex_lock(&(thread[i].mutex));
        if (thread[i].status == IDLE)
        {
            thread[i].task = task;
            thread[i].arg = arg;
            /* Mark the slot taken right away: otherwise a second call made
             * before the worker wakes up would overwrite this task. */
            thread[i].status = BUSY;
            pthread_cond_signal(&(thread[i].cond));
            pthread_mutex_unlock(&(thread[i].mutex));

            /* Give the woken worker a chance to run; returns immediately
             * if no other thread is ready. */
            sched_yield();

            DB("threadPoll_assign_work over!\n");
            return 0;
        }
        pthread_mutex_unlock(&(thread[i].mutex));
    }

    return -1;    /* every worker is busy */
}

/*
 * Demo task: prints the integer id smuggled through `arg`, then sleeps for
 * two seconds to simulate work.
 *
 * arg: an int value converted to a pointer by the caller (not dereferenced).
 */
void task(void *arg)
{
    /* Convert through intptr_t: a direct pointer-to-int cast truncates on
     * platforms where pointers are wider than int. */
    printf(" I am %d\n", (int)(intptr_t)arg);
    sleep(2);
}

/*
 * Demo driver: creates a pool of NUM workers, submits NUM+8 tasks
 * (retrying once per second while every worker is busy), then shuts the
 * pool down.
 */
int main(void)
{
    int i;

    if (threadPoll_create_and_init(NUM) == -1)
    {
        printf("thread create fail\n");
        exit(1);
    }
    sleep(2);    /* give the workers time to start before handing out work */

    for (i = 0; i < NUM+8; i++)
    {
        /* The task id travels inside the pointer value; go through
         * intptr_t so the int <-> pointer conversion is well-defined in size. */
        while (threadPoll_assign_work(task, (void *)(intptr_t)i) == -1)
        {
            printf("All threads are busy! i=%d\n", i);
            sleep(1);
        }
    }

    threadPoll_destroy();
    return 0;
}


编译:$ gcc -o test mypool.c -lpthread -D D
运行:./test
运行结果:

thread_poll_create
threadPoll_create_and_init
threadPoll_assign_work
 I am 1
 I am 0
threadPoll_assign_work
threadPoll_assign_work
All threads are i=3
 I am 2
All threads are i=3
threadPoll_assign_work
 I am 3
 I am 4
threadPoll_assign_work
threadPoll_assign_work
threadPoll_assign_work
All threads are i=7
 I am 6
All threads are i=7
 I am 7
threadPoll_assign_work
 I am 8
threadPoll_assign_work
 I am 9
threadPoll_assign_work
All threads are i=10
All threads are i=10
 I am 10
threadPoll_assign_work
threadPoll_destroy



阅读(1404) | 评论(1) | 转发(0) |
给主人留下些什么吧!~~

chinaunix网友2010-10-11 17:42:29

很好的, 收藏了 推荐一个博客,提供很多免费软件编程电子书下载: http://free-ebooks.appspot.com