Chinaunix首页 | 论坛 | 博客
  • 博客访问: 33845
  • 博文数量: 8
  • 博客积分: 25
  • 博客等级: 民兵
  • 技术积分: 131
  • 用 户 组: 普通用户
  • 注册时间: 2010-06-27 20:12
文章分类

全部博文(8)

文章存档

2013年(8)

我的朋友

分类: C/C++

2013-02-20 22:32:25

以下代码未注释、未编译、未验证。

#ifndef __TPOOL_H__
#define __TPOOL_H__

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/*
 * Number of work items each worker's ring buffer (thread_arg_t.queue_arg)
 * can hold.  Slots are selected with `index & (MAX_QUEUE_DEPTH - 1)`, so the
 * value must stay a power of two.
 */
#define MAX_QUEUE_DEPTH 128
/* Signature of a work item: receives the opaque pointer queued with it. */
typedef void *(*thread_func)(void *);


/* One queued work item: the callback and the argument it is called with. */
typedef struct work_args_t
{
/* Function the worker thread invokes for this item. */
thread_func func;
/* Opaque argument handed to func (field name spelled "agrs" as-is). */
void *agrs;
} work_args_t;
/*
 * Pair of counters describing one worker's queue.  The number of pending
 * items is computed as work_index_end - work_index_start.
 */
typedef struct s_work_index_t
{
    /* Consumer side: advanced by the worker thread when it takes an item. */
    long long               work_index_start;
    /* Producer side: upper bound of the pending range. */
    long long               work_index_end;
} work_index_t;
      


/*
 * Per-worker state handed to thread_loop().  The pointer members refer to
 * fields of the owning tpool_t (wired up in tpool_create).
 */
typedef struct thread_arg_t{
    /* Wakes the worker: posted when an item is queued and at shutdown. */
    sem_t           sem_run;
/* Points at tpool_t.num_threads_done: count of workers that have exited. */
int             *num_workers_done;
/* Points at tpool_t.sem_all_workers_done: posted by the last worker to exit. */
sem_t           *sem_all_workers_done;
    /* Points at tpool_t.die: non-zero tells the worker to leave its loop. */
    int             *die;
    /* Points at tpool_t.num_threads: how many workers must report done. */
    int             *num_workers;
    /*
     * Queue counters.  Both union members begin at offset 0, so `work_index`
     * shares storage with `work_index_se.work_index_start` only; it does not
     * cover `work_index_end`.
     * NOTE(review): confirm that code touching `work_index` really means the
     * start counter.
     */
    union
    {  
        work_index_t work_index_se;
        long long work_index;
    }u_work_index;
    
/* Ring buffer of pending work items, indexed modulo MAX_QUEUE_DEPTH. */
work_args_t queue_arg[MAX_QUEUE_DEPTH];
} thread_arg_t;


/* Thread pool handle returned by tpool_create() and freed by tpool_destroy(). */
typedef struct tpool_t {
    /* Number of worker threads (and of entries in threads / thread_args). */
    int             num_threads;
/* Number of workers that have left thread_loop(); incremented by the workers. */
int             num_threads_done;
    /* Shutdown flag: set to 1 by tpool_destroy(), read by every worker. */
    int             die;
    /* Posted by the last exiting worker; tpool_destroy() waits on it. */
    sem_t           sem_all_workers_done;
    /* Thread ids, one per worker. */
    pthread_t       *threads;
    /* Per-worker state, one entry per worker. */
    thread_arg_t    *thread_args;
} tpool_t;


#endif /* __TPOOL_H__ */


/*
 * Atomically load the 64-bit value stored at addr.
 *
 * addr must point to a suitably aligned 8-byte object.  A plain dereference
 * gives no atomicity or ordering guarantee and lets the compiler reuse a
 * stale value, so the load goes through the compiler's atomic builtin
 * (sequentially consistent).
 */
static inline unsigned long long atomic_read(void* addr)
{
    return __atomic_load_n((unsigned long long *)addr, __ATOMIC_SEQ_CST);
}
    static inline void flush(void* addr) {asm("":::"memory");}


/*
 * Atomically add 1 to *n and return the value *n held before the increment.
 *
 * The previous hand-written `lock xaddl` sequence operated on 32 bits while
 * the operand is 64 bits wide, was x86-only and carried no memory clobber.
 * The compiler builtin performs a full-width, sequentially consistent
 * fetch-and-add on every supported target.
 */
