Chinaunix首页 | 论坛 | 博客
  • 博客访问: 192457
  • 博文数量: 73
  • 博客积分: 5000
  • 博客等级: 大校
  • 技术积分: 1160
  • 用 户 组: 普通用户
  • 注册时间: 2009-04-23 15:53
文章分类

全部博文(73)

文章存档

2011年(1)

2009年(72)

我的朋友

分类: LINUX

2009-04-23 16:08:09

pthread之生产者、消费者(信号量相关)


http://updatedb.blog.hexun.com/7082010_d.html#

 

生产者、消费者的问题,早就是经典问题中的经典,只要有并行、有共享的地方就有它的身影。每次用不同的技术去模拟这个情景有不同的感受...上一次模拟这个情景是在2003年了吧,给女同学做作业:),但很不愉快呀

下面的代码,将会产生N_CONS个消费者,N_PROD个生产者,1个控制线程用于通过生产者与消费者退出。
记得因为消费者与生产者退出之后,生产的产品可能没有全部被消耗掉,所以要在最后自己再释放到没有释放的物品占用的资源。

#include
#include
#include

#include
#include

#define N_CONS 5                      //5
个消费者
#define N_PROD 1                      //1
个生产者
#define MAX_REQUEST_LIST 5     //
最大的库存

typedef struct list
{
        int value;
        struct list * next;   
}request_t;                           //
物品结构

pthread_t tid;
pthread_attr_t attr;
sem_t barrier;                                //
信号量
pthread_mutex_t requests_lock = PTHREAD_MUTEX_INITIALIZER;   //
生产者与消息者、控制线程共用的锁
pthread_cond_t requests_producer = PTHREAD_COND_INITIALIZER;
pthread_cond_t requests_consumer = PTHREAD_COND_INITIALIZER;
request_t *products_list;

int length = 0;
int stop = 0;
static int x = 0;

void debug(){  //
打印调试信息
        request_t *last = products_list;
        printf("List size: %d\n\tContents:", length);
        while(last)
        {
               printf("Value:%d\t", last->value);   
               last = last->next;
        }
        printf("\n");
}

//
分配一个request_t
request_t * get_request()
{
        request_t *request;

        request = (request_t *)malloc(sizeof(request_t));
        assert(request);
       
        srand((int)time(0));
        request->value=x++;   
        request->next=NULL;
        printf("%lu Get value: %d\n", pthread_self(), request->value);
       
        return request;
}

//
释放request_t
void process_request(request_t *request)
{
        assert(request);
        printf("%lu Trash value; %d\n", pthread_self(), request->value);
        free(request);
}

//
增加requestrequest列表的末端
void add_request(request_t *request)
{
        assert(request);
       
        request_t * last;
        request_t * pre = products_list;
        if( products_list == NULL )
        {
               products_list = request;
               debug();
               return;
        }
       
        last = products_list->next;
       
        while(last)
        {
               pre = last;
               last = last->next;
        }

        pre->next = request;

        debug();
}

//
从队头删除一个request_t,并且返回
request_t * remove_request(void)
{
        assert(products_list);

        request_t * request;
        debug();
        request = products_list;
        products_list = products_list->next;
       
        return request;
}

void thread_single_barrier(sem_t *barrier, int count)
{
        while(count>0)
        {
               sem_wait(barrier);
               count--;
        }
}

void * producer(void *arg)
{
        request_t *request;

        while(1)
        {
               request = get_request();
               pthread_mutex_lock(&requests_lock);

               //
如果队列未满者且停止进程未要求退出
               while((length> MAX_REQUEST_LIST)&&(!stop))
                {
                       printf("producer %lu condition wait\n", pthread_self() );
                       pthread_cond_wait(&requests_producer,&requests_lock);
               }

               //
增加对象对队列中,供消费者使用
               add_request(request);
               length++;
               if(stop)
                       break;
               pthread_mutex_unlock(&requests_lock);
               printf("Send signal to consumer\n");
               pthread_cond_signal(&requests_consumer);     
        }
        pthread_mutex_unlock(&requests_lock);
        sem_post(&barrier);
        pthread_exit(NULL);
}

//
消费者进程
void *consumer(void *arg)
{
        request_t *request;
        while(1)
        {
               pthread_mutex_lock(&requests_lock);

               //
如果队列中没有产品且停止线程未通知退出
               while((length==0) && (!stop))
               {      
                       printf("consumer %lu condition wait\n", pthread_self() );
                       pthread_cond_wait(&requests_consumer, &requests_lock);
               }
               if(stop)
               {
                       break;                        
               }

               //
将队列中的内容删,要保证原子性,应该在unlock里面
               request = remove_request();
               length--;
               pthread_mutex_unlock(&requests_lock);
               pthread_cond_signal(&requests_producer);
               process_request(request);
        }
        pthread_mutex_unlock(&requests_lock);
        sem_post(&barrier);
        pthread_exit(NULL);
}

//
停止进程
void *stopper(void *arg)
{
        sleep(4);
        pthread_mutex_lock(&requests_lock);
        stop = 1;
        pthread_mutex_unlock(&requests_lock);
        pthread_cond_broadcast(&requests_producer);
        pthread_cond_broadcast(&requests_consumer);
        pthread_exit(NULL);
}

int main()
{      
        int j,i,status;
        for(j=0; j<1; j++)
        {
               //
创建N_CONS个消费者
               printf("Starting consumers. List length: %d.\n", length);
               for (i=0; i                {
                       printf("Create consumer thread %d\n", i);
                       status = pthread_create(&tid, &attr, consumer, NULL);
                       if (status<0)
                               printf("Create thread %d failed!\n", i);
               }

               pthread_mutex_lock(&requests_lock);
               while(length != 0 )
                       pthread_cond_wait(&requests_producer, &requests_lock);
              
               printf("Strating producers.\n");
               pthread_mutex_unlock(&requests_lock);

               //
创建N_PROD个生产者
               for(i=0; i                {
                       printf("Create productor thread %d\n", i);
                       status = pthread_create(&tid, &attr, producer, NULL);
                       if (status<0)
                               printf("Create thread %d failed!\n", i);
               }

               //
创建一个线程,用于通知生产者与消费者退出
               printf( "Create stop thread\n" );
               pthread_create(&tid, &attr, stopper, NULL);

               thread_single_barrier(&barrier, N_PROD+N_CONS);

               stop=0;

               //
消费者于生产者退出后,队列中的物品个数(内存没有释放)
               printf("All exited. List length: %d.\n", length);
              
               //
释放已经分配但没有释放的内存
               while ( products_list )
               {
                       remove_request();
                       length--;
               }
               printf("All exited. List length: %d.\n", length);
               sleep(4);
        }
}

 

阅读(1379) | 评论(0) | 转发(0) |
0

上一篇:没有了

下一篇:Linux环境进程间通信(一)

给主人留下些什么吧!~~