Chinaunix首页 | 论坛 | 博客
  • 博客访问: 162709
  • 博文数量: 16
  • 博客积分: 2000
  • 博客等级: 大尉
  • 技术积分: 195
  • 用 户 组: 普通用户
  • 注册时间: 2007-07-29 08:28
文章分类

全部博文(16)

文章存档

2015年(1)

2010年(15)

我的朋友

分类:

2010-01-21 23:04:29

实现分布式互斥算法:

当进程想进入临界区时,它向所有其他进程发一条打了时间戳的消息Request

当收到所有其他进程的Reply消息时,就可以进入临界区了;

当一个进程收到一条Request消息时,必须返回一条Reply消息。

如该进程自己不想进入临界区,则立即发送Reply消息

如该进程想进入临界区,则把自己的Request消息时间戳与收到的Request消息时间戳相比较,

1)如自己的晚,则立即发送Reply消息

2)否则,就推迟发送Reply消息

---------------------------------------------------------------------------------------------------

代码实现如下:node.c是节点进程文件,main.c是进程组织文件


进程节点文件:node.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <pthread.h>

#define REQUEST 1
#define REPLY 2
#define MAX_SIZE 1024         // enough large!

#define BAD_LAMPORT 9527     // large enough!


// 本进程消息队列的id号

int my_msqid;
int lamport;

// 组中所有进程消息队列的id号

int msqid0;
int msqid1;
int msqid2;

// 消息内容

struct Msgbuf{
    long mtype;
    int msqid;
    int lamport;
};

// 全局链表保存未发送的reply

struct Msgid{
    int msqid;
    struct Msgid *next;
} *msgid_list;

void send_msg(int dstid,struct Msgbuf *buf,long type){
    buf->mtype = type;
    buf->msqid = my_msqid;
    buf->lamport = lamport;
    if((msgsnd(dstid,buf,8,0)) == -1){
        fprintf(stderr,"Error:msgsnd:%d:%s\n",errno,strerror(errno));
        exit(1);
    }
    if(type == REQUEST)
        fprintf(stdout,"Node %d send request to %d\n",my_msqid,dstid);
    else
        fprintf(stdout,"Node %d send reply to %d\n",my_msqid,dstid);
}

void wait_msg(struct Msgbuf *buf,long type){
    buf->mtype = type;
    if(msgrcv(my_msqid,buf,MAX_SIZE,type,0) == -1){
        fprintf(stderr,"Error:msgrcv:%s\n",strerror(errno));
        exit(1);
    }
    if(type == REQUEST)
        fprintf(stdout,"Node %d recieve request from %d\n",my_msqid,buf->msqid);
    else
        fprintf(stdout,"Node %d recieve reply from %d\n",my_msqid,buf->msqid);
}

void *process_request(void *ptr){
    msgid_list = NULL;
    struct Msgbuf buf;
    int i;
    for(i=0;i<2;i++){
        wait_msg(&buf,REQUEST);
        if(lamport == BAD_LAMPORT || buf.lamport < lamport){
            send_msg(buf.msqid,&buf,REPLY);
        }else{
            // 保存未发送的reply到全局链表中

            struct Msgid *id = (struct Msgid *)malloc(sizeof(struct Msgid));
            id->msqid = buf.msqid;
            id->next = msgid_list;
            msgid_list = id;
        }
    }
}

int main (int argc, char **argv){
      if (argc != 3){
          fprintf (stderr, "Usage:%s 本进程序号 本进程lamport值 \n", argv[0]);
          exit (1);
    }
    my_msqid = msgget(ftok(".",*argv[1]), IPC_CREAT | 0666);
    lamport = atoi(argv[2]);
    msqid0 = msgget(ftok(".",'0'), IPC_CREAT | 0666);
    msqid1 = msgget(ftok(".",'1'), IPC_CREAT | 0666);
    msqid2 = msgget(ftok(".",'2'), IPC_CREAT | 0666);
    struct Msgbuf qbuf;
    fprintf(stdout,"Node started...\tId:%d Lamport:%d \n",my_msqid,lamport);

    // 打开一个线程来转化request为reply

    pthread_t tid;
    pthread_create(&tid,NULL,&process_request,NULL);
    
    if(lamport != BAD_LAMPORT){
        // 给其他三个进程的消息队列发送请求

        if(msqid0 != my_msqid)
            send_msg(msqid0,&qbuf,REQUEST);
        if(msqid1 != my_msqid)
            send_msg(msqid1,&qbuf,REQUEST);
        if(msqid2 != my_msqid)
            send_msg(msqid2,&qbuf,REQUEST);
    
        // 等待本进程消息队列的回复

        //wait_msg(&qbuf,REPLY);

        wait_msg(&qbuf,REPLY);
        wait_msg(&qbuf,REPLY);
    
        // 写入文件

        fprintf(stdout,"Node Id:%d write sth to file\n",my_msqid);

        // 把保留的reply发送出去

        struct Msgid *t = msgid_list;
        struct Msgid *f;
        struct Msgbuf buf;
        while(t!=NULL){
            send_msg(t->msqid,&buf,REPLY);
            f = t;
            t = t->next;
            free(f);
        }
    }
    pthread_join(tid,NULL);
    return 0;
}




执行函数文件:main.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

pid_t myfork(){
    pid_t pid;
    if((pid = fork() < 0)){
        fprintf(stderr,"Error:fork\n");
        exit(1);
    }
    return pid;
}

int main(void){
    char lamport_str[100];
    if(fork() == 0){
        // 子进程1

        sprintf(lamport_str,"%d",8);
        char *argv[] = {"./node","0",lamport_str,NULL};
        execvp("./node",argv);
    }
    if(fork() == 0){
        // 子进程2

        sprintf(lamport_str,"%d",12);
        char *argv[] = {"./node","1",lamport_str,NULL};
        execvp("./node",argv);
    }
    if(fork() == 0){
        // 子进程3

        sprintf(lamport_str,"%d",9527);
        char *argv[] = {"./node","2",lamport_str,NULL};
        execvp("./node",argv);
    }
    printf("sleep 4!\n");
    sleep(4);
    printf("wake up!\n");
    return 0;
}


阅读(1153) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~