全部博文(73)
分类: LINUX
2009-04-23 16:08:09
pthread之生产者、消费者(信号量相关)
http://updatedb.blog.hexun.com/7082010_d.html#
生产者、消费者的问题,早就是经典问题中的经典,只要有并行、有共享的地方就有它的身影。每次用不同的技术去模拟这个情景有不同的感受...上一次模拟这个情景是在2003年了吧,给女同学做作业:),但很不愉快呀
下面的代码,将会产生N_CONS个消费者,N_PROD个生产者,1个控制线程用于通过生产者与消费者退出。
记得因为消费者与生产者退出之后,生产的产品可能没有全部被消耗掉,所以要在最后自己再释放到没有释放的物品占用的资源。
#include
#include
#include
#include
#include
#define N_CONS 5 //5
#define N_PROD 1 //1个生产者
#define MAX_REQUEST_LIST 5 //最大的库存
typedef struct list
{
int value;
struct list * next;
}request_t; //物品结构
pthread_t tid;
pthread_attr_t attr;
sem_t barrier; //信号量
pthread_mutex_t requests_lock = PTHREAD_MUTEX_INITIALIZER; //生产者与消息者、控制线程共用的锁
pthread_cond_t requests_producer = PTHREAD_COND_INITIALIZER;
pthread_cond_t requests_consumer = PTHREAD_COND_INITIALIZER;
request_t *products_list;
int length = 0;
int stop = 0;
static int x = 0;
void debug(){ //打印调试信息
request_t *last = products_list;
printf("List size:
%d\n\tContents:", length);
while(last)
{
printf("Value:%d\t",
last->value);
last = last->next;
}
printf("\n");
}
//分配一个request_t
request_t * get_request()
{
request_t *request;
request = (request_t
*)malloc(sizeof(request_t));
assert(request);
srand((int)time(0));
request->value=x++;
request->next=NULL;
printf("%lu Get value:
%d\n", pthread_self(), request->value);
return request;
}
//释放request_t
void process_request(request_t *request)
{
assert(request);
printf("%lu Trash value;
%d\n", pthread_self(), request->value);
free(request);
}
//增加request到request列表的末端
void add_request(request_t *request)
{
assert(request);
request_t * last;
request_t * pre = products_list;
if( products_list == NULL )
{
products_list = request;
debug();
return;
}
last = products_list->next;
while(last)
{
pre = last;
last = last->next;
}
pre->next = request;
debug();
}
//从队头删除一个request_t,并且返回
request_t * remove_request(void)
{
assert(products_list);
request_t * request;
debug();
request = products_list;
products_list =
products_list->next;
return request;
}
void thread_single_barrier(sem_t *barrier, int count)
{
while(count>0)
{
sem_wait(barrier);
count--;
}
}
void * producer(void *arg)
{
request_t *request;
while(1)
{
request = get_request();
pthread_mutex_lock(&requests_lock);
//如果队列未满者且停止进程未要求退出
while((length>
MAX_REQUEST_LIST)&&(!stop))
{
printf("producer
%lu condition wait\n", pthread_self() );
pthread_cond_wait(&requests_producer,&requests_lock);
}
//增加对象对队列中,供消费者使用
add_request(request);
length++;
if(stop)
break;
pthread_mutex_unlock(&requests_lock);
printf("Send signal to
consumer\n");
pthread_cond_signal(&requests_consumer);
}
pthread_mutex_unlock(&requests_lock);
sem_post(&barrier);
pthread_exit(NULL);
}
//消费者进程
void *consumer(void *arg)
{
request_t *request;
while(1)
{
pthread_mutex_lock(&requests_lock);
//如果队列中没有产品且停止线程未通知退出
while((length==0)
&& (!stop))
{
printf("consumer
%lu condition wait\n", pthread_self() );
pthread_cond_wait(&requests_consumer,
&requests_lock);
}
if(stop)
{
break;
}
//将队列中的内容删,要保证原子性,应该在unlock里面
request = remove_request();
length--;
pthread_mutex_unlock(&requests_lock);
pthread_cond_signal(&requests_producer);
process_request(request);
}
pthread_mutex_unlock(&requests_lock);
sem_post(&barrier);
pthread_exit(NULL);
}
//停止进程
void *stopper(void *arg)
{
sleep(4);
pthread_mutex_lock(&requests_lock);
stop = 1;
pthread_mutex_unlock(&requests_lock);
pthread_cond_broadcast(&requests_producer);
pthread_cond_broadcast(&requests_consumer);
pthread_exit(NULL);
}
int main()
{
int j,i,status;
for(j=0; j<1; j++)
{
//创建N_CONS个消费者
printf("Starting
consumers. List length: %d.\n", length);
for (i=0; i
printf("Create
consumer thread %d\n", i);
status =
pthread_create(&tid, &attr, consumer, NULL);
if (status<0)
printf("Create
thread %d failed!\n", i);
}
pthread_mutex_lock(&requests_lock);
while(length != 0 )
pthread_cond_wait(&requests_producer,
&requests_lock);
printf("Strating
producers.\n");
pthread_mutex_unlock(&requests_lock);
//
for(i=0; i
printf("Create
productor thread %d\n", i);
status =
pthread_create(&tid, &attr, producer, NULL);
if (status<0)
printf("Create
thread %d failed!\n", i);
}
//
printf( "Create stop
thread\n" );
pthread_create(&tid,
&attr, stopper, NULL);
thread_single_barrier(&barrier,
N_PROD+N_CONS);
stop=0;
//消费者于生产者退出后,队列中的物品个数(内存没有释放)
printf("All exited.
List length: %d.\n", length);
//释放已经分配但没有释放的内存
while ( products_list )
{
remove_request();
length--;
}
printf("All exited.
List length: %d.\n", length);
sleep(4);
}
}