#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
// Message types stored in Msgbuf.mtype.
#define REQUEST 1
#define REPLY 2
#define MAX_SIZE 1024 // large enough!
// Sentinel Lamport value: a node started with it sends no requests and
// answers every incoming request immediately.
#define BAD_LAMPORT 9527 // large enough!
// Message queue id of this process.
int my_msqid;
// Lamport value of this process (main sets it from the command line).
int lamport;
// Message queue ids of every process in the group.
int msqid0;
int msqid1;
int msqid2;
// Message body exchanged between nodes.
struct Msgbuf {
    long mtype;   // message type (REQUEST or REPLY)
    int msqid;    // message queue id of the sender
    int lamport;  // Lamport value of the sender
};
// Global singly linked list of the replies that have not been sent yet.
struct Msgid {
    int msqid;           // queue id that is still owed a reply
    struct Msgid *next;  // next pending entry, NULL at the end of the list
} *msgid_list;
void send_msg(int dstid,struct Msgbuf *buf,long type){ buf->mtype = type; buf->msqid = my_msqid; buf->lamport = lamport; if((msgsnd(dstid,buf,8,0)) == -1){ fprintf(stderr,"Error:msgsnd:%d:%s\n",errno,strerror(errno)); exit(1); } if(type == REQUEST) fprintf(stdout,"Node %d send request to %d\n",my_msqid,dstid); else fprintf(stdout,"Node %d send reply to %d\n",my_msqid,dstid); }
void wait_msg(struct Msgbuf *buf,long type){ buf->mtype = type; if(msgrcv(my_msqid,buf,MAX_SIZE,type,0) == -1){ fprintf(stderr,"Error:msgrcv:%s\n",strerror(errno)); exit(1); } if(type == REQUEST) fprintf(stdout,"Node %d recieve request from %d\n",my_msqid,buf->msqid); else fprintf(stdout,"Node %d recieve reply from %d\n",my_msqid,buf->msqid); }
/*
 * Thread entry point: handle the two incoming requests, one per peer.
 *
 * A request is answered immediately when this node's lamport is BAD_LAMPORT
 * or the requester's lamport is smaller than ours. Otherwise the requester's
 * queue id is pushed onto msgid_list so the reply can be sent later.
 *
 * ptr - unused.
 * Returns NULL. Exits the process with status 1 if memory runs out.
 *
 * NOTE(review): msgid_list is also read and freed by main() without any
 * locking visible in this file — confirm whether that is acceptable.
 */
void *process_request(void *ptr)
{
    (void)ptr;

    struct Msgbuf buf;

    msgid_list = NULL;
    for (int handled = 0; handled < 2; handled++) {
        wait_msg(&buf, REQUEST);

        if (lamport == BAD_LAMPORT || buf.lamport < lamport) {
            send_msg(buf.msqid, &buf, REPLY);
            continue;
        }

        // Keep the unsent reply in the global list.
        struct Msgid *pending = malloc(sizeof *pending);
        if (pending == NULL) {
            fprintf(stderr, "Error:malloc failed\n");
            exit(1);
        }
        pending->msqid = buf.msqid;
        pending->next = msgid_list;
        msgid_list = pending;
    }

    return NULL;
}
int main (int argc, char **argv){ if (argc != 3){ fprintf (stderr, "Usage:%s 本进程序号 本进程lamport值 \n", argv[0]); exit (1); } my_msqid = msgget(ftok(".",*argv[1]), IPC_CREAT | 0666); lamport = atoi(argv[2]); msqid0 = msgget(ftok(".",'0'), IPC_CREAT | 0666); msqid1 = msgget(ftok(".",'1'), IPC_CREAT | 0666); msqid2 = msgget(ftok(".",'2'), IPC_CREAT | 0666); struct Msgbuf qbuf; fprintf(stdout,"Node started...\tId:%d Lamport:%d \n",my_msqid,lamport);
// 打开一个线程来转化request为reply
pthread_t tid; pthread_create(&tid,NULL,&process_request,NULL); if(lamport != BAD_LAMPORT){ // 给其他三个进程的消息队列发送请求
if(msqid0 != my_msqid) send_msg(msqid0,&qbuf,REQUEST); if(msqid1 != my_msqid) send_msg(msqid1,&qbuf,REQUEST); if(msqid2 != my_msqid) send_msg(msqid2,&qbuf,REQUEST); // 等待本进程消息队列的回复
//wait_msg(&qbuf,REPLY);
wait_msg(&qbuf,REPLY); wait_msg(&qbuf,REPLY); // 写入文件
fprintf(stdout,"Node Id:%d write sth to file\n",my_msqid);
// 把保留的reply发送出去
struct Msgid *t = msgid_list; struct Msgid *f; struct Msgbuf buf; while(t!=NULL){ send_msg(t->msqid,&buf,REPLY); f = t; t = t->next; free(f); } } pthread_join(tid,NULL); return 0; }
|