Unix操作系统常用于Client/Server结构中的Server(服务器端),在服务器端编程中,我们一般采用并发多进程的方式来接收处理Client(客户端)的服务请求。我们知道,在系统资源(CPU、内存、硬盘等)一定的情况下,系统所能负担的进程数量是有限的,如果超过这个限制,系统就会紊乱甚至崩溃。通讯超时过长、某种业务处理太慢,都会引起短时间内系统并发进程数的骤然增加,导致系统运行不稳定。因此,在Client/Server结构中,必须采取措施来控制并发进程的数量。在Unix系统中可采取多种方式来实现这种控制功能,如文件锁、信号量等。这里介绍一种简洁、可靠的方法,即采用Unix的消息队列来实现并发进程数的控制。 一、主要系统 1.int msgget (key_t key, int flags) 取得相应键值为key 的消息队列标识符。 2.int msgsnd (int qid , struct msgbuf * buf, int nbytes , int flags) 向标识符为qid 的消息队列发送长度为nbytes、已存于缓冲区buf 中的消息。 3.int msgrcv (int qid , struct msgbuf * buf ,int nbytes ,long mtype , int flags ) 从标识符为qid 的消息队列中接收类型为mtype、长度为nbytes的消息队列到buf缓冲区中。 4.int msgctl (int qid ,int cmd , struct msgid_ds *sbuf ) 按cmd表示的命令来控制标识符为qid的消息列。如果将cmd设为IPC_RMID,表示从系统中删除该消息队列。 Unix操作系统内核确保了上述对队列的读、写、控制等操作都是原子性的。当对同一队列同时进行读写时,内核自动将其转化为串行方式依次操作这一队列。 二、实现原理 进程数控制的实现原理是:初始化消息队列,并预先放入一定数量的消息,这预先放入的消息个数就是允许的相应进程最大并发数。当生成一个新进程时,从该消息队列中读取一个消息。如果此时无消息可读,表示进程数量已达到最大,返回失败;否则,接受客户端的处理请求。当该业务处理完成,须终止该进程时,将一个消息写回到队列中,以便系统可再生成新的进程进行业务处理。可以看出,队列中的消息已被模拟成一种令牌,当客户端申请业务处理时,应先取得一个令牌,业务处理完毕则将令牌归还。当令牌被全部占用时,服务器端将拒绝后续的业务处理请求。 三、应用实例 下面给出一个简单的范例来说明实现的方法,在SCO Open Server 5.0.4上编译通过。它可应用于实际的Client/Server环境中,并可沿此思想进一步拓展:如将业务处理进行分类,并按确定的优先次序来处理它们;动态控制不同类型的业务处理的最大进程数等等。 #include #include #include #include #include #include #include #include #include #include #include #include #include
#define LIMMSGQ 0x20000l #define MGFLAG 0777 struct lmsgstruct { long mtype; char mdata[32]; };
#define TESTTYPE 1
int Pliminit(int typenum,int procnum) { int msqid,i; struct lmsgstruct rbuf;
msqid = msgget(LIMMSGQ+typenum, MGFLAG|IPC_CREAT); if (msqid < 0) { return (-1); } while (msgrcv(msqid,(struct msgbuf *)&rbuf,sizeof(rbuf.mdata),0L,IPC_NOWAIT|MSG_NOERROR) >0); rbuf.mtype = 1L; memset(rbuf.mdata,‘T',sizeof(rbuf.mdata)); rbuf.mdata[sizeof(rbuf.mdata)-1] = ‘\0'; for (i = 0;i< procnum;i++) if (msgsnd(msqid,(struct msgbuf *)&rbuf,sizeof(rbuf.mdata),IPC_NOWAIT) < 0) { return (-2); } return(1); }
int Plimread(int typenum) { int msqid,ret; struct lmsgstruct rbuf;
msqid = msgget(LIMMSGQ+typenum, MGFLAG); if (msqid < 0) { return (-1); } ret = msgrcv(msqid,(struct msgbuf *)&rbuf,sizeof(rbuf.mdata),1L,IPC_NOWAIT|MSG_NOERROR); if ((int )ret < (int )sizeof(rbuf.mdata)) { return (-2); } return(1); }
int Plimwrite(int typenum) { int msqid,ret; struct lmsgstruct rbuf;
msqid = msgget(LIMMSGQ+typenum, MGFLAG); if (msqid < 0) { return (-1); } rbuf.mtype = 1L; memset(rbuf.mdata,‘M',sizeof(rbuf.mdata)); rbuf.mdata[sizeof(rbuf.mdata)-1] = ‘\0'; if (msgsnd(msqid,(struct msgbuf *)&rbuf,sizeof(rbuf.mdata),IPC_NOWAIT) < 0) { return (-2); } return(1); }
int Plimend(int typenum) { int msqid; struct msqid_ds ttt; msqid = msgget(LIMMSGQ+typenum, MGFLAG); if (msqid >= 0) { msgctl(msqid,IPC_RMID ,&ttt); } return(1); }
main() { int ret;
ret = Pliminit(TESTTYPE,4); /*初始化队列,设定最大进程数为4 */ if (ret < 0) { printf(“main:Pliminit failed!\n"); exit(-1); } while (1) s int c; printf(“测试主程序\n"); printf(“1——生成1个子进程\n"); printf(“0——退出\n"); scanf(“%1d",&c); /*接收客户请求*/ if ( c == 0 ) break; ret = Plimread(TESTTYPE); /*检查正运行进程数是否已到最大并取得令牌*/ if(ret < 0) /*正运行进程数已到最大,不进行业务处理*/ { printf(“main:Plimread failed!\n"); printf(“system busy,please wait a moment!\n"); } else if ( fork() == 0 ) /*正运行进程数未到最大,生成子进程*/ { /*进行相关业务处理*/ sleep(10); /*假定业务处理时间为 10s*/ Plimwrite(TESTTYPE); /*业务处理完毕,退还令牌,子进程退出*/ exit(1); } } Plimend(TESTTYPE); /*撤消队列*/ exit(1); } /* 编译方法: cc -o ./ctltest ctltest.c 执行方法: ./ctltest 如果在10秒钟内欲连续生成5个子进程,那么在生成第5个子进程时,将失败; 等待10秒钟后,则可继续生成子进程。 */
| | |