内涵:和一个普通互斥体结合使用的一块资源。包括了变量本身、正在等待的线程信息等。
初始化: pthread_cond_t cond = PTHREAD_COND_ININTIALIZER;
销毁: pthread_cond_destroy(pthread_cond_t*);
等待和发送信号:
假设:有一个房间,钥匙是mutex,一开始门是开着的。 房间内有一个电话cond,包括所有在上面已经登记过的电话号码(线程id)
1. pthread_mutex_lock(&mutex);
2. pthread_cond_wait(&cond, &mutex);
// do sth
3. pthread_mutex_unlock(&mutex);
过程: 第1步,来了一个人小张(线程),先进房间并且锁上门,不让其他人同时进来。第2步,小张把自己的电话号码登记在了电话上。然后小张就打开锁出来了并且倒地就睡一直等到有人打电话给他,因为他要等其他人做完了一些事情之后打电话通知他以便它继续做。第3步,有人打电话给小张了,小张又进入房间并且锁上门,干他自己分内的事情了。。。
phtread_cond_signal(&cond);按照某些规则挑一个电话号码(比如优先级高低)通知那个人(线程)
下面是我写的一个 用条件变量实现join所有线程的例子
#include "../common.h" //常用的unix头文件,自己写
#include "netdb.h"
#include "pthread.h"
struct client_info {
int tid;
int is_end;
};
struct client_info ci[20];
int ndone = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
class client_reply {
public:
int tid;
char* msg;
};
void* client_work(void* arg) {
printf("thread %d start...\n", pthread_self());
int sockfd = *((int*)arg);
char buf[1024];
for(;;) {
int n = read(sockfd, buf, sizeof(buf));
if(n == 0)break;
if(n == -1)break;
buf[n] = 0;
printf("thread %d read %d bytes:%s\n", pthread_self(), n, buf);
}
client_reply* p = new client_reply;
p->msg = "oh baby";
p->tid = pthread_self();
pthread_mutex_lock(&mutex);
ndone++;
int i;
for(i=0; i<20; i++) {
if(ci[i].tid == 0) {
ci[i].tid = pthread_self();
ci[i].is_end = 1;
break;
}
}
printf("thread %d end...\n", pthread_self());
// printf("c[%d].tid = %d\tc[%d].is_end = %d\t ndone=%d\n", i, ci[i].tid, i, ci[i].is_end, ndone);
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond);
return p;
}
void* client_wait(void* arg) {
printf("thread %d client wait start...\n", pthread_self());
for(;;) {
pthread_mutex_lock(&mutex);
while(ndone == 0) {
printf("cleint wait ....\n");
pthread_cond_wait(&cond, &mutex);
}
for(int i=0; i<20; i++) {
if(ci[i].is_end == 1) {
struct client_reply *reply;
pthread_join(ci[i].tid, (void**)&reply);
ndone--;
printf("pthread_join tid:%d msg:%s\n", reply->tid, reply->msg);
memset(&ci[i], 0, sizeof(struct client_info));
}
}
pthread_mutex_unlock(&mutex);
}
return 0;
}
int main(int argc, char** argv) {
memset(ci, 0, sizeof(struct client_info)*20);
char* ip = NULL;
char* port = "8888";
if(argc > 1)ip = argv[1];
if(argc > 2)port = argv[2];
struct addrinfo *res;
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
int n = getaddrinfo(ip, port, &hints, &res);
if(n != 0)perror("getaddrinfo error");
int sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if(sockfd == -1)perror("socket error");
n = bind(sockfd, res->ai_addr, res->ai_addrlen);
if(n == -1)perror("bind error");
n = listen(sockfd, 5);
if(n == -1)perror("listen error");
char buf[1024];
inet_ntop(AF_INET, &(((struct sockaddr_in*)res->ai_addr)->sin_addr), buf, sizeof(buf));
printf("serer is started in [%s]:[%d]\n", buf, ntohs(((struct sockaddr_in*)res->ai_addr)->sin_port));
//create a thread to recive client trhead
pthread_t tid;
pthread_create(&tid, 0, client_wait, 0);
for(;;) {
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_addr_len);
if(clientfd == -1) {
perror("accept error");
continue;
}
pthread_t tid;
int n = pthread_create(&tid, 0, client_work, &clientfd);
if(n != 0)printf("pthread_create error\n");
}
return 0;
}
阅读(2666) | 评论(0) | 转发(0) |