希望和广大热爱技术的童鞋一起交流,成长。
分类:
2011-07-22 13:46:57
原文地址:(7)线程同步 作者:g_programming
(7)线程同步
注:所以文章红色字体代表需要特别注意和有问题还未解决的地方,蓝色字体表示需要注意的地方
1. 本文所介绍的程序平台
开发板:arm9-mini2440
虚拟机为:Red Hat Enterprise Linux 5
开发板上系统内核版本:linux-2.6.32.2
2. 线程同步基础
例如:
struct employee
{
int id;
char name[10];
}
由于两个线程a和b同时进行,就有可能线程a修改了变量id后,线程b开始运行并修改了变量id和name,然后线程a又修改了变量name,这时就造成了数据不一致。
3.互斥锁基础
详细请看:
http://blog.chinaunix.net/space.php?uid=25324849&do=blog&id=206990
互斥锁定义:pthread_mutex_t
互斥锁初始化: PTHREAD_MUTEX_INITIALIZER
注:仅使用于静态分配的互斥锁
或者以下面方式初始化,但要使用相应的注销函数:
#include
1. int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
2. int pthread_mutex_destroy(pthread_mutex_t *mutex);
Both return: 0 if OK, error number on failure
3. Int pthread_mutex_lock(pthread_mutex_t *mutex)
如果互斥锁已经被锁定,那么调用者将进入睡眠状态,函数返回时将唤醒调用者。
4. int pthread_mutex_trylock(pthread_mutex_t *mutex)
如果互斥锁已经被锁定,那么调用者不会睡眠状态,而是返回非零的EBUSY错误值。
5. int pthread_mutex_unlock(pthread_mutex_t *mutex)
返回值:成功返回0, 否则返回错误编号
4. 读写锁基础
详细请看:
http://blog.chinaunix.net/space.php?uid=25324849&do=blog&id=206993
5. 条件变量基础
详细请看:
http://blog.chinaunix.net/space.php?uid=25324849&do=blog&id=206996
注意:条件变量本身并不提供“锁”,因此互斥锁常伴随条件条件变量使用,提供相应的锁以安全地访问条件变量。
条件变量实例:服务器采用多线程无连接方式,主线程接收客户请求,并将客户请求插入“请求队列”,而其他3个现场则从“请求队列”中取出相应的请求,并将信息回应客户,为实现线程同步使用条件变量,当主线程将客户请求插入“请求变量”时,利用条件变量发信息给其他线程,唤醒其中一个来处理客户请求。
注意:服务程序有问题,由于服务器,客户地址设计为全局变量,当同时有两个客户请求到来时,服务器收到信息并填充客户信息,这个时候就会出现填充错误,还有就是处理客户请求的函数里面使用了客户地址,这样可能丢失客户的信息。
假如添加了3个客户请求,但是都没有来得及处理,这个时候处理3个客户请求,但是3个客户请求的地址只有一个了。
解决方法:将客户信息放于请求队列里面,收到信息的时候不用加锁,由于服务器主线程同时只能处理一个,所以不用考虑并发,其实我认为add_request函数里面也不必加锁的。
//server.c
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define NUM_HANDLER_THREADS 3 /* number of threads used to service requests */
#define PORT 1234 /* Port that will be opened */
#define MAXDATASIZE 100 /* Max number of bytes of data */
pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t list_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t got_request = PTHREAD_COND_INITIALIZER;
int num_requests = 0; /* number of pending requests, initially none */
int sockfd; /* socket descriptors */
struct sockaddr_in server; /* server's address information */
struct sockaddr_in client; /* client's address information */
socklen_t sin_size;
/* format of a single request. */
struct request {
char info[MAXDATASIZE]; /* client's data */
struct request* next; /* pointer to next request, NULL if none. */
};
struct request* requests = NULL; /* head of linked list of requests. */
struct request* last_request = NULL; /* pointer to last request. */
//定义add_request()函数用于向队列中添加一个客户请求
void add_request(char* info, pthread_mutex_t* p_mutex, pthread_cond_t* p_cond_var)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to newly added request. */
/* create structure with new request */
a_request = (struct request*)malloc(sizeof(struct request));
if (!a_request) { /* malloc failed? */
fprintf(stderr, "add_request: out of memory\n");
exit(1);
}
memcpy(a_request->info, info, MAXDATASIZE);
a_request->next = NULL;
/* lock the mutex, to assure exclusive access to the list */
rc = pthread_mutex_lock(p_mutex);
/* add new request to the end of the list, updating list */
/* pointers as required */
if (num_requests == 0) { /* special case - list is empty */
requests = a_request;
last_request = a_request;
}
else {
last_request->next = a_request;
last_request = a_request;
}
/* increase total number of pending requests by one. */
num_requests++;
/* unlock mutex */
rc = pthread_mutex_unlock(p_mutex);
/* signal the condition variable - there's a new request to handle */
rc = pthread_cond_signal(p_cond_var);
}
//定义get_request函数,该函数用于从请求队列中取出客户请求
struct request* get_request(pthread_mutex_t* p_mutex)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to request. */
/* lock the mutex, to assure exclusive access to the list */
rc = pthread_mutex_lock(p_mutex);
if (num_requests > 0) {
a_request = requests;
requests = a_request->next;
if (requests == NULL) { /* this was the last request on the list */
last_request = NULL;
}
/* decrease the total number of pending requests */
num_requests--;
}
else { /* requests list is empty */
a_request = NULL;
}
/* unlock mutex */
rc = pthread_mutex_unlock(p_mutex);
/* return the request to the caller. */
return a_request;
}
void handle_request(struct request* a_request, int thread_id)
{
char msg[MAXDATASIZE+40];
if (a_request) {
printf("Thread '%d' handled request '%s'\n", thread_id, a_request->info);
fflush(stdout);
sprintf(msg,"Thread '%d' handled your request '%s'\n", thread_id, a_request->info);
sendto(sockfd,msg,strlen(msg),0,(struct sockaddr *)&client,sin_size);
}
}
void* handle_requests_loop(void* data)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to a request. */
int thread_id = *((int*)data); /* thread identifying number */
/* lock the mutex, to access the requests list exclusively. */
rc = pthread_mutex_lock(&request_mutex);
while (1) {
if (num_requests > 0) { /* a request is pending */
a_request = get_request(&list_mutex);
if (a_request) { /* got a request - handle it and free it */
handle_request(a_request, thread_id);
rc = pthread_mutex_unlock(&list_mutex);
free(a_request);
}
}
else {
rc = pthread_cond_wait(&got_request, &request_mutex);
}
}
}
int main(int argc, char* argv[])
{
int thr_id[NUM_HANDLER_THREADS]; /* thread IDs */
pthread_t p_threads[NUM_HANDLER_THREADS]; /* thread's structures */
int num;
char msg[MAXDATASIZE];
/* create the request-handling threads */
for (int i=0; i
thr_id[i] = i;
pthread_create(&p_threads[i], NULL, handle_requests_loop, (void*)&thr_id[i]);
}
/* Create UDP socket */
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1)
{
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind(sockfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {
/* handle exception */
perror("Bind error.");
exit(1);
}
sin_size=sizeof(struct sockaddr_in);
while (1)
{
num = recvfrom(sockfd,msg,MAXDATASIZE,0,(struct sockaddr *)&client,&sin_size);
if (num < 0){
perror("recvfrom error\n");
exit(1);
}
msg[num] = '\0';
printf("You got a message (%s) from %s\n",msg,inet_ntoa(client.sin_addr) ); /* prints client's IP */
add_request(msg, &list_mutex, &got_request);
if (!strcmp(msg,"quit")) break;
}
close(sockfd); /* close listenfd */
return 0;
}
改变了的服务器程序:
//server.c
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define NUM_HANDLER_THREADS 3 /* number of threads used to service requests */
#define PORT 1234 /* Port that will be opened */
#define MAXDATASIZE 100 /* Max number of bytes of data */
pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t list_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t got_request = PTHREAD_COND_INITIALIZER;
int num_requests = 0; /* number of pending requests, initially none */
int sockfd; /* socket descriptors */
struct sockaddr_in server; /* server's address information */
struct sockaddr_in client; /* client's address information */
socklen_t sin_size;
/* format of a single request. */
struct request {
char info[MAXDATASIZE]; /* client's data */
struct sockaddr_in client; /* client's address information */
struct request* next; /* pointer to next request, NULL if none. */
};
struct request* requests = NULL; /* head of linked list of requests. */
struct request* last_request = NULL; /* pointer to last request. */
//定义add_request()函数用于向队列中添加一个客户请求
void add_request(char* info, pthread_mutex_t* p_mutex, pthread_cond_t* p_cond_var)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to newly added request. */
/* create structure with new request */
a_request = (struct request*)malloc(sizeof(struct request));
if (!a_request) { /* malloc failed? */
fprintf(stderr, "add_request: out of memory\n");
exit(1);
}
memcpy(a_request->info, info, MAXDATASIZE);
a_request->next = NULL;
a_request->client = client;
/* lock the mutex, to assure exclusive access to the list */
rc = pthread_mutex_lock(p_mutex);
/* add new request to the end of the list, updating list */
/* pointers as required */
if (num_requests == 0) { /* special case - list is empty */
requests = a_request;
last_request = a_request;
}
else {
last_request->next = a_request;
last_request = a_request;
}
/* increase total number of pending requests by one. */
num_requests++;
/* unlock mutex */
rc = pthread_mutex_unlock(p_mutex);
/* signal the condition variable - there's a new request to handle */
rc = pthread_cond_signal(p_cond_var);
}
//定义get_request函数,该函数用于从请求队列中取出客户请求
struct request* get_request(pthread_mutex_t* p_mutex)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to request. */
/* lock the mutex, to assure exclusive access to the list */
rc = pthread_mutex_lock(p_mutex);
if (num_requests > 0) {
a_request = requests;
requests = a_request->next;
if (requests == NULL) { /* this was the last request on the list */
last_request = NULL;
}
/* decrease the total number of pending requests */
num_requests--;
}
else { /* requests list is empty */
a_request = NULL;
}
/* unlock mutex */
rc = pthread_mutex_unlock(p_mutex);
/* return the request to the caller. */
return a_request;
}
void handle_request(struct request* a_request, int thread_id)
{
char msg[MAXDATASIZE+40];
if (a_request) {
printf("Thread '%d' handled request '%s'\n", thread_id, a_request->info);
fflush(stdout);
sprintf(msg,"Thread '%d' handled your request '%s'\n", thread_id, a_request->info);
sendto(sockfd,msg,strlen(msg),0,(struct sockaddr *)&a_request->client,sin_size);
}
}
void* handle_requests_loop(void* data)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to a request. */
int thread_id = *((int*)data); /* thread identifying number */
/* lock the mutex, to access the requests list exclusively. */
rc = pthread_mutex_lock(&request_mutex);
while (1) {
if (num_requests > 0) { /* a request is pending */
a_request = get_request(&list_mutex);
if (a_request) { /* got a request - handle it and free it */
handle_request(a_request, thread_id);
rc = pthread_mutex_unlock(&list_mutex);
free(a_request);
}
}
else {
rc = pthread_cond_wait(&got_request, &request_mutex);
}
}
}
int main(int argc, char* argv[])
{
int thr_id[NUM_HANDLER_THREADS]; /* thread IDs */
pthread_t p_threads[NUM_HANDLER_THREADS]; /* thread's structures */
int num;
char msg[MAXDATASIZE];
/* create the request-handling threads */
for (int i=0; i
thr_id[i] = i;
pthread_create(&p_threads[i], NULL, handle_requests_loop, (void*)&thr_id[i]);
}
/* Create UDP socket */
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1)
{
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind(sockfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {
/* handle exception */
perror("Bind error.");
exit(1);
}
sin_size=sizeof(struct sockaddr_in);
while (1)
{
num = recvfrom(sockfd,msg,MAXDATASIZE,0,(struct sockaddr *)&client,&sin_size);
if (num < 0){
perror("recvfrom error\n");
exit(1);
}
msg[num] = '\0';
printf("You got a message (%s) from %s\n",msg,inet_ntoa(client.sin_addr) ); /* prints client's IP */
add_request(msg, &list_mutex, &got_request);
if (!strcmp(msg,"quit")) break;
}
close(sockfd); /* close listenfd */
return 0;
}
// client.c
#include
#include
#include
#include
#include
#include
#include
#include
#define PORT 1234 /* Open Port on Remote Host */
#define MAXDATASIZE 100 /* Max number of bytes of data */
int main(int argc, char *argv[])
{
int fd, numbytes; /* files descriptors */
char buf[MAXDATASIZE]; /* buf will store received text */
struct hostent *he; /* structure that will get information about remote host */
struct sockaddr_in server,reply; /* server's address information */
if (argc !=3) { /* this is used because our program will need two argument (IP address and a message */
printf("Usage: %s
exit(1);
}
if ((he=gethostbyname(argv[1]))==NULL){ /* calls gethostbyname() */
printf("gethostbyname() error\n");
exit(1);
}
if ((fd=socket(AF_INET, SOCK_DGRAM, 0))==-1){ /* calls socket() */
printf("socket() error\n");
exit(1);
}
bzero(&server,sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(PORT); /* htons() is needed again */
server.sin_addr = *((struct in_addr *)he->h_addr); /*he->h_addr passes "*he"'s info to "h_addr" */
sendto(fd, argv[2], strlen(argv[2]),0,(struct sockaddr *)&server,sizeof(struct sockaddr));
while (1) {
socklen_t len;
if ((numbytes=recvfrom(fd,buf,MAXDATASIZE,0,(struct sockaddr *)&reply,&len)) == -1){ /* calls recvfrom() */
printf("recvfrom() error\n");
exit(1);
}
if (len != sizeof(struct sockaddr) || memcmp((const void *)&server, (const void *)&reply,len) != 0) {
printf("Receive message from other server.\n");
continue;
}
buf[numbytes]='\0';
printf("Server Message: %s\n",buf); /* it prints server's welcome message */
break;
}
close(fd); /* close fd */
}
在运行服务器程序时候,可能在服务器未完成所有的服务就退出了,这是由于线程退出不同步造成的。
6.同步线程退出
上述服务器程序在收到客户quit的请求后,主线程可以退出,但是可能服务器里面还有未完成的服务,我们可以利用pthread_join()函数来完成线程退出的同步。
绿色的地方为修改的地方:
//server.c
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define NUM_HANDLER_THREADS 3 /* number of threads used to service requests */
#define PORT 1234 /* Port that will be opened */
#define MAXDATASIZE 100 /* Max number of bytes of data */
pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t list_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t got_request = PTHREAD_COND_INITIALIZER;
int num_requests = 0; /* number of pending requests, initially none */
int quit;
int sockfd; /* socket descriptors */
struct sockaddr_in server; /* server's address information */
struct sockaddr_in client; /* client's address information */
socklen_t sin_size;
/* format of a single request. */
struct request {
char info[MAXDATASIZE]; /* client's data */
struct sockaddr_in client; /* client's address information */
struct request* next; /* pointer to next request, NULL if none. */
};
struct request* requests = NULL; /* head of linked list of requests. */
struct request* last_request = NULL; /* pointer to last request. */
//定义add_request()函数用于向队列中添加一个客户请求
void add_request(char* info, pthread_mutex_t* p_mutex, pthread_cond_t* p_cond_var)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to newly added request. */
/* create structure with new request */
a_request = (struct request*)malloc(sizeof(struct request));
if (!a_request) { /* malloc failed? */
fprintf(stderr, "add_request: out of memory\n");
exit(1);
}
memcpy(a_request->info, info, MAXDATASIZE);
a_request->next = NULL;
a_request->client = client;
/* lock the mutex, to assure exclusive access to the list */
rc = pthread_mutex_lock(p_mutex);
/* add new request to the end of the list, updating list */
/* pointers as required */
if (num_requests == 0) { /* special case - list is empty */
requests = a_request;
last_request = a_request;
}
else {
last_request->next = a_request;
last_request = a_request;
}
/* increase total number of pending requests by one. */
num_requests++;
/* unlock mutex */
rc = pthread_mutex_unlock(p_mutex);
/* signal the condition variable - there's a new request to handle */
rc = pthread_cond_signal(p_cond_var);
}
//定义get_request函数,该函数用于从请求队列中取出客户请求
struct request* get_request(pthread_mutex_t* p_mutex)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to request. */
/* lock the mutex, to assure exclusive access to the list */
rc = pthread_mutex_lock(p_mutex);
if (num_requests > 0) {
a_request = requests;
requests = a_request->next;
if (requests == NULL) { /* this was the last request on the list */
last_request = NULL;
}
/* decrease the total number of pending requests */
num_requests--;
}
else { /* requests list is empty */
a_request = NULL;
}
/* unlock mutex */
rc = pthread_mutex_unlock(p_mutex);
/* return the request to the caller. */
return a_request;
}
void handle_request(struct request* a_request, int thread_id)
{
char msg[MAXDATASIZE+40];
if (a_request) {
printf("Thread '%d' handled request '%s'\n", thread_id, a_request->info);
fflush(stdout);
sprintf(msg,"Thread '%d' handled your request '%s'\n", thread_id, a_request->info);
sendto(sockfd,msg,strlen(msg),0,(struct sockaddr *)&a_request->client,sin_size);
}
}
void* handle_requests_loop(void* data)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to a request. */
int thread_id = *((int*)data); /* thread identifying number */
/* lock the mutex, to access the requests list exclusively. */
rc = pthread_mutex_lock(&request_mutex);
while (1) {
if (num_requests > 0) { /* a request is pending */
a_request = get_request(&list_mutex);
if (a_request) { /* got a request - handle it and free it */
handle_request(a_request, thread_id);
rc = pthread_mutex_unlock(&list_mutex);
free(a_request);
}
}
else {
if(quit)
{
rc = pthread_mutex_unlock(&request_mutex);
pthread_exit(NULL);
}
else
rc = pthread_cond_wait(&got_request, &request_mutex);
}
}
}
int main(int argc, char* argv[])
{
int thr_id[NUM_HANDLER_THREADS]; /* thread IDs */
pthread_t p_threads[NUM_HANDLER_THREADS]; /* thread's structures */
int num;
char msg[MAXDATASIZE];
/* create the request-handling threads */
for (int i=0; i
thr_id[i] = i;
pthread_create(&p_threads[i], NULL, handle_requests_loop, (void*)&thr_id[i]);
}
/* Create UDP socket */
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1)
{
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind(sockfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {
/* handle exception */
perror("Bind error.");
exit(1);
}
sin_size=sizeof(struct sockaddr_in);
while (1)
{
num = recvfrom(sockfd,msg,MAXDATASIZE,0,(struct sockaddr *)&client,&sin_size);
if (num < 0){
perror("recvfrom error\n");
exit(1);
}
msg[num] = '\0';
printf("You got a message (%s) from %s\n",msg,inet_ntoa(client.sin_addr) ); /* prints client's IP */
add_request(msg, &list_mutex, &got_request);
if (!strcmp(msg,"quit")) {
int rc;
rc = pthread_mutex_lock(&request_mutex);
quit = 1;
rc = pthread_cond_broadcast(&got_request);
rc = pthread_mutex_unlock(&request_mutex);
for(int i = 0; i < NUM_HANDLER_THREADS; i++ )
pthread_join(p_threads[i], NULL);
break;
}
}
close(sockfd); /* close listenfd */
return 0;
}