static inline unsigned long long fetch_and_inc(unsigned long long* n)
{
    return __atomic_fetch_add(n, 1ULL, __ATOMIC_SEQ_CST);
}


/*
 * Shut the pool down and release everything tpool_create() allocated.
 *
 * Raises the shared `die` flag, wakes every worker, waits until the last
 * worker has posted sem_all_workers_done, then frees the pool.  Workers exit
 * as soon as they observe `die`, so work items still sitting in the queues
 * are not executed.
 *
 * Returns 0 on success, -1 if tpool is NULL.  tpool must not be used after
 * this call.
 */
int tpool_destroy (tpool_t *tpool)
{
    int             i;

    if (tpool == NULL) {
        return -1;
    }

    tpool->num_threads_done = 0;

    /* Publish the flag once, before any worker is woken. */
    __atomic_store_n(&tpool->die, 1, __ATOMIC_SEQ_CST);

    for (i = 0; i < tpool->num_threads; ++i) {
        sem_post(&tpool->thread_args[i].sem_run);
    }

    /*
     * With no workers nobody would ever post the semaphore, so only wait
     * when at least one worker exists.  Retry if a signal interrupts us.
     */
    if (tpool->num_threads > 0) {
        while (sem_wait(&tpool->sem_all_workers_done) == -1 && errno == EINTR) {
            /* interrupted by a signal: wait again */
        }
    }

    sem_destroy(&tpool->sem_all_workers_done);
    free(tpool->threads);
    free(tpool->thread_args);
    free(tpool);

    return 0;
}


/*
 * Pick the worker that should receive the next work item.
 *
 * Returns the index (0 .. num_threads-1) of the worker whose queue currently
 * has the fewest pending items, measured as work_index_end - work_index_start.
 * The scan stops early at the first worker with nothing pending.  The
 * counters are sampled without locking, so the result is a best-effort hint.
 * Returns 0 when the pool has no workers.
 *
 * type is accepted for interface compatibility and is currently unused.
 */
int dispath_queue(tpool_t *tpool, int type)
{
    int         i;
    int         best = 0;
    long long   best_depth = 0;

    (void)type;

    for (i = 0; i < tpool->num_threads; i++) {
        work_index_t *counters = &tpool->thread_args[i].u_work_index.work_index_se;
        /* Sample start before end so a concurrent consumer cannot make the
         * difference negative. */
        long long start = (long long)atomic_read(&counters->work_index_start);
        long long end = (long long)atomic_read(&counters->work_index_end);
        long long depth = end - start;

        if (i == 0 || depth < best_depth) {
            best = i;
            best_depth = depth;
        }

        if (best_depth <= 0) {
            break;      /* an idle worker cannot be beaten */
        }
    }

    return best;
}


int add_tpool(tpool_t *tpool, thread_func thread_func, void *args)
{
    int i = 0;
int t_index = 0;
int a_index = 0;
    int a_index = 0;
    work_index_t work_index;
if (tpool->die)
{
return -1;
}


t_index = dispath_queue(tpool, 0);


a_index = fetch_and_inc(&tpool->thread_args[t_index].u_work_index.work_index);
    work_index = *(work_index_t *)&a_index;


    while((work_index.work_index_end - work_index.work_index_start)> (MAX_QUEUE_DEPTH - 1))
    {
usleep(1);
a_index = atomic_read(&tpool->thread_args[t_index].u_work_index.work_index);
        work_index = *(work_index_t *)&a_index;
    }


    tpool->thread_args[t_index].queue_arg[a_index&(MAX_QUEUE_DEPTH-1)].agrs = args;
    tpool->thread_args[t_index].queue_arg[a_index&(MAX_QUEUE_DEPTH-1)].func = thread_func;


    sem_post(&tpool->thread_args[t_index].sem_run);


    return 0;
}


