Chinaunix首页 | 论坛 | 博客
  • 博客访问: 98886
  • 博文数量: 16
  • 博客积分: 530
  • 博客等级: 下士
  • 技术积分: 200
  • 用 户 组: 普通用户
  • 注册时间: 2010-11-17 23:25
文章分类

全部博文(16)

文章存档

2014年(1)

2012年(2)

2011年(11)

2010年(2)

我的朋友

分类: C/C++

2011-01-11 11:09:14

为理解 Linux 线程池做的一个实现

threadpool.h

#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__

#define    MAX_THREADS        64
#define    MAX_QUEUE_SIZE    MAX_THREADS * 16

typedef void *threadpool;

typedef void (*dispatch_func)(void *);

threadpool
create_thread_pool(int num_threads);

int
dispatch_work (threadpool thread_pool, dispatch_func work_func, void *arg);

void
destroy_thread_pool (threadpool thread_pool);

#endif


threadpool.c


#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#include "threadpool.h"

typedef struct __work_t    work_t;

struct __work_t
{
    void (*func) (void *arg);
    void *arg;
    struct __work_t *next;
};

typedef struct __thread_pool
{
    pthread_mutex_t        queue_lock;
    pthread_cond_t        have_work;
    pthread_cond_t        queue_empty;
    pthread_t            *threads;

    work_t    *queue_head;
    work_t    *queue_tail;

    int        queue_size;
    int        stop_accept;
    int        shutdown;

    int        threads_count;
    int        threads_idle;

} thread_pool_t;


void *
work_func (void *arg)
{
    work_t    *cur_work;
    thread_pool_t *pool = (thread_pool_t *) arg;

    while (1)
    {
        pthread_mutex_lock (&(pool->queue_lock));
        pool->threads_idle ++;
        while (!pool->queue_size && !pool->shutdown)
        {
            pthread_cond_wait (&(pool->have_work), &(pool->queue_lock));
        }

        if (pool->shutdown)
        {
            pthread_mutex_unlock (&(pool->queue_lock));
            break;
        }
    
        cur_work = pool->queue_head;
        pool->queue_size --;
        if (!pool->queue_size)
        {
            pool->queue_head = NULL;
            pool->queue_tail = NULL;

            if (pool->stop_accept)
            {
                pthread_cond_signal (&(pool->queue_empty));
            }
        }
        else
        {
            pool->queue_head = cur_work->next;
        }

        pool->threads_idle --;
        pthread_mutex_unlock (&(pool->queue_lock));

        (cur_work->func) (cur_work->arg);
        free (cur_work);
    }

    return NULL;
}

threadpool
create_thread_pool(int num_threads)
{
    thread_pool_t *pool;
    int i;

    if ((num_threads <= 0) || (num_threads > MAX_THREADS))
        return NULL;

    pool = (thread_pool_t *) malloc(sizeof(thread_pool_t));
    if (pool == NULL)
    {
        printf("Out of memory creating a new threadpool!\n");
        return NULL;
    }

    pool->threads = (pthread_t*) malloc (sizeof(pthread_t) * num_threads);

    if(!pool->threads)
    {
        printf("Out of memory creating a new threadpool!\n");
        free (pool);
        return NULL;
    }

    pool->threads_count = num_threads;
    pool->threads_idle = 0;
    pool->queue_size = 0;
    pool->queue_head = NULL;
    pool->queue_tail = NULL;
    pool->shutdown = 0;
    pool->stop_accept= 0;

    if(pthread_mutex_init(&(pool->queue_lock), NULL))
    {
        printf("thread mutex initiation fail!\n");
        free (pool->threads);
        free (pool);
        return NULL;
    }

    if(pthread_cond_init(&(pool->have_work), NULL))
    {
        printf("thread cond initiation fail!\n");
        free (pool->threads);
        free (pool);
        return NULL;
    }

    if(pthread_cond_init(&(pool->queue_empty), NULL))
    {
        printf("thread cond initiation fail!\n");
        free (pool->threads);
        free (pool);
        return NULL;
    }

    for (i=0; i<num_threads; i++)
    {
        if(pthread_create(&(pool->threads[i]), NULL, work_func, pool))
        {
            printf("Thread[%d] create fail!\n", i);
            free (pool->threads);
            free (pool);
            return NULL;
        }
    }

    return (threadpool *) pool;
}

int
dispatch_work (threadpool thread_pool, dispatch_func work_func, void *arg)
{
    thread_pool_t *pool = (thread_pool_t *)thread_pool;
    work_t *cur_work;

    if (pool->queue_size > MAX_QUEUE_SIZE)
        return 1;

    if (pool->stop_accept)
        return -1;

    cur_work = (work_t *) malloc(sizeof(work_t));
    if(cur_work == NULL)
    {
        printf("Out of memory creating a work struct!\n");
        return -2;
    }

    cur_work->func = work_func;
    cur_work->arg = arg;
    cur_work->next = NULL;

    pthread_mutex_lock(&(pool->queue_lock));

    if(pool->queue_size == 0)
    {
        pool->queue_head = cur_work;
        pool->queue_tail = cur_work;
        pthread_cond_signal(&(pool->have_work));
    }
    else
    {
        pool->queue_tail->next = cur_work;
        pool->queue_tail = cur_work;
    }
    pool->queue_size++;
    pthread_mutex_unlock(&(pool->queue_lock));

    return 0;
}

void
destroy_thread_pool (threadpool thread_pool)
{
    int i=0;
    void* nothing;    
    thread_pool_t *pool = (thread_pool_t *)thread_pool;

    pthread_mutex_lock (&(pool->queue_lock));
    pool->stop_accept = 1;

    while (pool->queue_size)
    {
        pthread_cond_wait (&(pool->queue_empty), &(pool->queue_lock));
    }

    pool->shutdown = 1;
    pthread_mutex_unlock (&(pool->queue_lock));

    for (i=0; i<pool->threads_count; i++)
    {
        pthread_join(pool->threads[i], &nothing);
    }

    pthread_cond_destroy (&(pool->have_work));        
    pthread_cond_destroy (&(pool->queue_empty));        
    pthread_mutex_destroy(&(pool->queue_lock));

    free (pool->threads);
    free (pool);
    return;
}


tpltest.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include "threadpool.h"

void
work_1(void *arg)
{
    pthread_t tid = pthread_self ();

    int seconds = (int)arg;
    printf ("work_1 sleep %d --%x\n", seconds, tid);
    sleep (seconds);
    printf ("work_1 done %d --%x\n", seconds, tid);
}

void
work_2(void *arg)
{
    pthread_t tid = pthread_self ();

    int seconds = (int)arg;
    printf ("work_2 sleep %d --%x\n", seconds, tid);
    sleep (seconds);
    printf ("work_2 done %d --%x\n", seconds, tid);
}

int
main (int argc, char **argv)
{
    threadpool tp;
    int i;

    tp = create_thread_pool (7);
    for(i=1; i<16; i++)
    {
        dispatch_work( tp, work_1, (void *) i);        
    }

    for(i=1; i<12; i++)
    {
        dispatch_work (tp, work_2, (void *) i);        
    }
 

    destroy_thread_pool (tp);
    return 0;
}


阅读(1073) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~