Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2431249
  • 博文数量: 298
  • 博客积分: 7876
  • 博客等级: 准将
  • 技术积分: 5500
  • 用 户 组: 普通用户
  • 注册时间: 2011-02-23 13:39
文章存档

2013年(2)

2012年(142)

2011年(154)

分类: LINUX

2011-04-04 14:58:23

5)多线程服务器

 

注:所以文章红色字体代表需要特别注意和有问题还未解决的地方,蓝色字体表示需要注意的地方

 

1.     本文所介绍的程序平台

开发板:arm9-mini2440

虚拟机为:Red Hat Enterprise Linux 5

开发板上系统内核版本:linux-2.6.32.2

 

 

2.线程基础

同一进程可以包括多个线程,这些线程共享相同的内存空间,不同的线程可以存取相同的全局变量、相同的堆数据和文件描述符等,进程间通信需要专门的机制IPC,而线程不需要,所以线程的创建比进程的创建快10~100倍左右,另一方面如果一个现场崩溃,它将影响到同一进程的其他线程。

同一进程中的线程共享如下内容:

全局变量

堆数据

打开的描述符

当前工作目录

用户及用户组ID

但是每个线程具有独立的:

线程ID

堆栈

errno变量

优先级

 

3.线程函数

3.1 pthread_create()函数

       #include

int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)

thread:指向线程的ID的指针。

attr:指向线程属性的指针,包括线程优先级,堆栈大小等,如果属性为NULL则使用系统默认的属性,定义的属性在创建后修改attr不能改变线程的属性。

start_routine:指向线程的执行函数,该函数必须是一个静态函数。

arg:向执行函数传递参数的通用指针。

返回值:成功返回0,否则返回

ENOMEM:没有足够内存产生线程

EINVAL:无限的attr

EPERM:调用者无权设置调度参数或者调度策略

3.2 pthread_join()函数

       #include

int pthread_join(pthread_t thread, void **value_ptr)

作用:该函数挂起当前线程直到所等待的线程结束,与waitpid()功能类似。

Thread所等待线程的ID,该线程必须是当前进程的成员,并且不是分离线程和守护线程,几个线程不能同时等待同一线程结束。

value_ptr:如果该值非空,则指向终止线程的退出状态。

返回值:如果成功调用则返回0,否则返回

EINVAL:无限的线程ID

ESRCH:无法找到相应ID的线程

3.3 pthread_detach()函数

#include

int pthread_ detach (pthread_t thread)

       thread:被设置为“可分离的”线程ID

       返回值:成功返回0,返回错误码

3.4 pthread_exit()函数

#include

int pthread_ exit (void *value_ptr)

       value_ptr指向线程的退出状态,注意不要指向一个局部变量,因为当前进程终止时候所有的局部变量将被撤销。

       返回值:成功返回0,返回错误码

3.5 pthread_self()函数

#include

int pthread_ self (void)

       返回值:返回当前线程的ID

3.6 pthread_cancle()函数

#include

int pthread_ cancle (pthread_t thread)

thread:被终止的线程ID

返回值:成功返回0,返回错误码

4. 给线程传递参数

4.1 pthread_create创建时传递的void *arg参数,如果传递的为多个数据,可以讲多个数据封装在一个结构体里面。

注意:在处理多个客户时,变量arg是所有线程共用的,注意保护

4.2通过指针参数传递

c语言里面传递给函数的参数是被拷贝到函数的执行堆栈中的,可以利用这个特点使新线程得到参数的拷贝,主线程将要传递的数据类型转换成通用指针类型,然后传递给新线程,新线程再将接收的数据参数转换成原数据类型。

注意:arg类型必须能够正确的转换为通用指针类型,arg的字节长度必须小于等于通用指针的长度

