2013年(8)
分类: C/C++
2013-02-20 22:32:25
以下代码,未注释,未编译,未验证
#ifndef __TPOOL_H__
#define __TPOOL_H__
/*
 * Number of task slots in each worker's queue (thread_arg_t.queue_arg).
 * Slot positions are computed as index & (MAX_QUEUE_DEPTH - 1), so this
 * value has to stay a power of two for that masking to act as a modulo.
 */
#define MAX_QUEUE_DEPTH 128
/*
 * Signature of a task routine submitted to the pool: it receives one opaque
 * argument and returns an opaque pointer.
 */
typedef void *(*thread_func)(void *);
/* One queued task: the routine to call and the argument to hand to it. */
typedef struct work_args_t
{
/* Routine the worker thread invokes for this task. */
thread_func func;
/* Argument passed to func (the field name is a misspelling of "args"). */
void *agrs;
} work_args_t;
/*
 * Pair of counters describing one worker's queue. The dispatcher treats
 * (work_index_end - work_index_start) as the number of outstanding tasks.
 */
typedef struct s_work_index_t
{
/* Advanced by the worker thread each time it takes a task from its queue. */
long long work_index_start;
/* NOTE(review): presumably the producer-side counter (one past the newest
 * queued task) -- confirm against the enqueue path. */
long long work_index_end;
} work_index_t;
/*
 * Per-worker state handed to each pool thread as its start argument.
 * Several fields are pointers back into the owning tpool_t so that all
 * workers observe the same shutdown flag and completion counter.
 */
typedef struct thread_arg_t{
/* Wake-up semaphore: posted once per queued task and once at shutdown;
 * the worker blocks on it between tasks. */
sem_t sem_run;
/* Points at tpool_t.num_threads_done; incremented by each exiting worker. */
int *num_workers_done;
/* Points at tpool_t.sem_all_workers_done; posted by the last worker to exit. */
sem_t *sem_all_workers_done;
/* Points at tpool_t.die; the worker checks it every time it wakes up. */
int *die;
/* Points at tpool_t.num_threads; compared against the exit counter. */
int *num_workers;
/* Queue counters. Because this is a union, work_index shares its storage
 * with the first member of work_index_se (work_index_start). */
union
{
work_index_t work_index_se;
long long work_index;
}u_work_index;
/* Task slots, addressed as index & (MAX_QUEUE_DEPTH - 1). */
work_args_t queue_arg[MAX_QUEUE_DEPTH];
} thread_arg_t;
/* Thread pool handle returned by tpool_create and released by tpool_destroy. */
typedef struct tpool_t {
/* Number of worker threads in the pool. */
int num_threads;
/* Count of workers that have left their loop; reset by tpool_destroy
 * before it asks the workers to exit. */
int num_threads_done;
/* Shutdown flag: set to 1 by tpool_destroy, read by every worker. */
int die;
/* Posted by the worker whose exit brings num_threads_done up to
 * num_threads; tpool_destroy waits on it before freeing the pool. */
sem_t sem_all_workers_done;
/* Array of num_threads thread handles. */
pthread_t *threads;
/* Array of num_threads per-worker state blocks. */
thread_arg_t *thread_args;
} tpool_t;
#endif /* __TPOOL_H__ */
/*
 * Atomically load the 64-bit value stored at addr.
 *
 * addr must point to a 64-bit object. The load goes through the compiler's
 * atomic builtin, so it cannot be torn, cached in a register across loop
 * iterations, or reordered with surrounding memory accesses (a plain
 * dereference gives none of those guarantees).
 *
 * Returns the value read.
 */
static inline unsigned long long atomic_read(void* addr)
{
    return __atomic_load_n((unsigned long long *)addr, __ATOMIC_SEQ_CST);
}
/*
 * Compiler-level memory barrier: prevents the compiler from moving memory
 * accesses across this point. It emits no instruction, so it does not order
 * accesses at the hardware level.
 *
 * addr is accepted for interface compatibility and is not used.
 */
static inline void flush(void* addr)
{
    (void)addr;
    /* __asm__ (not the bare asm keyword) so this also builds in strict ISO modes. */
    __asm__ __volatile__("" ::: "memory");
}
/*
 * Atomically add 1 to the 64-bit counter at n and return the value the
 * counter held before the increment.
 *
 * The whole 64-bit word is updated (a carry out of the low 32 bits is
 * propagated), and the operation is a full memory barrier, so stores made
 * before the call are visible to a thread that observes the new count.
 *
 * n must point to a real 64-bit object; passing the address of a narrower
 * integer would also touch the bytes that follow it.
 */
static inline unsigned long long fetch_and_inc(unsigned long long* n)
{
    return __atomic_fetch_add(n, 1ULL, __ATOMIC_SEQ_CST);
}
/*
 * Stop every worker of a pool and release all memory owned by the pool.
 *
 * The shutdown flag is raised, each worker is woken once, and the caller
 * blocks until the last worker reports that it has left its loop. Only then
 * are the semaphore, the arrays and the pool itself released. The pool
 * pointer must not be used after this call.
 *
 * tpool: pool returned by tpool_create; may be NULL.
 *
 * Returns 0 on success, -1 when tpool is NULL.
 */
