-
form:luozhiyong131.blog.chinaunix.net
-
/*
-
* Produce/Consume using multithread
-
*/
-
-
#include <stdio.h>
-
#include <stdlib.h>
-
#include <pthread.h>
-
#include <string.h>
-
#include <assert.h>
-
#include <time.h>
-
#include <unistd.h>
-
#include <errno.h>
-
-
#define BSIZE 30 //buffer size
-
#define PRODUCE_EXIT '~' //produce thread exit flag
-
#define CONSUME_EXIT '@' //consume thread exit flag
-
#define PRODUCE_NUMBER 3 //produce thread number
-
#define CONSUME_NUMBER 5 //consume thread number
-
-
typedef struct buffer_t
-
{
-
char buf[BSIZE];
-
int occupied; //product number
-
int nextin;
-
int nextout;
-
pthread_mutex_t mutex;
-
pthread_cond_t less;
-
pthread_cond_t more;
-
}buffer_t;
-
-
pthread_once_t once = PTHREAD_ONCE_INIT;
-
-
buffer_t ware;
-
-
/*
-
* initialization
-
*/
-
void init(void)
-
{
-
memset(&ware, 0, sizeof(ware));
-
pthread_mutex_init(&ware.mutex, NULL);
-
pthread_cond_init(&ware.less, NULL);
-
pthread_cond_init(&ware.more, NULL);
-
}
-
/*
-
* destroy
-
*/
-
void destroy(void)
-
{
-
pthread_mutex_destroy(&ware.mutex);
-
pthread_cond_destroy(&ware.less);
-
pthread_cond_destroy(&ware.more);
-
}
-
-
/*
-
* The produce thread
-
*/
-
void * threadProduce(void * arg)
-
{
-
char c = 0;
-
struct timespec tt;
-
int result;
-
-
while(c != PRODUCE_EXIT)
-
{
-
//read product from keyboard
-
c = getchar();
-
if(c == '\n')
-
continue;
-
-
//lock mutex
-
assert( pthread_mutex_lock(&ware.mutex) == 0);
-
//condition wait
-
tt.tv_sec = time(NULL)+1;
-
tt.tv_nsec= 0;
-
if(ware.occupied >= BSIZE)
-
{
-
result = pthread_cond_timedwait(
-
&ware.more,
-
&ware.mutex,
-
&tt);
-
if(result == ETIMEDOUT)
-
{
-
assert( pthread_mutex_unlock(&ware.mutex) == 0);
-
continue;
-
}else if(result != 0)
-
{
-
printf("pthread_cond_timedwait ERROR.\n");
-
assert( pthread_mutex_unlock(&ware.mutex) == 0);
-
pthread_exit(NULL);
-
}
-
}
-
//insert product into buf
-
ware.buf[ware.nextin] = c;
-
ware.occupied++;
-
ware.nextin = (ware.nextin+1)%BSIZE;
-
-
printf("produce: [tid=%u] [%c].\n",
-
pthread_self(), c);
-
-
//aware the consume thread
-
pthread_cond_signal(&ware.less);
-
//unlock mutex
-
assert( pthread_mutex_unlock(&ware.mutex) == 0);
-
sleep(1);
-
}
-
printf("produce [tid=%u] EXIT.\n", pthread_self());
-
pthread_exit(NULL);
-
}
-
-
/*
-
* The consume thread
-
*/
-
void * threadConsume(void * arg)
-
{
-
char c = 0;
-
struct timespec tt;
-
int result;
-
-
while(c != CONSUME_EXIT)
-
{
-
//lock mutex
-
assert( pthread_mutex_lock(&ware.mutex) == 0);
-
//condition wait
-
tt.tv_sec = time(NULL)+1;
-
tt.tv_nsec= 0;
-
if(ware.occupied <= 0)
-
{
-
result = pthread_cond_timedwait(
-
&ware.less,
-
&ware.mutex,
-
&tt);
-
if(result == ETIMEDOUT)
-
{
-
assert( pthread_mutex_unlock(&ware.mutex) == 0);
-
continue;
-
}else if(result != 0)
-
{
-
printf("pthread_cond_timedwait ERROR.\n");
-
assert( pthread_mutex_unlock(&ware.mutex) == 0);
-
pthread_exit(NULL);
-
}
-
}
-
//read product from buf
-
c = ware.buf[ware.nextout];
-
ware.occupied--;
-
ware.nextout = (ware.nextout+1)%BSIZE;
-
printf("consume: [tid=%u] [%c].\n",
-
pthread_self(), c);
-
-
//aware the produce thread
-
pthread_cond_signal(&ware.more);
-
//unlock mutex
-
assert( pthread_mutex_unlock(&ware.mutex) == 0);
-
sleep(1);
-
}
-
printf("consume [tid=%u] EXIT.\n", pthread_self());
-
pthread_exit(NULL);
-
}
-
-
int main(void)
-
{
-
pthread_t tid[PRODUCE_NUMBER+CONSUME_NUMBER];
-
int i;
-
-
pthread_once(&once, init);
-
for(i=0; i<PRODUCE_NUMBER; i++)
-
{
-
if( pthread_create(&tid[i], NULL,
-
threadProduce, NULL) )
-
{
-
printf("pthread_create ERROR.\n");
-
destroy();
-
exit(-1);
-
}
-
}
-
for(; i<PRODUCE_NUMBER+CONSUME_NUMBER; i++)
-
{
-
if( pthread_create(&tid[i], NULL,
-
threadConsume, NULL) )
-
{
-
printf("pthread_create ERROR.\n");
-
destroy();
-
exit(-1);
-
}
-
}
-
-
for(i=0; i<PRODUCE_NUMBER+CONSUME_NUMBER; i++)
-
{
-
pthread_join(tid[i], NULL);
-
}
-
destroy();
-
-
pthread_exit(NULL);
-
}
-
/*
-
* 线程同步
-
*/
-
#include <stdio.h>
-
#include <stdlib.h>
-
#include <pthread.h>
-
#include <assert.h>
-
-
int product = 0;
-
-
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
-
-
/*
-
* 线程函数 读
-
*/
-
void * threadRead(void * arg)
-
{
-
int cnt = 0;
-
while(cnt++ < 100)
-
{
-
// assert( pthread_mutex_lock(&mutex) == 0);
-
assert( pthread_rwlock_rdlock(&rwlock) == 0);
-
-
printf("Read: product = %d\n", product);
-
-
// assert( pthread_mutex_unlock(&mutex) == 0);
-
assert( pthread_rwlock_unlock(&rwlock) == 0);
-
sleep(1);
-
}
-
pthread_exit(NULL);
-
}
-
-
/*
-
* 线程函数 写 增加
-
*/
-
void * threadProduce(void * arg)
-
{
-
int cnt = 0;
-
while(cnt++ < 100)
-
{
-
// assert( pthread_mutex_lock(&mutex) == 0);
-
assert( pthread_rwlock_wrlock(&rwlock) == 0);
-
-
product++;
-
printf("Produce: product = %d\n", product);
-
-
// assert( pthread_mutex_unlock(&mutex) == 0);
-
assert( pthread_rwlock_unlock(&rwlock) == 0);
-
sleep(2);
-
}
-
pthread_exit(NULL);
-
}
-
-
/*
-
* 线程函数 写 减少
-
*/
-
void * threadConsume(void * arg)
-
{
-
int cnt = 0;
-
while(cnt++ < 100)
-
{
-
// assert( pthread_mutex_lock(&mutex) == 0);
-
assert( pthread_rwlock_wrlock(&rwlock) == 0);
-
-
product--;
-
printf("Consume: product = %d\n", product);
-
-
// assert( pthread_mutex_unlock(&mutex) == 0);
-
assert( pthread_rwlock_unlock(&rwlock) == 0);
-
sleep(2);
-
}
-
pthread_exit(NULL);
-
}
-
-
int main(void)
-
{
-
pthread_t tidRead[20], tidProduce, tidConsume;
-
int i;
-
-
for(i=0; i<20; i++)
-
if( pthread_create(&tidRead[i], NULL,
-
threadRead, NULL) )
-
{
-
printf("pthread_create ERROR\n");
-
exit(-1);
-
}
-
-
if( pthread_create(&tidProduce, NULL,
-
threadProduce, NULL) )
-
{
-
printf("pthread_create ERROR\n");
-
exit(-1);
-
}
-
-
if( pthread_create(&tidConsume, NULL,
-
threadConsume, NULL) )
-
{
-
printf("pthread_create ERROR\n");
-
exit(-1);
-
}
-
-
pthread_exit(NULL);
-
}
阅读(830) | 评论(0) | 转发(0) |