#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#define NUM 3
#ifdef D
#define DB(format,...) printf(format,#__VA_ARGS__)
#else
#define DB(format,...) do{}while(0)
#endif
typedef enum {
BUSY,
IDLE
} Thread_Status;
typedef void (*task_t)(void *arg);
typedef struct {
pthread_t tid; //线程ID
pthread_mutex_t mutex; //互斥锁
pthread_cond_t cond; //条件变量
Thread_Status status; //线程状态:BUSY OR IDLE
task_t task; //任务
void *arg;
int cnt;
} Thread_Poll_t;
int threadPoll_create_and_init(int num); //创建并初始化线程池
int threadPoll_assign_work(task_t task, void *arg); //给线程池分配工作
static int g_thread_num = 0; //线程池可用线程数量
Thread_Poll_t *g_poll = NULL; //线程池地址
void *thread_handler(void *arg)
{
Thread_Poll_t *thread = (Thread_Poll_t *)arg;
while (1)
{
pthread_mutex_lock(&thread->mutex);
thread->status = IDLE;
pthread_cond_wait(&thread->cond, &thread->mutex);
if (thread->task == NULL)
{
pthread_mutex_unlock(&thread->mutex);
continue;
}
thread->status = BUSY;
if ((thread->cnt += 1) < 0)
thread->cnt = 0;
pthread_mutex_unlock(&thread->mutex);
(thread->task)(thread->arg);
}
DB("thread_handler success!\n");
return NULL;
}
int thread_poll_create(Thread_Poll_t *thread, int num)
{
if (!thread || num <= 0)
return -1;
int i,ret,err;
pthread_attr_t attr;
err = pthread_attr_init(&attr);//初始化attr位置的pthread_attr_t结构
if(err!=0)
{
DB("error in thread_poll_create: pthread_attr_init error!\n");
return(err);
}
err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);/*设置pthread_attr_t结构中detachstate线程属性,以分离状态启动线程*/
if(err==0)
{
for (i = 0; i < num; i++)
{
ret = pthread_create(&(thread[i].tid), &attr, thread_handler, (void *)(&thread[i]));/*创建线程*/
if (ret)
{
fprintf(stderr, "pthread_create : %s\n", strerror(ret));
return -1;
}
}
}
pthread_attr_destroy(&attr);
DB("thread_poll_create success!\n");
return 0;
}
int threadPoll_create_and_init(int num)
{
if (num <= 0)
return -1;
int i, ret;
Thread_Poll_t *thread;
thread = (Thread_Poll_t *)malloc(num * sizeof(Thread_Poll_t));
if (thread == NULL)
{
perror("malloc");
return -1;
}
g_poll = thread;
g_thread_num = num;
for (i = 0; i < num; i++)
{
pthread_mutex_init(&(thread[i].mutex), NULL);
pthread_cond_init(&(thread[i].cond), NULL);
thread[i].status = IDLE;
thread[i].task = NULL;
thread[i].arg = NULL;
thread[i].cnt = 0;
}
ret = thread_poll_create(thread, num);
if (ret == -1)
return -1;
DB("threadPoll_create_and_init success!\n");
return 0;
}
void threadPoll_destroy()
{
int i, num;
Thread_Poll_t *thread;
num = g_thread_num;
thread = g_poll;
if ((num == 0) || (thread == NULL))
return;
for (i = 0; i < num; i++)
{
while (1)
{
pthread_mutex_lock(&(thread[i].mutex));
if (thread[i].status == BUSY)
{
pthread_mutex_unlock(&(thread[i].mutex));
usleep(1000);
}
else
{
pthread_mutex_unlock(&(thread[i].mutex));
break;
}
}
pthread_cancel(thread[i].tid);
pthread_cond_destroy(&(thread[i].cond));
pthread_mutex_destroy(&(thread[i].mutex));
}
if (!thread)
free(thread);
DB("threadPoll_destroy over!\n");
}
int threadPoll_assign_work(task_t task, void *arg)
{
int i, num;
int index = 0;
int record = -1; //记录,-1表示线程都处于BUSY状态
Thread_Poll_t *thread;
thread = g_poll;
num = g_thread_num;
if ((thread == NULL) || (num == 0))
{
fprintf(stderr, "Thread poll has not been created!\n");
return -1;
}
for (i = 0; i < num; i++)
{
if ((thread[i].status == IDLE))
{
pthread_mutex_lock(&(thread[i].mutex));
if (record == -1)
{
record = thread[i].cnt;
index = i;
}
else
{
if (thread[i].cnt < record)
{
record = thread[i].cnt;
index = i;
}
}
pthread_mutex_unlock(&(thread[i].mutex));
break; //找到第一个空闲线程便退出
}
}
if (record != -1)
{
pthread_mutex_lock(&(thread[index].mutex));
thread[index].task = task;
thread[index].arg = arg;
pthread_cond_signal(&(thread[index].cond));
pthread_mutex_unlock(&(thread[index].mutex));
sched_yield(); /*这个函数可以使用另一个级别等于或高于当前线程的线程先运行。如果没有符合条件的线程,那么这个函数将会立刻返回然后继续执行当前线程的程序。*/
}
else
{
return -1;
}
DB("threadPoll_assign_work over!\n");
return 0;
}
void task(void *arg) //自建的测试程序,运行自定义秒数
{
int i;
//for (i = 0; i < NUM+5; i++)
//{
printf(" I am %d\n", (int)arg);
sleep(2);
//}
}
int main(void)
{
int i;
int ret = threadPoll_create_and_init(NUM);
if (ret == -1)
{
printf("thread create fail\n");
exit(1);
}
sleep(2); //防止因线程池未创建初始化完成之前执行下列语句而出错
for (i = 0; i < NUM+8; i++)
{
ret = threadPoll_assign_work(task, (void *)i);
if (ret == -1)
{
printf("All threads are busy! i=%d\n", i);
sleep(1);
i--;
continue;
}
}
threadPoll_destroy();
}
|