int tpool_destroy (tpool_t *tpool)
{
    int i;

    if (tpool == NULL) {
        return -1;
    }

    tpool->num_threads_done = 0;
    /* Raise the flag before any wake-up so every woken worker sees it. */
    tpool->die = 1;
    for (i = 0; i < tpool->num_threads; ++i) {
        sem_post(&tpool->thread_args[i].sem_run);
    }

    /* With no workers nobody would ever post the semaphore, so skip the wait. */
    if (tpool->num_threads > 0) {
        sem_wait(&tpool->sem_all_workers_done);
    }

    sem_destroy(&tpool->sem_all_workers_done);
    free(tpool->threads);
    free(tpool->thread_args);
    free(tpool);
    return 0;
}
/*
 * Choose the worker queue that should receive the next task.
 *
 * Every worker's backlog is estimated as work_index_end - work_index_start
 * and the worker with the smallest backlog wins. The scan stops early at the
 * first worker whose queue is empty.
 *
 * tpool: pool to inspect.
 * type:  reserved selector; currently ignored.
 *
 * Returns the index of the chosen worker in tpool->thread_args, or 0 when
 * the pool has no workers.
 */
int dispath_queue(tpool_t *tpool, int type)
{
    int i;
    int best_thread = 0;
    long long best_depth = -1;   /* -1: no candidate recorded yet */
    long long start;
    long long end;
    long long depth;
    work_index_t *counters;

    (void)type;

    for (i = 0; i < tpool->num_threads; i++) {
        counters = &tpool->thread_args[i].u_work_index.work_index_se;

        /* Read start first: both counters only grow, so an end value read
         * afterwards cannot make the difference look smaller than it was. */
        start = (long long)atomic_read(&counters->work_index_start);
        end = (long long)atomic_read(&counters->work_index_end);
        depth = end - start;

        if (depth <= 0) {
            return i;            /* idle worker: cannot do better */
        }
        if (best_depth < 0 || depth < best_depth) {
            best_depth = depth;
            best_thread = i;
        }
    }
    return best_thread;
}
/*
 * Queue one task on the pool.
 *
 * A worker is selected with dispath_queue, a ticket is taken from that
 * worker's work_index_end counter, and the task is stored in the slot
 * ticket & (MAX_QUEUE_DEPTH - 1). If the queue is full the call sleeps in
 * 1 microsecond steps until the worker has caught up. Finally the worker's
 * semaphore is posted so it picks the task up.
 *
 * tpool:       pool to submit to.
 * thread_func: routine to run; must not be NULL.
 * args:        argument passed to the routine. It is stored as-is, so
 *              whoever owns it must keep it alive until the routine has run.
 *
 * Returns 0 when the task was queued, -1 when the arguments are invalid,
 * the pool has no workers, or the pool is shutting down.
 *
 * NOTE(review): tickets are taken and slots are published in two separate
 * steps, so concurrent callers targeting the same worker could publish out
 * of ticket order -- confirm whether multiple producers are expected.
 */
int add_tpool(tpool_t *tpool, thread_func thread_func, void *args)
{
    int t_index;
    unsigned long long ticket;
    unsigned long long taken;
    thread_arg_t *target;
    work_args_t *slot;

    if (tpool == NULL || thread_func == NULL || tpool->num_threads <= 0) {
        return -1;
    }
    if (tpool->die) {
        return -1;
    }

    t_index = dispath_queue(tpool, 0);
    if (t_index < 0 || t_index >= tpool->num_threads) {
        return -1;
    }
    target = &tpool->thread_args[t_index];

    /* Producer side uses the end counter; the worker advances the start one. */
    ticket = fetch_and_inc(
        (unsigned long long *)&target->u_work_index.work_index_se.work_index_end);

    /*
     * The slot for this ticket was last used by ticket - MAX_QUEUE_DEPTH.
     * The worker bumps the start counter before it reads a slot, so wait
     * until it has moved one step past that older task. The difference is
     * evaluated as a signed value so that a start counter which is already
     * ahead of this ticket does not look like a huge backlog.
     */
    for (;;) {
        taken = atomic_read(&target->u_work_index.work_index_se.work_index_start);
        if ((long long)(ticket - taken) < MAX_QUEUE_DEPTH - 1) {
            break;
        }
        usleep(1);
    }

    slot = &target->queue_arg[ticket & (MAX_QUEUE_DEPTH - 1)];
    slot->agrs = args;
    slot->func = thread_func;

    /* Publish: the worker only looks at the slot after this post. */
    sem_post(&target->sem_run);
    return 0;
}
/*
 * Body of every pool worker thread.
 *
 * The worker sleeps on its own semaphore. Each wake-up either means
 * "shutdown" (the shared die flag is set) or "one task is ready", in which
 * case the next slot of the worker's queue is fetched and its routine is
 * called. The routine's return value is discarded.
 *
 * On shutdown the worker destroys its semaphore, adds itself to the shared
 * exit counter and, if it is the last worker out, posts the pool's
 * all-workers-done semaphore.
 *
 * arg: pointer to this worker's thread_arg_t.
 *
 * Always returns NULL.
 */
