多线程生产者消费者.rar /*程序主要部分说明:
生产者线程不断顺序地将0~1000个数字写入共享的循环缓冲区,同时消费者线程不断从共享的缓冲区读取数据;生产者和消费者只能有一个在临界区,如果一个线程在临界区操作另一个线程只能阻塞等待,直到被唤醒;主要的函数创建线程函数:pthread_create(&th_a, NULL, producer, 0)等待指定的线程结束函数:pthread_join(th_a, &retval); pthread_cond_wait(&b->notfull, &b->lock); 线程挂起等待直到缓冲区不满释放锁;生产者进行生产; pthread_cond_wait(&b->notempty, &b->lock); 线程挂起等待直到缓冲区不空释放锁消费者读取数据;
pthread_cond_signal(&b->notempty); /唤醒等待在b->notempty条件变量上的消费者线程上锁;
pthread_cond_signal(&b->notfull); //若缓冲区不满,则唤醒b->notfull条件变量上的生产者线程上锁;*/
#include
#include
#include
#include "pthread.h"
#define BUFFER_SIZE 16
/* Circular buffer of integers. */
struct prodcons {
int buffer[BUFFER_SIZE]; /* the actual data */ //定义数据缓冲区;
pthread_mutex_t lock; /* mutex ensuring exclusive access to buffer */ //定义锁;
int readpos, writepos; /* positions for reading and writing */ //定义读,写变量
pthread_cond_t notempty; /* signaled when buffer is not empty */ //定义不空时的信号;
pthread_cond_t notfull; /* signaled when buffer is not full */ //定义空时的信号;
};
/*--------------------------------------------------------*/
/* Initialize a buffer */
void init(struct prodcons * b)
{
pthread_mutex_init(&b->lock, NULL); //初始化锁;
pthread_cond_init(&b->notempty, NULL); //初始化b->notempty条件变量;
pthread_cond_init(&b->notfull, NULL); //初始化b->notfull条件变量;
b->readpos = 0; //初始化读写位置;
b->writepos = 0;
}
/*--------------------------------------------------------*/
/* Store an integer in the buffer */
void put(struct prodcons * b, int data) //生产者写入共享循环缓冲区的函数;
{
pthread_mutex_lock(&b->lock); //上锁;
/* Wait until buffer is not full */
while ((b->writepos + 1) % BUFFER_SIZE == b->readpos) { //判断读写位置,writepos + 1为了防止两个进程同时阻塞等待;
printf("wait for not full\n");
pthread_cond_wait(&b->notfull, &b->lock); //线程挂起等待直到缓冲区不满释放锁;;
}
/* Write the data and advance write pointer */
b->buffer[b->writepos] = data; //写数据;
b->writepos++; //写的位置加1;
if (b->writepos >= BUFFER_SIZE) b->writepos = 0; //如果写的位置大于缓冲区的长度,则把写的指针置0
/* Signal that the buffer is now not empty */
pthread_cond_signal(&b->notempty); //唤醒等待在b->notempty条件变量上的消费者线程上锁;
pthread_mutex_unlock(&b->lock); //解锁;
}
/*--------------------------------------------------------*/
/* Read and remove an integer from the buffer */
int get(struct prodcons * b) //消费者读取共享循环缓冲区的函数;
{
int data;
pthread_mutex_lock(&b->lock); //上锁,成功返回0;
/* Wait until buffer is not empty */
while (b->writepos == b->readpos) { //判断读写位置是否相等;
printf("wait for not empty\n");
pthread_cond_wait(&b->notempty, &b->lock); //线程挂起等待直到缓冲区不空释放锁;
}
/* Read the data and advance read pointer */
data = b->buffer[b->readpos]; //读数据;
b->readpos++; //读的位置加1;
if (b->readpos >= BUFFER_SIZE) b->readpos = 0; //如果读的位置大于缓冲区的长度,则把读的指针置0;
/* Signal that the buffer is now not full */
pthread_cond_signal(&b->notfull); //若缓冲区不满,则唤醒b->notfull条件变量上的生产者线程上锁;
pthread_mutex_unlock(&b->lock); //解锁;
return data;
}
/*--------------------------------------------------------*/
#define OVER (-1)
struct prodcons buffer;
/*--------------------------------------------------------*/
void * producer(void * data) //生产者函数;
{
int n;
for (n = 0; n < 1000; n++) { //初始化buffer,共有1000个产品;
printf(" put-->%d\n", n);
put(&buffer, n); //将产品放入共享区;
}
put(&buffer, OVER); //over 结束标志;
printf("producer stopped!\n");
return NULL;
}
/*--------------------------------------------------------*/
void * consumer(void * data) //消费者函数;
{
int d;
while (1) {
d = get(&buffer); //从共享数据区读取数据到变量d;
if (d == OVER ) break; //判断d是否是over;
printf(" %d-->get\n", d); //打印缓冲区的值;
}
printf("consumer stopped!\n");
return NULL;
}
/*--------------------------------------------------------*/
int main(void)
{
pthread_t th_a, th_b; //定义线程号;
void * retval; //定义返回值;
init(&buffer); //初始化共享缓冲区;
pthread_create(&th_a, NULL, producer, 0); //创建生产者线程;
pthread_create(&th_b, NULL, consumer, 0); //创建消费者线程;
/* Wait until producer and consumer finish. */
pthread_join(th_a, &retval); //主线程以阻塞的方式等待th_a指定的线程结束。当函数返回时,被等待线程的资源被收回。如果进程已经结束,那么该函数会立即返回
pthread_join(th_b, &retval); //主线程以阻塞的方式等待th_b指定的线程结束;
return 0;
}
阅读(1291) | 评论(0) | 转发(0) |