摘要:本文介绍一种新型的基于消息队列的重复型通信软件的设计方法,不同于并发型服务器和一般的重复型服务器通信软件,这种新的软件具有生成的子进程数少的优点,并且容易对客户机与服务器的连接进行管理,适用于客户机数量较多和随机数据通信的情况,能够有效地提高服务器的运行效率。
关键词:网络 重复型服务器通信软件 套接字 连接 共享 消息队列
并发服务器与重复服务器的区别
一般TCP/IP服务器通信软件都是并发型的,即是由一个守护进程负责监听客户机的连接请求,然后再由守护进程生成一个或多个子进程与客户机具体建立连接以完成通信,其缺点是随着连接的客户机数量的增多,生成的通信子进程数量会越来越多,在客户机数量较多的场合势必影响服务器的运行效率。一般的重复服务器指的是服务器在接收客户机的连接请求后即与之建立连接,然后要在处理完与客户机的通信任务后才能再去接收另一客户机的请求连接,其优点是不必生成通信子进程,缺点是客户机在每次通信之前都要与服务器建立连接,开销过大,不能用于随机的数据通信和繁忙的业务处理。
本文提出的新型的重复型服务器不同于一般的重复服务器,它摒弃了上述两类服务器的缺点综合其优点,该服务器通信软件具有一般重复服务器的特征但又能处理客户机的随机访问,在客户机数量多且业务繁忙的应用场合将发挥其优势。重复型服务器通信软件只用三个进程就可完成与所有客户机建立连接,并始终保持这些连接。
重复型服务器通信软件与客户机建立连接的方法
基本思路
当第一台客户机向服务器请求连接时,服务器的守护进程与之建立初始连接(L0),客户机利用L0向服务器发送两个端口号,守护进程将客户机的IP地址和端口号登记在共享内存的记录中,然后关闭L0。由守护进程生成的两个通信子进程从共享内存中获得客户机IP地址及端口号后,分别向客户机请求连接,建立一个从客户机读的连接(L1)和一个往客户机写的连接(L2),并将两个连接的套接字的句柄记录在共享内存中。当另一台客户机请求连接时,守护进程不再生成通信子进程,只是将客户机IP地址和端口号同样登记在共享内存中。通信子进程在一个大循环中先查询共享内存中是否有新的记录,如果有则与这一台客户机建立连接,然后轮询所有已建立的连接的读套接字,查看是否有数据可读,有则读取数据,同时标明该数据是从共享内存中的哪条记录上的读套接字中获得的,再由另一个通信子进程根据这个记录的编号从共享内存中获得对应的写套接字,最后将结果数据往该套接字写往客户机。
建立连接
⑴ 服务器通信软件的初始进程首先建立公用端口上的套接字,并在该套接字上建立监听队列,同时生成一个守护进程(Daemon)tcp_s,然后初始进程就退出运行。守护进程在函数accept处堵塞住直到有客户机的连接请求,一有连接请求即调用server函数处理,然后继续循环等待另一台客户机的请求。因为TCP/IP在连接被拆除后为了避免出现重复连接的现象,一般是将连接放在过时连接表中,连接在拆除后若要避免处于TIME_WAIT状态(过时连接),可调用setsockopt设置套接字的linger延时标志,同时将延时时间设置为0。服务器在/etc/services文件中要登记一个全局公认的公用端口号:tcp_server 2000/tcp。
struct servent *sp;
struct sockaddr_in peeraddr_in,myaddr_in;
linkf=0;
sp=getservbyname("tcp_server","tcp");
ls=socket(AF_INET,SOCK_STREAM,0); /* 创建监听套接字 */
myaddr_in.sin_addr.s_addr=INADDR_ANY;
myaddr_in.sin_port=sp->s_port; /* 公用端口号 */
bind(ls,&myaddr_in,sizeof(struct sockaddr_in));
listen(ls,5);
qid3=msgget(MSGKEY3,0x1ff); /* 获得消息队列的标志号 */
qid4=msgget(MSGKEY4,0x1ff);
signal(SIGCLD,SIG_IGN); /* 避免子进程在退出后变为僵死进程 */
addrlen=sizeof(struct sockaddr_in);
lingerlen=sizeof(struct linger);
linger.l_onoff=1;
linger.l_linger=0;
setpgrp();
switch(fork()){ /* 生成Daemon */
case -1:exit(1);
case 0: /* Daemon */
for(;;){
s=accept(ls,&peeraddr_in,&addrlen);
setsockopt(s,SOL_SOCKET,SO_LINGER,&linger,lingerlen);
server();
close(s);
}
default:
fprintf(stderr,"初始进程退出,由守护进程监听客户机的连接请求.\n");
}
|
⑵ 客户机以这样的形式运行通信程序tcp_:tcp_c rhostname,rhostname为客户机所要连接的服务器主机名。客户机上的/etc/services文件中也要登记:tcp_server 2000/tcp,公用端口号2000要与服务器一样。
int qid1,qid2,s_c1,s_c2,cport1,cport2;
struct servent *sp;
struct hostent *hp;
memset((char *)&myaddr_in,0,sizeof(struct sockaddr_in));
memset((char *)&peeraddr_in,0,sizeof(struct sockaddr_in));
addrlen=sizeof(struct sockaddr_in);
sp=getservbyname("tcp_server","tcp");
hp=gethostbyname(argv[1]); /* 从/etc/hosts中获取服务器的IP地址 */
qid1=msgget(MSGKEY1,0x1ff);
qid2=msgget(MSGKEY2,0x1ff);
cport1=6000;
s=rresvport(&cport1);
peeraddr_in.sin_family=hp->h_addrtype;
bcopy(hp->h_addr_list[0],(caddr_t)&peeraddr_in.sin_addr,hp->h_length);
peeraddr_in.sin_port=sp->s_port;
connect(s,(struct sockaddr *)&peeraddr_in,sizeof(peeraddr_in));
cport1--;
s_c1=rresvport(&cport1);
cport2=cport1;
s_c2=rresvport(&cport2);
sprintf(cportstr,"%dx%d",cport1,cport2);
write(s,cportstr,strlen(cportstr)+1);
close(s);
|
先给变量cport1置一个整数后调用rresvport函数,该函数先检查端口号cport1是否已被占用,如果已被占用就减一再试,直到找到一个未用的端口号,然后生成一个套接字,将该套接字与端口号相联形成客户机端的半相关,接下调用connect函数向服务器发出连接请求。客户机在发出连接请求之前,已用函数gethostbyname和getservbyname获得了服务器的IP地址及其公用端口号,这样就形成了一个完整的相关,可建立起与服务器的初始连接。接下来再创建两个套接字s_c1和s_c2,利用初始连接将客户机的两个套接字的端口号以字符串的形式发送给服务器,这时初始连接的任务已经完成就可将其关闭。以上就完成了与服务器的初始连接,接下来客户机等待服务器的两次连接请求。
⑶ tcp_s的监听队列在收到客户机发来的连接请求后,由server函数读出客户机发送来的两个端口号,并在第一次调用时生成两个通信子进程tcp_s1和tcp_s2,以后就不再生成,这是与并发服务器最大的不同。tcp_s进程将客户机的两个端口号和IP 地址以记录的形式登记在共享内存最后一条记录中,子进程通过共享内存获得这两个端口号,然后再分别与客户机建立连接。tcp_s继续处于监听状态,以便响应其他客户机的连接请求。两个子进程都应该关闭从父进程继承来的但又没有使用的套接字s。
server(){
int f;char c;
cport1=cport2=f=0;
for(;;){
read(s,&c,1);
if(c==0) break;
if(c=='x'){
f=1;continue;
}
if(f) cport2=(cport2*10)+(c-'0');
else cport1=(cport1*10)+(c-'0');
}
/* 在共享内存中登记客户机端口号和IP地址 */
shm_login(cport1,cport2,peeraddr_in.sin_addr.s_addr);
if(linkf==0){ /* 只生成两个子进程 */
if(fork()==0){ /* 子进程tcp_s2 */
close(s);Server_Send();
}else
if(fork()==0){ /* 子进程tcp_s1 */
close(s);Server_Receive();
}
}
linkf=1;
}
|
共享内存的结构如下,通信子进程tcp_s1从s_socket1读,tcp_s2往对应的s_socket2写。
struct s_linkinfo{
int id; /* 连接的标志号,从1开始顺序编号 */
int s_socket1; /* 服务器的读套接字 */
int linkf1; /* 与客户机的cport1连接标志,0:未建立连接,1:已经连接 */
int cport1; /* 客户机的第一个端口号 */
int s_socket2; /* 服务器的写套接字 */
int linkf2; /* 与客户机的cport2连接标志 */
int cport2; /* 客户机的第二个端口号 */
u_long client_addr; /* 客户机IP地址 */
char flag; /* 共享内存占用标志,'i':已占用,'o':未占用 */
};
|
⑷ tcp_c用listen(s_c1,5)在套接字s_c1上建立客户机的第一个监听队列,等待服务器的连接请求。在与服务器建立第一个连接后,再用listen(s_c2,5)建立第二个监听队列,与服务器建立第二个连接。
listen(s_c1,5);
s_w=accept(s_c1,&peeraddr_in,&addrlen);
close(s_c1); /*只允许接收一次连接请求*/
linger.l_onoff=1;linger.l_linger=0;
setsockopt(s_w,SOL_SOCKET,SO_LINGER,&linger,sizeof(struct linger));
listen(s_c2,5);
s_r=accept(s_c2,&peeraddr_in,&addrlen);
close(s_c2);
setsockopt(s_r,SOL_SOCKET,SO_LINGER,&linger,sizeof(struct linger));
|
⑸ 进程tcp_s1调用函数Server_Receive在一个循环中不断查询是否又有新的客户机登记在共享内存中,方法是判断共享内存中最后一条记录的linkf1标志是否为0,如果为0就调函数connect_to_client与客户机建立第一个连接,然后轮询所有的读套接字,有数据则读,没有数据则读下一个读套接字。
Server_Receive(){
int s1,len,i,linkn,linkf1,n;
struct msg_buf *buf,mbuf;
buf=&mbuf;
for(;;){
linkn=shm_info(0,GETLINKN);
linkf1=shm_info(linkn,GETLINKF1);
if(linkf1==0){
if((i=connect_to_client(linkn,1))<0){
shm_logout(linkn);continue;
}
}
for(n=1;n<=linkn;n++){
s1=shm_info(n,GETS1);
i=read(s1,buf,MSGSIZE);
if(i==0){
fprintf(stderr,"A client exit!\n");
shutdown(s1,1);close(s1);
shm_logout(n);
linkn--;continue;
}
if(i==-1) continue;
buf->mtype=MSGTYPE;buf->sid=n;
len=strlen(buf->mdata);
fprintf(stderr,"mdata=%s\n",buf->mdata);
i=msgsnd(qid3,buf,len+BUFCTLSIZE+1,0);
}
}
}
|
由于已将读套接字的读取标志设为O_NDELAY,所以没有数据可读时read函数就返回-1不会堵塞住。这样我们才能接收到客户机随机的数据发送同时也才能及时响应新的客户机的连接请求,这是重复服务器得以实现的关键所在。如果read函数返回0则表示客户机通信程序已退出或者别的原因,比如客户机关机或网络通信故障等,此时就要从共享内存中清除相应客户机的记录。在建立连接时如果出现上述故障也要从共享内存中清除相应客户机的记录。在有数据可读时就将sid标志设置为n,表示数据是从第n台客户机读取的,这样子进程tcp_s2才可根据消息的sid标志往第n台客户机写数据。
⑹ 进程tcp_s2调用函数Server_Send,在一个循环中不断查询是否又有新的客户机连接登记在共享内存中,方法是判断共享内存中最后一条记录的linkf2标志是否为0,如果为0就调用函数connect_to_client与客户机建立第二个连接,然后再从消息队列中读数据。因为只有一个tcp_s2进程在读消息队列,所以就不必对消息进行区别,有数据则读。再按照消息的sid标志从共享内存中查出写套接字,然后将数据往该套接字写。由于该写套接字是在进程tcp_s2内创建的,所以只要简单地使用套接字的句柄即可访问该套接字。函数msgrcv要设置IPC_NOWAIT标志以免在没有数据时堵塞住,这样才能继续执行下面的程序以便及时地与下一台客户机建立连接,这也是一个关键的地方。tcp_s2调用函数Server_Send用于数据发送,tcp_s1则调用函数Server_Recvice用于数据接收。
Server_Send(){
int s2,linkn,linkf2,i;
struct msg_buf *buf,mbuf;
buf=&mbuf;
for(;;){
linkn=shm_info(0,GETLINKN);
linkf2=shm_info(linkn,GETLINKF2);
if(linkf2==0){
if((i=connect_to_client(linkn,2))<0){
shm_logout(linkn);continue;
}
}
i=msgrcv(qid4,buf,MSGSIZE,MSGTYPE,0x1ff|IPC_NOWAIT);
if(i==-1) continue;
s2=shm_info(buf->sid,GETS2);
if(write(s2,buf,i+1)!=i+1){
perror("write");close(s2);
}
}
}
|
函数connect_to_client(n,type)表示服务器与第n台客户机建立第type次连接。该函数由两个子进程同时调用,分别从共享内存中查出客户机的IP地址和端口号后与客户机建立连接,建立的连接分别处于各个子进程自己的数据空间中,彼此并不相通,所以又要用到共享内存,将连接的套接字句柄登记在共享内存中,使得与同一台客户机建立连接的两个套接字形成一一对应的关系。
这样tcp_s2才可根据数据读入的套接字去查询出对应的写套接字,才能正确地将处理结果发送给对应的客户机。tcp_s1以type=1调用该函数,使用共享内存中第n条记录的cport1和客户机IP地址与客户机建立第一个连接,同时将这一连接服务器方的套接字(读套接字)登记在共享内存第n条记录的s_socket1中,同时将连接标志linkf1置1。
tcp_s2以type=2调用该函数,使用共享内存中第n条记录的cport2和客户机IP地址与客户机建立第二条连接,同样也要将这一连接服务器方的套接字(写套接字)登记在共享内存第n条记录的s_socket2中,将连接标志linkf2置1。因为该函数由两个子进程同时调用,为了保持进程间同步,当type=2时必需等到第n条记录的linkf1为1时才能继续执行,即必须先建立第一个连接才能再建立第二个连接,这是由客户机通信程序决定的,因为客户机通信程序是先监听并建立起第一个连接后再监听并建立第二个连接。子进程tcp_s1和tcp_s2通过共享内存实现进程间通信,在实际应用中总是使用共享内存的最后一条记录。
②:(5991,5990,168.1.1.71) ┌─────┐①:(5991,5990) 168.1.1.21
┌─────────────┤ 守护进程 ├←─────────┐┌─────┐
│ │ tcp_s │ 初始连接L0 ││ Client 1 │
│ 共享内存 └─────┘ │├──┬──┤
│ id s1 linkf1 cport1 s2 linkf2 cport2 IP_Address flag ││5999│5998│
│ ┌─┬──┬──┬──┬──┬──┬──┬─────┬─┐│└──┴──┘
│ │1 │ 12 │ 1 │5999│ 13 │ 1 │5998│168.1.1.21│i ││ 168.1.1.22
│ ├─┼──┼──┼──┼──┼──┼──┼─────┼─┤│┌─────┐
│ │2 │ 14 │ 1 │5995│ 17 │ 1 │5994│168.1.1.22│i │││ Clinet 2 │
│ ├─┼──┼──┼──┼──┼──┼──┼─────┼─┤│├──┬──┤
└→┤3 │0/22│0/1 │5991│0/23│0/1 │5990│168.1.1.71│i │││5995│5994│
└─┴──┼──┴┬─┴──┼──┴┬─┴─────┴─┘│└──┴──┘
⑤:(22,1)↑ │ ↑ ↓⑥:(5990,168.1.1.71)│ 168.1.1.71
│ │ │ └─────┐ │┌─────┐
│ │ │⑧:(23,1) ┌──┴┬─┐ └┤ Client 3 │
│ │ └──────┤ │13│ ├──┬──┤
│ ↓③:(5991,168.1.1.71) │通信 ├─┤ │5991│5990│
│┌──┴┬─┐ │子进程│17│ └┬─┴─┬┘
└┤ │12│ │tcp_s2├─┤ │ L2↑⑦
│通信 ├─┤ │ │23├───┼───┘
│子进程│14│ └───┴─┘ │
│tcp_s1├─┤L1 (读套接字22) (写套接字23) │
│ │22├←─────────────────┘
└───┴─┘④
图1 服务器和客户机建立连接的过程
这里必须置套接字的读取标志位O_NDELAY,这样在读数据时如果没有数据可读read函数就不会堵塞住,这是重复型服务器能够实现的关键。因为UNIX系统将套接字与普通文件等同处理,所以就能够使用设置文件标志的函数fcntl来处理套接字。
int connect_to_client(n,type){
u_long client_addr; /* type=1,2 */
int s2,cport,sport,i;
if(type==2){
for(;;) if(shm_info(n,GETLINKF1)==1) break;
}
sport=6000-1;s2=rresvport(&sport);
cport=shm_info(n,GETCPORT1+type-1);
client_addr=shm_info(n,GETCADDR);
peeraddr_in.sin_port=htons((short)cport);
peeraddr_in.sin_addr.s_addr=client_addr;
connect(s2,(struct sockaddr *)&peeraddr_in,sizeof(peeraddr_in));
flags=fcntl(s2,F_GETFL,0);
fcntl(s2,F_SETFL,flags|O_NDELAY);
if(type==1) i=shm_update(n,s2,0,1,0);
if(type==2) i=shm_update(n,0,s2,0,1);
return(i);
}
|
⑺ tcp_c在接收到服务器的两个连接后,生成子进程tcp_c1调用函数Client_Receive用于接收数据,tcp_c则调用函数Client_Send用于发送数据。如果函数Client_Receive从循环中退出,就说明服务器通信软件已退出,于是子进程在退出之前要先杀掉父进程。
cpid=getpid(); /* 父进程的进程号 */
if(fork()==0){ /* tcp_c1 */
close(s_w);
Client_Receive();
sprintf(cmdline,"kill -9 %d",cpid);
system(cmdline);
}else{
close(s_r);
Client_Send();
}
|
客户机服务器接收和发送数据的方法
数据的传送过程
硬件划分:
├←─── 服务器 ───→┼← 网络 →┼←── 客户机 ──→┤
┌──┐⑥┌──┐⑦┌──┐
┌→┤qid4├→┤ L2 ├→┤qid2├─┐
⑤│ └──┘ └──┘ └──┘ ↓⑧
┌──┐ ┌──┴──┐ ┌──→ ┌──┴──┐ ┌────┐
│ DB ├←→┤s_process │ │ │c_process ├←→┤终端用户│
└──┘ └──┬──┘ └─── └──┬──┘ └────┘
④↑ ┌──┐ ┌──┐ ┌──┐ │①
└─┤qid3├←┤ L1 ├←┤qid1├←┘
软件划分: └──┘③└──┘②└──┘
├←─ s_process ──→┼←tcp_s→┼←tcp_c→┼← c_process →┤
图2 数据在客户机服务器之间传递的全过程
其中s_process和c_process是分别运行在服务器上的服务器业务程序和运行在客户机上的客户业务进程。qid3,qid4和qid1,qid2是分别存在于服务器及客户机上的消息队列。
tcp_s和tcp_c是分别运行在服务器和客户机上的通信软件。在客户机和服务器之间建立的两条连接是L1和L2,其中L1专用于客户机至服务器,L2专用于服务器至客户机。
下面叙述图2中所示的数据传递过程,同时介绍用于数据接收和发送的四个函数。因为业务程序不知何时可以接收或发送消息,所以这四个函数都存在一个循环不断地试图接收或发送数据。表示消息的数据结构是sg_buf,消息由消息类别mtype及正文段mdata组成。
正文段中存放的数据是无结构的,必须定义一种数据结构(struct),用结构中的各变量对mdata进行划分,从而使mdata中的数据可以被理解和使用。还可将mdata前面的一部分区域划出来重新命名用作其他用途。消息在整个数据传递的过程中起类似“载体”的作用。
#define MSGSIZE 200
struct msg_buf{
long mtype; /* 消息类别 */
long cpid; /* 客户业务进程标识号 */
long sid; /* 共享内存记录编号 */
long msgid; /* 消息编号 */
char mdata[MSGSIZE-16]; /* 数据区 */
}
|
① 客户业务程序c_process从终端用户接收数据,先存放在一个结构中,然后将该结构的内容依照一定的格式拷入buf->mdata中,然后将buf以消息的形式放入消息队列qid1中。
pidc=getpid();/* c_process的进程号 */
buf->mtype=1; /* 消息类别都为1 */
buf->sid=0; /* sid在客户机没用 */
buf->msgid=++msgid;
buf->cpid=pidc;
msgsnd(qid1,buf,MSGSIZE,0);
|
② 进程tcp_c调用函数Client_Send从qid1中取得消息,然后往L1写给服务器。从qid1中取消息时对消息并不予于区别,凡在qid1中的消息都要由进程tcp_c来发送。
for(;;){ /* 取mtype=1的消息 */
msgrcv(qid1,buf,MSGSIZE,1,0);
write(s_w,buf,i+1);
}
|
③ 进程tcp_s1调用函数Server_Receive从L1读数据至buf中,将buf作为消息放入qid3中。
for(n=1;n<=linkn;n++){
s1=shm_info(n,GETS1);
i=read(s1,buf,MSGSIZE);
if(i==-1) continue;
if(i==0) ... /* 判断出客户机已退出 */
/* n是s1在共享内存登记项的编号 */
buf->sid=n;
msgsnd(qid3,buf,MSGSIZE,0);
}
|
④ 服务器业务程序s_process从消息队列qid3中接收消息到buf,然后将buf->mdata转成结构,根据结构的内容对数据库进行操作。s_process处在一个循环中,一有消息就取走去作消息所要求的操作,对消息并不加以区别。如果没有消息函数msgrcv就处于堵塞状态。
⑤ s_process根据消息的内容访问数据库后将结果放在一个结构中,然后将该结构的内容拷到buf->mdata中,再将缓冲区buf以消息的形式放于消息队列qid4中,最后s_process又要继续循环再去接收新的消息。
for(;;){
msgrcv(qid3,buf,MSGSIZE,1,0);
... ...
/* 解释buf->mdata的内容,对数据库进行操作后再将结果存放在buf->mdata中 */
buf->mtype=1;
msgsnd(qid4,buf,MSGSIZE,0);
}
|
⑥ 进程tcp_s2调用Server_Send从qid4中取走mtype=1的第一个消息,往L2写回客户机。
for(;;){
i=msgrcv(qid4,buf,MSGSIZE,1,0);
if(i==-1) continue;
s2=shm_info(buf->sid,GETS2);
write(s2,buf,i+1);
}
|
⑦ 进程tcp_c1调用函数Client_Receive从L2读数据到buf中,将buf作为消息放入qid2中。如果函数read