希望和广大热爱技术的童鞋一起交流,成长。
分类:
2011-07-22 13:47:08
原文地址:(5)多线程服务器 作者:g_programming
(5)多线程服务器
注:所以文章红色字体代表需要特别注意和有问题还未解决的地方,蓝色字体表示需要注意的地方
1. 本文所介绍的程序平台
开发板:arm9-mini2440
虚拟机为:Red Hat Enterprise Linux 5
开发板上系统内核版本:linux-2.6.32.2
2.线程基础
同一进程可以包括多个线程,这些线程共享相同的内存空间,不同的线程可以存取相同的全局变量、相同的堆数据和文件描述符等,进程间通信需要专门的机制IPC,而线程不需要,所以线程的创建比进程的创建快10~100倍左右,另一方面如果一个现场崩溃,它将影响到同一进程的其他线程。
同一进程中的线程共享如下内容:
全局变量
堆数据
打开的描述符
当前工作目录
用户及用户组ID
但是每个线程具有独立的:
线程ID
堆栈
errno变量
优先级
3.线程函数
3.1 pthread_create()函数
#include
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
thread:指向线程的ID的指针。
attr:指向线程属性的指针,包括线程优先级,堆栈大小等,如果属性为NULL则使用系统默认的属性,定义的属性在创建后修改attr不能改变线程的属性。
start_routine:指向线程的执行函数,该函数必须是一个静态函数。
arg:向执行函数传递参数的通用指针。
返回值:成功返回0,否则返回
ENOMEM:没有足够内存产生线程
EINVAL:无限的attr值
EPERM:调用者无权设置调度参数或者调度策略
3.2 pthread_join()函数
#include
int pthread_join(pthread_t thread, void **value_ptr)
作用:该函数挂起当前线程直到所等待的线程结束,与waitpid()功能类似。
Thread:所等待线程的ID,该线程必须是当前进程的成员,并且不是分离线程和守护线程,几个线程不能同时等待同一线程结束。
value_ptr:如果该值非空,则指向终止线程的退出状态。
返回值:如果成功调用则返回0,否则返回
EINVAL:无限的线程ID
ESRCH:无法找到相应ID的线程
3.3 pthread_detach()函数
#include
int pthread_ detach (pthread_t thread)
thread:被设置为“可分离的”线程ID
返回值:成功返回0,返回错误码
3.4 pthread_exit()函数
#include
int pthread_ exit (void *value_ptr)
value_ptr:指向线程的退出状态,注意不要指向一个局部变量,因为当前进程终止时候所有的局部变量将被撤销。
返回值:成功返回0,返回错误码
3.5 pthread_self()函数
#include
int pthread_ self (void)
返回值:返回当前线程的ID
3.6 pthread_cancle()函数
#include
int pthread_ cancle (pthread_t thread)
thread:被终止的线程ID
返回值:成功返回0,返回错误码
4. 给线程传递参数
4.1 pthread_create创建时传递的void *arg参数,如果传递的为多个数据,可以讲多个数据封装在一个结构体里面。
注意:在处理多个客户时,变量arg是所有线程共用的,注意保护
4.2通过指针参数传递
在c语言里面传递给函数的参数是被拷贝到函数的执行堆栈中的,可以利用这个特点使新线程得到参数的拷贝,主线程将要传递的数据类型转换成通用指针类型,然后传递给新线程,新线程再将接收的数据参数转换成原数据类型。
注意:arg类型必须能够正确的转换为通用指针类型,arg的字节长度必须小于等于通用指针的长度
4.3通过分配arg的空间传递参数(常用)
主线程首先为每个线程分配存储arg的空间,再将arg传递给新线程
注意:在新线程里面注意把分配的空间释放掉
5. 多线程服务器设计
5.1以下是多线程服务器设计实例
功能:
服务器等候客户连接请求,一旦连接成功显示客户地址,接收该客户的名字并显示,然后接收来自用户的信息,每收到一个字符串则显示,并将字符串反转,再将反转的字符串发回客户端。
客户端首先与服务器相连,接着发送客户端名字,然后发送客户信息,接收到服务器信息并显示,之后等待用户输入Crtl+D,就关闭连接并退出。
//server.c
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define PORT 1234 /* Port that will be opened */
#define BACKLOG 5 /* Number of allowed connections */
#define MAXDATASIZE 1000
void process_cli(int connectfd, sockaddr_in client);
/* function to be executed by the new thread */
void* start_routine(void* arg);
struct ARG {
int connfd;
sockaddr_in client;
};
main()
{
int listenfd, connectfd; /* socket descriptors */
pthread_t thread;
ARG *arg;
struct sockaddr_in server; /* server's address information */
struct sockaddr_in client; /* client's address information */
socklen_t sin_size;
/* Create TCP socket */
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {
/* handle exception */
perror("Bind error.");
exit(1);
}
if(listen(listenfd,BACKLOG) == -1){ /* calls listen() */
perror("listen() error\n");
exit(1);
}
sin_size=sizeof(struct sockaddr_in);
while(1)
{
/* Accept connection */
if ((connectfd = accept(listenfd,(struct sockaddr *)&client,&sin_size))==-1) {
perror("accept() error\n");
exit(1);
}
/* Create thread*/
arg = new ARG;
arg->connfd = connectfd;
memcpy((void *)&arg->client, &client, sizeof(client));
if (pthread_create(&thread, NULL, start_routine, (void*)arg)) {
/* handle exception */
perror("Pthread_create() error");
exit(1);
}
}
close(listenfd); /* close listenfd */
}
void process_cli(int connectfd, sockaddr_in client)
{
int num;
char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];
printf("You got a connection from %s. ",inet_ntoa(client.sin_addr) );
/* Get client's name from client */
num = recv(connectfd, cli_name, MAXDATASIZE,0);
if (num == 0) {
close(connectfd);
printf("Client disconnected.\n");
return;
}
cli_name[num - 1] = '\0';
printf("Client's name is %s.\n",cli_name);
while (num = recv(connectfd, recvbuf, MAXDATASIZE,0)) {
recvbuf[num] = '\0';
printf("Received client( %s ) message: %s",cli_name, recvbuf);
for (int i = 0; i < num - 1; i++) {
sendbuf[i] = recvbuf[num - i -2];
}
sendbuf[num - 1] = '\0';
send(connectfd,sendbuf,strlen(sendbuf),0);
}
close(connectfd); /* close connectfd */
}
void* start_routine(void * arg)
{
ARG *info;
info = (ARG *)arg;
/* handle client’s requirement */
process_cli(info->connfd, info->client);
delete (ARG *)arg;
pthread_exit(NULL);
}
// client.c
#include
#include
#include
#include
#include
#include
#include
#include
#define PORT 1234 /* Open Port on Remote Host */
#define MAXDATASIZE 100 /* Max number of bytes of data */
void process(FILE *fp, int sockfd);
char* getMessage(char* sendline,int len, FILE* fp);
int main(int argc, char *argv[])
{
int fd; /* files descriptors */
struct hostent *he; /* structure that will get information about remote host */
struct sockaddr_in server; /* server's address information */
if (argc !=2) {
printf("Usage: %s
exit(1);
}
if ((he=gethostbyname(argv[1]))==NULL){ /* calls gethostbyname() */
printf("gethostbyname() error\n");
exit(1);
}
if ((fd=socket(AF_INET, SOCK_STREAM, 0))==-1){ /* calls socket() */
printf("socket() error\n");
exit(1);
}
bzero(&server,sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(PORT); /* htons() is needed again */
server.sin_addr = *((struct in_addr *)he->h_addr);
if(connect(fd, (struct sockaddr *)&server,sizeof(struct sockaddr))==-1){ /* calls connect() */
printf("connect() error\n");
exit(1);
}
process(stdin,fd);
close(fd); /* close fd */
}
void process(FILE *fp, int sockfd)
{
char sendline[MAXDATASIZE], recvline[MAXDATASIZE];
int numbytes;
printf("Connected to server. \n");
/* send name to server */
printf("Input name : ");
if ( fgets(sendline, MAXDATASIZE, fp) == NULL) {
printf("\nExit.\n");
return;
}
send(sockfd, sendline, strlen(sendline),0);
/* send message to server */
while (getMessage(sendline, MAXDATASIZE, fp) != NULL) {
send(sockfd, sendline, strlen(sendline),0);
if ((numbytes = recv(sockfd, recvline, MAXDATASIZE,0)) == 0) {
printf("Server terminated.\n");
return;
}
recvline[numbytes]='\0';
printf("Server Message: %s\n",recvline); /* it prints server's welcome message */
}
printf("\nExit.\n");
}
char* getMessage(char* sendline,int len, FILE* fp)
{
printf("Input string to server:");
return(fgets(sendline, MAXDATASIZE, fp));
}
6. 线程安全
6.1线程安全基础
注意:由于同一进程中的所有线程共享相同的内存空间,如果多个线程修改相同的内存区域就会造成意想不到的后果。
在unix系统中,由于静态变量的使用,造成许多API函数都不是线程安全的,对于大都非线程安全函数都要一个线程安全版本,即MT-safe版本。新的安全函数一般是旧的函数加上_r后缀,例如Solaris系统提供的函数:
asctime_r(3C) ctermid_r(3S) ctime_r(3C)
fgetgrent_r(3C) fgetpwent_r(3C) fgetspent_r(3C)
Gamma_r(3M) getgrgid_r(3C) getgrnam_r(3C)
getlogin_r(3C) getpwnam_r(3C) getpwuid_r(3C)
getgrent_r(3C) gethostbyaddr_r(3N) gethostbyname_r(3N)
gethostent_r(3N) getnetbyaddr_r(3N) getnetbyname_r(3N)
getnetent_r(3N) Getprotobyname_r(3N) getprotobynumber_r(3N)
getprotoent_r(3N) getpwent_r(3C) getrpcbyname_r(3N)
getrpcbynumber_r(3N) getrpcent_r(3N) getservbyname_r(3N)
getservbyport_r(3N) getservent_r(3N) getspent_r(3C)
getspnam_r(3C) gmtime_r(3C) lgamma_r(3M)
localtime_(3C)r nis_sperror_r(3N) rand_r(3C)
readdir_r(3C) strtok_r(3C) tmpnam_r(3C)
ttyname_r(3C)
静态变量:由于静态变量只创建一次,在第一次创建之后,如果其他的线程又使用同一段代码,那么此静态变量继续沿用第一次创建的,所有不安全。
局部变量:在线程中局部变量是线程安全的,它的生存周期只限于创建它的作用域。
http://blog.csdn.net/yangjian15/archive/2009/11/07/4783016.aspx
以下代码就不是线程安全的:
void savedata(char* recvbuf, int len, char* cli_data)
{
static int index = 0;
for (int i = 0; i < len - 1; i++) {
cli_data[index++] = recvbuf[i];
}
cli_data[index] = '\0';
}
6.2线程专用数据(TSD)
6.2.1 pthread_key_create()函数
#include
int pthread_key_create (pthread_key_t *key, void (*destructor)(void *value))
key:指向创建的关键字
destructor:一个可选的析构函数
返回值:成功返回0,返回错误码
EAGAIN:关键字的名字空间用尽
ENOMEM:内存不足
6.2.2 pthread_setspecific()函数:为TSD关键字绑定一个与本线程相关的值
#include
int pthread_ setspecific (pthread_key_t key, const void *value)
key:TSD关键字
value:本线程相关的值
返回值:成功返回0,返回错误码
EINVAL:关键字非法
ENOMEM:内存不足
6.2.3 pthread_getspecific()函数:为TSD关键字绑定一个与本线程相关的值
#include
Void * pthread_ getspecific (pthread_key_t key,)
key:TSD关键字
返回值:成功返回与调用线程相关的关键字所绑定的值,否则返回NULL
6.2.4 pthread_once()函数:
#include
pthread_once_t once_control = PTHREAD_ONCE_INIT
int pthread_ once (pthread_once_t *once_control,
void (* init_routine)(void))
once_control:在线程专用数据使用中,该值被设置为PTHREAD_ONCE_INIT
init_routine:当once_control 被设置为PTHREAD_ONCE_INIT时,该函数在同一进程中只被调用一次。
返回值:成功返回0,返回错误码
EINVAL:关键字非法
注意; 在进程内部有唯一的关键字KEY,可以调用pthread_key_create来创建,然后每个线程可以将自己的数据绑定到这个关键字上,绑定的数据就是TSD.所以pthread_key_create只进行一次,而当前线程设置自己的数据可以先通过pthread_ getspecific来知道自己设置没有如果返回NULL标志着还没设置。
程序清单;
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define PORT 1234 /* Port that will be opened */
#define BACKLOG 5 /* Number of allowed connections */
#define MAXDATASIZE 1000
void process_cli(int connectfd, sockaddr_in client);
void savedata_r(char* recvbuf, int len, char* cli_data);
/* function to be executed by the new thread */
void* start_routine(void* arg);
struct ARG {
int connfd;
sockaddr_in client;
};
/* Thread saft */
static pthread_key_t key;
static pthread_once_t once = PTHREAD_ONCE_INIT;
static void destructor(void *ptr)
{
free(ptr);
}
static void getkey_once(void)
{
pthread_key_create(&key, destructor);
}
typedef struct DATA_THR{
int index;
};
main()
{
int listenfd, connectfd; /* socket descriptors */
pthread_t thread;
ARG *arg;
struct sockaddr_in server; /* server's address information */
struct sockaddr_in client; /* client's address information */
socklen_t sin_size;
/* Create TCP socket */
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {
/* handle exception */
perror("Bind error.");
exit(1);
}
if(listen(listenfd,BACKLOG) == -1){ /* calls listen() */
perror("listen() error\n");
exit(1);
}
sin_size=sizeof(struct sockaddr_in);
while(1)
{
/* accept connection */
if ((connectfd = accept(listenfd,(struct sockaddr *)&client,&sin_size))==-1) {
perror("accept() error\n");
exit(1);
}
/* create thread */
arg = new ARG;
arg->connfd = connectfd;
memcpy((void *)&arg->client, &client, sizeof(client));
if (pthread_create(&thread, NULL, start_routine, (void*)arg)) {
/* handle exception */
perror("Pthread_create() error");
exit(1);
}
}
close(listenfd); /* close listenfd */
}
void process_cli(int connectfd, sockaddr_in client)
{
int num;
char cli_data[5000];
char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];
printf("You got a connection from %s. ",inet_ntoa(client.sin_addr) );
/* Get client's name from client */
num = recv(connectfd, cli_name, MAXDATASIZE,0);
if (num == 0) {
close(connectfd);
printf("Client disconnected.\n");
return;
}
cli_name[num - 1] = '\0';
printf("Client's name is %s.\n",cli_name);
while (num = recv(connectfd, recvbuf, MAXDATASIZE,0)) {
recvbuf[num] = '\0';
printf("Received client( %s ) message: %s",cli_name, recvbuf);
/* save user's data */
savedata_r(recvbuf,num,cli_data);
/* reverse usr's data */
for (int i = 0; i < num - 1; i++) {
sendbuf[i] = recvbuf[num - i -2];
}
sendbuf[num - 1] = '\0';
send(connectfd,sendbuf,strlen(sendbuf),0); /* send to the client welcome message */
}
close(connectfd); /* close connectfd */
printf("Client( %s ) closed connection. User's data: %s\n",cli_name,cli_data);
}
void* start_routine(void* arg)
{
ARG *info;
info = (ARG *)arg;
/* 处理客户请求 */
process_cli(info->connfd, info->client);
delete info;
pthread_exit(NULL);
}
void savedata_r(char* recvbuf, int len, char* cli_data)
{
DATA_THR* data;
/* thread_special data */
pthread_once(&once, getkey_once);
if ( (data = (DATA_THR *)pthread_getspecific(key)) == NULL) {
data = (DATA_THR *)calloc(1, sizeof(DATA_THR));
pthread_setspecific(key, data);
data->index = 0; }
for (int i = 0; i < len - 1; i++) {
cli_data[data->index++] = recvbuf[i];
}
cli_data[data->index] = '\0';
}
6.3 函数参变量取代静态变量
通过使用函数参变量取代静态变量来取代静态变量,另外该函数的调用函数必须为这些变量分配相应的空间并初始化。
typedef struct DATA_THR{
int index;
};
Void fun()
{
DATA_THR data;// 该函数的调用函数必须为这些变量分配相应的空间并初始化
data.index = 0;
savedata(recvbuf,num,cli_data, &data);// 使用函数参变量取代静态变量来取代静态变量
}