static void* thread_loop (void *arg)
{
    thread_arg_t *thread_arg = arg;
    work_args_t *work_args;
    unsigned long long ticket;
    int num_workers_done;

    for (;;) {
        /* A failed wait (e.g. interrupted by a signal) is not a wake-up. */
        if (sem_wait(&thread_arg->sem_run) != 0) {
            continue;
        }
        if (*thread_arg->die) {
            break;
        }

        ticket = fetch_and_inc(
            (unsigned long long *)&thread_arg->u_work_index.work_index_se.work_index_start);
        work_args = &thread_arg->queue_arg[ticket & (MAX_QUEUE_DEPTH - 1)];
        work_args->func(work_args->agrs);
    }

    sem_destroy(&thread_arg->sem_run);

    /* The exit counter is a plain int, so it is bumped with a builtin that
     * matches its width instead of the 64-bit fetch_and_inc helper. */
    num_workers_done =
        __atomic_add_fetch(thread_arg->num_workers_done, 1, __ATOMIC_SEQ_CST);
    if (num_workers_done == *thread_arg->num_workers) {
        sem_post(thread_arg->sem_all_workers_done);
    }
    return NULL;
}
tpool_t* tpool_create (int num_threads)
{
int i, ret;
tpool_t *tpool;
pthread_attr_t attr;
tpool = calloc(1, sizeof (tpool_t));
if (tpool == NULL)
{
return NULL;
}
tpool->num_threads = num_threads;
tpool->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads);
if (tpool->threads == NULL)
{
goto fail_threads;
}
tpool->thread_args = (thread_arg_t *)malloc(sizeof(thread_arg_t) * num_threads);
if (tpool->thread_args == NULL)
{
goto fail_thread_args;
}
ret = sem_init(&tpool->sem_all_workers_done, 0, 0);
if (ret != 0)
{
goto fail_all_workers_done;
}
pthread_attr_init (&attr);
pthread_attr_setscope (&attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
tpool->die = 0;
for (i = 0; i < num_threads; ++i) {
/* Initialize thread argument. */
sem_init (&(tpool->thread_args[i].sem_run), 0, 0);
tpool->thread_args[i].die = &tpool->die;
tpool->thread_args[i].num_workers = &tpool->num_threads;
tpool->thread_args[i].u_work_index.work_index = 0;
tpool->thread_args[i].sem_all_workers_done = &tpool->sem_all_workers_done;
tpool->thread_args[i].num_workers_done = &tpool->num_threads_done;
ret = pthread_create (&tpool->threads[i], &attr, thread_loop, &tpool->thread_args[i]);
if (ret)
goto fail_thread_create;
}
return tpool;
fail_thread_create:
--i;
while (i >= 0)
{
pthread_cancel (tpool->threads[i]);
--i;
}
fail_all_workers_done:
free (tpool->thread_args);
fail_thread_args:
free (tpool->threads);
fail_threads:
fail_args:
return NULL;
}
/*
 * Demo task: prints the int that a points to, then frees it.
 *
 * a: heap-allocated int; ownership passes to this routine.
 *
 * Always returns NULL.
 */
void* func(void *a)
{
    int *value = a;

    printf(" a = %d\n", *value);
    free(value);
    return NULL;
}
/*
 * Demo driver: creates a pool with the number of workers given as the first
 * command-line argument, submits 20000 print tasks and shuts the pool down.
 *
 * Workers stop at their next wake-up once shutdown has been requested, so
 * tasks that are still queued at that point may not run.
 *
 * Returns EXIT_SUCCESS on success, EXIT_FAILURE on a usage error or when the
 * pool cannot be created.
 */
int main(int argc, char *argv[])
{
    tpool_t *pool;
    char *end = NULL;
    long requested;
    int i;
    int *a;

    if (argc < 2) {
        fprintf(stderr, "usage: %s <num_threads>\n", argc > 0 ? argv[0] : "tpool");
        return EXIT_FAILURE;
    }

    requested = strtol(argv[1], &end, 10);
    /* Reject non-numeric input, non-positive counts and values that do not fit in int. */
    if (end == argv[1] || *end != '\0' || requested <= 0 ||
        (long)(int)requested != requested) {
        fprintf(stderr, "invalid thread count: %s\n", argv[1]);
        return EXIT_FAILURE;
    }

    pool = tpool_create((int)requested);
    if (pool == NULL) {
        fprintf(stderr, "failed to create thread pool\n");
        return EXIT_FAILURE;
    }

    for (i = 0; i < 20000; i++) {
        a = malloc(sizeof *a);
        if (a == NULL) {
            break;
        }
        *a = i;
        if (add_tpool(pool, func, a) != 0) {
            /* The task was not queued, so the argument is still ours. */
            free(a);
            break;
        }
    }

    tpool_destroy(pool);
    return EXIT_SUCCESS;
}