static void* thread_loop (void *arg)
{
    thread_arg_t    *thread_arg = arg;
    int             num_workers_done;
work_args_t  *work_args;
int index = 0;


    while (1)
    {
        sem_wait (&thread_arg->sem_run);


if (*thread_arg->die)
        {
        break;
        }


index = fetch_and_inc(&thread_arg->u_work_index.work_index_se.work_index_start);
if (index < 0)
{
///TODO: error
}


work_args = &thread_arg->queue_arg[index&(MAX_QUEUE_DEPTH - 1)];


work_args->func(work_args->agrs);      
    }


    sem_destroy (&thread_arg->sem_run);
    num_workers_done = fetch_and_inc(thread_arg->num_workers_done) + 1;
    if (num_workers_done == *thread_arg->num_workers)
    {
        sem_post (thread_arg->sem_all_workers_done);
    }


    return NULL;
}






tpool_t* tpool_create (int num_threads)
{
    int             i, ret;
    tpool_t         *tpool;
    pthread_attr_t  attr;


    tpool = calloc(1, sizeof (tpool_t));
    if (tpool == NULL) 
    {
    return NULL;
    }

    tpool->num_threads = num_threads;
    tpool->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads);
    if (tpool->threads == NULL) 
    {
    goto fail_threads;
    }


tpool->thread_args = (thread_arg_t *)malloc(sizeof(thread_arg_t) * num_threads);
    if (tpool->thread_args == NULL) 
    {
    goto fail_thread_args;
    }


ret = sem_init(&tpool->sem_all_workers_done, 0, 0);
    if (ret != 0) 
    {
    goto fail_all_workers_done;
    }


pthread_attr_init (&attr);
    pthread_attr_setscope (&attr, PTHREAD_SCOPE_SYSTEM);
    pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);


    tpool->die = 0;
    for (i = 0; i < num_threads; ++i) {
        /* Initialize thread argument. */
        sem_init (&(tpool->thread_args[i].sem_run), 0, 0);
        tpool->thread_args[i].die = &tpool->die;
        tpool->thread_args[i].num_workers = &tpool->num_threads;
tpool->thread_args[i].u_work_index.work_index = 0;
tpool->thread_args[i].sem_all_workers_done = &tpool->sem_all_workers_done;
tpool->thread_args[i].num_workers_done = &tpool->num_threads_done;
ret = pthread_create (&tpool->threads[i], &attr, thread_loop, &tpool->thread_args[i]);
        if (ret) 
            goto fail_thread_create;
    }


    return tpool;


fail_thread_create:
    --i;
    while (i >= 0)
    {
        pthread_cancel (tpool->threads[i]);
        --i;
    }
fail_all_workers_done:
    free (tpool->thread_args);
fail_thread_args:
    free (tpool->threads);
fail_threads:
fail_args:


    return NULL;
}


/*
 * Demo work item: a points to a heap-allocated int.  Prints the value,
 * releases the allocation and returns NULL.
 */
void* func(void *a)
{
    int *value = a;

    printf(" a = %d\n", *value);
    free(value);

    return NULL;
}




/*
 * Demo driver: creates a pool with the number of workers given as the first
 * command-line argument and queues 20000 heap-allocated integers for func().
 *
 * Returns 0 on success and EXIT_FAILURE on bad usage or any failure.
 */
int main(int argc, char *argv[])
{
    tpool_t *pool;
    char    *end = NULL;
    long    requested;
    int     status = 0;
    int     i;

    if (argc < 2) {
        fprintf(stderr, "usage: %s <num_threads>\n", argc > 0 ? argv[0] : "tpool");
        return EXIT_FAILURE;
    }

    /* strtol instead of atoi: rejects garbage and out-of-range values. */
    errno = 0;
    requested = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || errno == ERANGE ||
        requested <= 0 || requested > INT_MAX) {
        fprintf(stderr, "invalid thread count: %s\n", argv[1]);
        return EXIT_FAILURE;
    }

    pool = tpool_create((int)requested);
    if (pool == NULL) {
        fprintf(stderr, "tpool_create failed\n");
        return EXIT_FAILURE;
    }

    for (i = 0; i < 20000; i++) {
        int *a = malloc(sizeof *a);

        if (a == NULL) {
            status = EXIT_FAILURE;
            break;
        }
        *a = i;

        /* Ownership of a passes to func() only if queuing succeeds. */
        if (add_tpool(pool, func, (void*)a) != 0) {
            free(a);
            status = EXIT_FAILURE;
            break;
        }
    }

    /*
     * Workers leave their loop as soon as they see the shutdown flag, so
     * items still queued at this point are not executed.
     */
    tpool_destroy(pool);

    return status;
}





阅读(1971) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~