4.3通过分配arg的空间传递参数(常用

主线程首先为每个线程分配存储arg的空间,再将arg传递给新线程

注意:在新线程里面注意把分配的空间释放掉

 

5. 多线程服务器设计

5.1以下是多线程服务器设计实例

功能:

服务器等候客户连接请求,一旦连接成功显示客户地址,接收该客户的名字并显示,然后接收来自用户的信息,每收到一个字符串则显示,并将字符串反转,再将反转的字符串发回客户端。

 

客户端首先与服务器相连,接着发送客户端名字,然后发送客户信息,接收到服务器信息并显示,之后等待用户输入Crtl+D,就关闭连接并退出。

 

 

//server.c

#include           /* These are the usual header files */

    #include           /* for bzero() */

    #include          /* for close() */

    #include

    #include

    #include

    #include

    #include

    #include

 

    #define PORT 1234   /* Port that will be opened */

    #define BACKLOG 5   /* Number of allowed connections */

    #define MAXDATASIZE 1000 

 

    void process_cli(int connectfd, sockaddr_in client);

    /* function to be executed by the new thread */

    void* start_routine(void* arg);

    struct  ARG  {

       int connfd;

       sockaddr_in client; 

    };

 

    main()

    {

    int listenfd, connectfd; /* socket descriptors */

    pthread_t  thread;

    ARG *arg;

    struct sockaddr_in server; /* server's address information */

    struct sockaddr_in client; /* client's address information */

    socklen_t sin_size;

 

    /* Create TCP socket */

    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {

       /* handle exception */

       perror("Creating socket failed.");

       exit(1);

    }

 

    int opt = SO_REUSEADDR;

    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

 

    bzero(&server,sizeof(server));

    server.sin_family=AF_INET;

    server.sin_port=htons(PORT);

    server.sin_addr.s_addr = htonl (INADDR_ANY);

    if (bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {

       /* handle exception */

       perror("Bind error.");

       exit(1);

       }   

 

    if(listen(listenfd,BACKLOG) == -1){  /* calls listen() */

       perror("listen() error\n");

       exit(1);

       }

 

    sin_size=sizeof(struct sockaddr_in);

    while(1)

    {

       /* Accept connection */

       if ((connectfd = accept(listenfd,(struct sockaddr *)&client,&sin_size))==-1) {

          perror("accept() error\n");

          exit(1);

          }

       /*  Create thread*/

 

       arg = new ARG;

       arg->connfd = connectfd;

       memcpy((void *)&arg->client, &client, sizeof(client));

 

       if (pthread_create(&thread, NULL, start_routine, (void*)arg)) {

          /* handle exception */

          perror("Pthread_create() error");

          exit(1);

          }

    }

    close(listenfd);   /* close listenfd */        

    }

 

    void process_cli(int connectfd, sockaddr_in client)

    {

    int num;

    char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];

 

    printf("You got a connection from %s.  ",inet_ntoa(client.sin_addr) );

    /* Get client's name from client */

    num = recv(connectfd, cli_name, MAXDATASIZE,0);

    if (num == 0) {

       close(connectfd);

       printf("Client disconnected.\n");

       return;

       }

    cli_name[num - 1] = '\0';

    printf("Client's name is %s.\n",cli_name);

 

    while (num = recv(connectfd, recvbuf, MAXDATASIZE,0)) {

       recvbuf[num] = '\0';

       printf("Received client( %s ) message: %s",cli_name, recvbuf);

       for (int i = 0; i < num - 1; i++) {

          sendbuf[i] = recvbuf[num - i -2];

          }

       sendbuf[num - 1] = '\0';

       send(connectfd,sendbuf,strlen(sendbuf),0);

       }

    close(connectfd); /*  close connectfd */

    }

 

    void* start_routine(void * arg)

    {

    ARG *info;

    info = (ARG *)arg;

 

    /* handle clients requirement */

    process_cli(info->connfd, info->client);

 

    delete (ARG *)arg;

    pthread_exit(NULL);

    }

 

 

 

 

// client.c

    #include

    #include

    #include

    #include

    #include

    #include

    #include         /* netbd.h is needed for struct hostent =) */

    #include

 

    #define PORT 1234   /* Open Port on Remote Host */

    #define MAXDATASIZE 100   /* Max number of bytes of data */

    void process(FILE *fp, int sockfd);

    char* getMessage(char* sendline,int len, FILE* fp);

 

    int main(int argc, char *argv[])

    {

    int fd;   /* files descriptors */

    struct hostent *he;         /* structure that will get information about remote host */

    struct sockaddr_in server;  /* server's address information */

 

    if (argc !=2) {      

       printf("Usage: %s \n",argv[0]);

       exit(1);

       }

 

    if ((he=gethostbyname(argv[1]))==NULL){ /* calls gethostbyname() */

       printf("gethostbyname() error\n");

       exit(1);

       }

 

    if ((fd=socket(AF_INET, SOCK_STREAM, 0))==-1){  /* calls socket() */

       printf("socket() error\n");

       exit(1);

       }

 

    bzero(&server,sizeof(server));

    server.sin_family = AF_INET;

    server.sin_port = htons(PORT); /* htons() is needed again */

    server.sin_addr = *((struct in_addr *)he->h_addr); 

 

    if(connect(fd, (struct sockaddr *)&server,sizeof(struct sockaddr))==-1){ /* calls connect() */

       printf("connect() error\n");

       exit(1);

       }

 

    process(stdin,fd);

 

    close(fd);   /* close fd */ 

    }

 

    void process(FILE *fp, int sockfd)

    {

    char    sendline[MAXDATASIZE], recvline[MAXDATASIZE];

    int numbytes;

 

    printf("Connected to server. \n");

    /* send name to server */

    printf("Input name : ");

    if ( fgets(sendline, MAXDATASIZE, fp) == NULL) {

       printf("\nExit.\n");

       return;

       }

    send(sockfd, sendline, strlen(sendline),0);

 

    /* send message to server */ 

    while (getMessage(sendline, MAXDATASIZE, fp) != NULL) {

       send(sockfd, sendline, strlen(sendline),0);

 

       if ((numbytes = recv(sockfd, recvline, MAXDATASIZE,0)) == 0) {

          printf("Server terminated.\n");

          return;

          }

 

       recvline[numbytes]='\0';

       printf("Server Message: %s\n",recvline); /* it prints server's welcome message  */

 

       }

    printf("\nExit.\n");

    }

 

    char* getMessage(char* sendline,int len, FILE* fp)

    {

    printf("Input string to server:");

    return(fgets(sendline, MAXDATASIZE, fp));

    }

 

6. 线程安全

6.1线程安全基础

 

注意:由于同一进程中的所有线程共享相同的内存空间,如果多个线程修改相同的内存区域就会造成意想不到的后果。

unix系统中,由于静态变量的使用,造成许多API函数都不是线程安全的,对于大都非线程安全函数都要一个线程安全版本,即MT-safe版本。新的安全函数一般是旧的函数加上_r后缀,例如Solaris系统提供的函数:

asctime_r(3C) ctermid_r(3S) ctime_r(3C)
fgetgrent_r(3C) fgetpwent_r(3C) fgetspent_r(3C)
Gamma_r(3M) getgrgid_r(3C) getgrnam_r(3C)
getlogin_r(3C) getpwnam_r(3C) getpwuid_r(3C)
getgrent_r(3C) gethostbyaddr_r(3N) gethostbyname_r(3N)
gethostent_r(3N) getnetbyaddr_r(3N) getnetbyname_r(3N)
getnetent_r(3N) Getprotobyname_r(3N) getprotobynumber_r(3N)
getprotoent_r(3N) getpwent_r(3C) getrpcbyname_r(3N)
getrpcbynumber_r(3N) getrpcent_r(3N) getservbyname_r(3N)
getservbyport_r(3N) getservent_r(3N) getspent_r(3C)
getspnam_r(3C) gmtime_r(3C) lgamma_r(3M)
localtime_(3C)r nis_sperror_r(3N) rand_r(3C)
readdir_r(3C) strtok_r(3C) tmpnam_r(3C)
ttyname_r(3C)

静态变量:由于静态变量只创建一次,在第一次创建之后,如果其他的线程又使用同一段代码,那么此静态变量继续沿用第一次创建的,所有不安全。

局部变量:在线程中局部变量是线程安全的,它的生存周期只限于创建它的作用域。

http://blog.csdn.net/yangjian15/archive/2009/11/07/4783016.aspx

 

以下代码就不是线程安全的:

void savedata(char* recvbuf, int len, char* cli_data)

    {

    static int index = 0;

    for (int i = 0; i < len - 1; i++) {

       cli_data[index++] = recvbuf[i];

       } 

    cli_data[index] = '\0';      

 }

 

                  

6.2线程专用数据(TSD)

6.2.1 pthread_key_create()函数

#include

int pthread_key_create (pthread_key_t *key, void (*destructor)(void *value))

key:指向创建的关键字

destructor:一个可选的析构函数

返回值:成功返回0,返回错误码

       EAGAIN:关键字的名字空间用尽

       ENOMEM:内存不足

6.2.2 pthread_setspecific()函数:为TSD关键字绑定一个与本线程相关的值

#include

int pthread_ setspecific (pthread_key_t key, const void *value)

keyTSD关键字

value:本线程相关的值

返回值:成功返回0,返回错误码

       EINVAL:关键字非法

       ENOMEM:内存不足

6.2.3 pthread_getspecific()函数:为TSD关键字绑定一个与本线程相关的值

#include

Void * pthread_ getspecific (pthread_key_t key,)

keyTSD关键字

返回值:成功返回与调用线程相关的关键字所绑定的值,否则返回NULL

6.2.4 pthread_once()函数:

#include

pthread_once_t once_control = PTHREAD_ONCE_INIT

int pthread_ once (pthread_once_t *once_control,

void (* init_routine)(void))

once_control:在线程专用数据使用中,该值被设置为PTHREAD_ONCE_INIT

init_routine:当once_control 被设置为PTHREAD_ONCE_INIT时,该函数在同一进程中只被调用一次

返回值:成功返回0,返回错误码

       EINVAL:关键字非法

 

注意; 在进程内部有唯一的关键字KEY,可以调用pthread_key_create来创建,然后每个线程可以将自己的数据绑定到这个关键字上,绑定的数据就是TSD.所以pthread_key_create只进行一次,而当前线程设置自己的数据可以先通过pthread_ getspecific来知道自己设置没有如果返回NULL标志着还没设置。

 

程序清单;

    #include           /* These are the usual header files */

    #include           /* for bzero() */

    #include          /* for close() */

    #include

    #include

    #include

    #include

    #include

    #include

 

    #define PORT 1234   /* Port that will be opened */

    #define BACKLOG 5   /* Number of allowed connections */

    #define MAXDATASIZE 1000 

 

    void process_cli(int connectfd, sockaddr_in client);

    void savedata_r(char* recvbuf, int len, char* cli_data);

    /* function to be executed by the new thread */

    void* start_routine(void* arg);

    struct  ARG  {

       int connfd;

       sockaddr_in client; 

       };

 

    /* Thread saft  */

    static pthread_key_t    key;

    static pthread_once_t   once = PTHREAD_ONCE_INIT;

    static void destructor(void *ptr)

    {

       free(ptr);

    }

    static void getkey_once(void)

    {

       pthread_key_create(&key, destructor);

    }

    typedef struct DATA_THR{

       int  index;                    

    };

 

    main()

    {

    int listenfd, connectfd; /* socket descriptors */

    pthread_t  thread;

    ARG *arg;

    struct sockaddr_in server; /* server's address information */

    struct sockaddr_in client; /* client's address information */

    socklen_t sin_size;

 

    /* Create TCP socket  */

    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {

       /* handle exception */

       perror("Creating socket failed.");

       exit(1);

       }

 

    int opt = SO_REUSEADDR;

    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

 

    bzero(&server,sizeof(server));

    server.sin_family=AF_INET;

    server.sin_port=htons(PORT);

    server.sin_addr.s_addr = htonl (INADDR_ANY);

    if (bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {

       /* handle exception */

       perror("Bind error.");

       exit(1);

       }   

 

    if(listen(listenfd,BACKLOG) == -1){  /* calls listen() */

       perror("listen() error\n");

       exit(1);

       }

 

    sin_size=sizeof(struct sockaddr_in);

    while(1)

    {

       /* accept connection */

       if ((connectfd = accept(listenfd,(struct sockaddr *)&client,&sin_size))==-1) {

          perror("accept() error\n");

          exit(1);

          }

       /* create thread */

       arg = new ARG;

       arg->connfd = connectfd;

       memcpy((void *)&arg->client, &client, sizeof(client));

       if (pthread_create(&thread, NULL, start_routine, (void*)arg)) {

          /* handle exception */

          perror("Pthread_create() error");

          exit(1);

          }

    }

    close(listenfd);   /* close listenfd */        

    }

 

    void process_cli(int connectfd, sockaddr_in client)

    {

    int num;

    char cli_data[5000];

    char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];

 

    printf("You got a connection from %s.  ",inet_ntoa(client.sin_addr) );  

    /* Get client's name from client */

    num = recv(connectfd, cli_name, MAXDATASIZE,0);

    if (num == 0) {

       close(connectfd);

       printf("Client disconnected.\n");

       return;

       }

    cli_name[num - 1] = '\0';

    printf("Client's name is %s.\n",cli_name);

 

    while (num = recv(connectfd, recvbuf, MAXDATASIZE,0)) {

       recvbuf[num] = '\0';

       printf("Received client( %s ) message: %s",cli_name, recvbuf);

 

       /* save user's data */

       savedata_r(recvbuf,num,cli_data);

 

       /* reverse usr's data */

       for (int i = 0; i < num - 1; i++) {

          sendbuf[i] = recvbuf[num - i -2];

          }

       sendbuf[num - 1] = '\0';

 

       send(connectfd,sendbuf,strlen(sendbuf),0); /* send to the client welcome message */

    }

 

    close(connectfd); /*  close connectfd */

    printf("Client( %s ) closed connection. User's data: %s\n",cli_name,cli_data);

    }

 

    void* start_routine(void* arg)

    {

    ARG *info;

    info = (ARG *)arg;

 

    /* 处理客户请求 */

    process_cli(info->connfd, info->client);

 

    delete info;

    pthread_exit(NULL);

    }

    void savedata_r(char* recvbuf, int len, char* cli_data)

    {

    DATA_THR* data;

 

    /* thread_special data */

    pthread_once(&once, getkey_once);

    if ( (data = (DATA_THR *)pthread_getspecific(key)) == NULL) {

       data = (DATA_THR *)calloc(1, sizeof(DATA_THR));     

       pthread_setspecific(key, data);

       data->index = 0; }

 

    for (int i = 0; i < len - 1; i++) {

       cli_data[data->index++] = recvbuf[i];

       } 

    cli_data[data->index] = '\0';      

    }

 

 

6.3 函数参变量取代静态变量

通过使用函数参变量取代静态变量来取代静态变量,另外该函数的调用函数必须为这些变量分配相应的空间并初始化。

typedef struct DATA_THR{

       int  index;                    

       };

Void fun()

{

DATA_THR data;// 该函数的调用函数必须为这些变量分配相应的空间并初始化

data.index = 0;

savedata(recvbuf,num,cli_data, &data);// 使用函数参变量取代静态变量来取代静态变量

 

}

阅读(2472) | 评论(0) | 转发(2) |
给主人留下些什么吧!~~