#include
#include
#include
#define BUFFER_SIZE 8
struct Products
{
int buffer[BUFFER_SIZE];
pthread_mutex_t locker; //保证存取操作的原子性 互斥性
pthread_cond_t notEmpty; //是否可读,条件变量
pthread_cond_t notFull; //是否可写,条件变量
int posReadFrom;
int posWriteTo;
};
int BufferIsFull(struct Products* products)
{
if ((products->posWriteTo + 1) % BUFFER_SIZE == products->posReadFrom)
{
return (1);
}
return (0);
}
int BufferIsEmpty(struct Products* products)
{
if (products->posWriteTo == products->posReadFrom)
{
return (1);
}
return (0);
}
//制造产品。
void Produce(struct Products* products, int item)
{
pthread_mutex_lock(&products->locker); //原子操作
while (BufferIsFull(products))
{
pthread_cond_wait(&products->notFull, &products->locker); //线程阻塞-》解锁-》等待(如果信号量释放)-》返回-》加锁
} //无空间可写入
//写入数据
products->buffer[products->posWriteTo] = item;
products->posWriteTo++;
if (products->posWriteTo >= BUFFER_SIZE)
products->posWriteTo = 0;
pthread_cond_signal(&products->notEmpty); //发信
pthread_mutex_unlock(&products->locker); //解锁
}
int Consume(struct Products* products)
{
int item;
pthread_mutex_lock(&products->locker);
while (BufferIsEmpty(products))
{
pthread_cond_wait(&products->notEmpty, &products->locker);
} //为空时持续等待,无数据可读
//提取数据
item = products->buffer[products->posReadFrom];
products->posReadFrom++;
if (products->posReadFrom >= BUFFER_SIZE) //如果到末尾,从头读取
products->posReadFrom = 0;
pthread_cond_signal(&products->notFull); //唤醒该条件变量
pthread_mutex_unlock(&products->locker);
return item;
}
#define END_FLAG (-1)
struct Products products;
void* ProducerThread(void* data)
{
int i;
for (i = 0; i < 16; ++i)
{
printf("producer: %d\n", i);
Produce(&products, i);
}
Produce(&products, END_FLAG);
return NULL;
}
void* ConsumerThread(void* data)
{
int item;
while (1)
{
item = Consume(&products);
if (END_FLAG == item)
break;
printf("consumer: %d\n", item);
}
return (NULL);
}
int main(int argc, char* argv[])
{
pthread_t producer_id;
pthread_t consumer_id;
int result;
pthread_mutex_t locker = PTHREAD_MUTEX_INITIALIZER; //初始化
pthread_cond_t notEmpty = PTHREAD_COND_INITIALIZER; //初始化
pthread_cond_t notFull = PTHREAD_COND_INITIALIZER;
pthread_create(&producer_id, NULL, &ProducerThread, NULL); //创建线程
pthread_create(&consumer_id, NULL, &ConsumerThread, NULL);
pthread_join(producer_id, (void *)&result); //在主线程调用的函数运行结束后,主线程才会退出。
pthread_join(consumer_id, (void *)&result);
exit(EXIT_SUCCESS);
}
/*code end*/
编译: gcc -o test test.c -lpthread
主要过程为:
一个Condition Variable总是和一个Mutex搭配使用的。一个线程可以调用pthread_cond_wait在一个Condition Variable上阻塞等待,这个函数做以下三步操作:
1. 释放Mutex
2. 阻塞等待
3. 当被唤醒时,重新获得Mutex并返回
pthread_cond_timedwait函数还有一个额外的参数可以设定等待超时,如果到达了abstime所指定的时刻仍然没有别的线程来唤醒当前线程,就返回ETIMEDOUT。一个线程可以调用pthread_cond_signal唤醒在某个Condition Variable上等待的另一个线程,也可以调用pthread_cond_broadcast唤醒在这个Condition Variable上等待的所有线程。
下面是引用网上一位兄弟的对条件变量函数的分析,写得不错。
对pthread_cond_wait()函数的理解:
对pthread_cond_wait()函数的理解(我在CU上回复一个人的问题的解答)
(个人见解,如有错误,恳请大家指出)
/************pthread_cond_wait()的使用方法**********/
pthread_mutex_lock(&qlock);
pthread_cond_wait(&qready, &qlock);
pthread_mutex_unlock(&qlock);
/*****************************************************/
The mutex passed to pthread_cond_wait protects the condition.The caller passes it locked to the function, which then atomically places the calling thread on the list of threads waiting for the condition and unlocks the mutex. This closes the window between the time that the condition is checked and the time that the thread goes to sleep waiting for the condition to change, so that the thread doesn't miss a change in the condition. When pthread_cond_wait returns, the mutex is again locked.
上面是APUE的原话,就是说pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)函数传入的参数mutex用于保护条件,因为我们在调用pthread_cond_wait时,如果条件不成立我们就进入阻塞,但是进入阻 塞这个期间,如果条件变量改变了的话,那我们就漏掉了这个条件。因为这个线程还没有放到等待队列上,所以调用pthread_cond_wait前要先锁 互斥量,即调用pthread_mutex_lock(),pthread_cond_wait在把线程放进阻塞队列后,自动对mutex进行解锁,使得 其它线程可以获得加锁的权利。这样其它线程才能对临界资源进行访问并在适当的时候唤醒这个阻塞的进程。当pthread_cond_wait返回的时候又自动给mutex加锁。
实际上边代码的加解锁过程如下:
/************pthread_cond_wait()的使用方法**********/
pthread_mutex_lock(&qlock); /*lock*/
pthread_cond_wait(&qready, &qlock); /*block-->unlock-->wait() return-->lock*/
pthread_mutex_unlock(&qlock); /*unlock*/
/*****************************************************/