Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1706875
  • 博文数量: 607
  • 博客积分: 10031
  • 博客等级: 上将
  • 技术积分: 6633
  • 用 户 组: 普通用户
  • 注册时间: 2006-03-30 17:41
文章分类

全部博文(607)

文章存档

2011年(2)

2010年(15)

2009年(58)

2008年(172)

2007年(211)

2006年(149)

我的朋友

分类: LINUX

2008-09-02 13:37:31

信号量Semaphores

当我们为多用户操作系统写涉及到线程操作的程序时,会发现要确保临界区代码(critical sections of code)被唯一的进程或线程访问。
我们发现要编写一个实现上述目的的通用代码是很难的。

1个可能的解决措施是使用O_EXCL标志的open函数创建文件,这会提供一个原子的文件创建。这会让单个进程获得新的文件的令牌。这对简单的问题是管用的。但更复杂的问题就不行了。

Dijkstra提出了PV方法

P(Semaphore variable) for wait
V(Semaphore variable) for signal

P(sv): 如果sv>0,减去1;否则进程运行悬停。
V(sv): 如果有某个其他的悬停的进程,让其继续运行。否则sv加上1。

伪代码:
  semaphore sv = 1;
  loop forever{
    P(sv);
    critical section;
    V(sv);
    ...
  }



Linux 信号量设施
Linux信号量设施不是对单个信号进行操作,而是对一个通用的信号数组进行控制。初看起来,复杂化了,但是当一个进程需要控制多个资源时,这是一个很大的优势。

UbuntuIPC设施是System V IPC.

#include
int semctl(int sem_id, int sem_num, int command, ...);
semctl直接控制信号量。
sem_id是信号量标识符。sem_num不过不是数组,就该是0,
command是采取的命令,2个最常用:
    SETVAL:给信号量一个初始的值,要在第一次使用信号量前设定。这个值将被传给semun的val成员变量.
    IPC_RMID:当一个信号量不再需要时,删除该信号量的标识符。

如果有第4个参数,就是semun. 这个类型定义尽可能采用系统提供的,如果没提供,就自己定义。
通常的定义:
    union semun{
       int val;
       struct semid_ds*  buf;
       unsigned short * array;
   }
int semget(key_t key, int num_sems, int sem_flags);
key供不相干的进程来访问同样的信号量,key只在semget中用到,其他函数都是用semget返回的信号量标识符。
num_sems一般总是1
sem_flags类似于open()的标志,我们可以使用IPC_CREAT和IPC_EXCL来获得一个独占的信号量。

int semop(int sem_id, struct sembuf *sem_ops, size_t num_sem_ops);
sem_id就是semget返回的信号量标识符。
sem_ops指向一个数组结构
struct sembuf{
    short sem_num;//0,除非你用到了信号量数组
    short sem_op;//-1:P操作; +1:V操作
    short sem_flag;//SEM_UNDO: 确保一个进程终止时释放它所关联的信号量。
}

#include
#include
#include
#include
#include
#include
#include "semun.h"   //unbuntu中sys/sem.h没定义semun,但在/usr./src/linux-headers-2.6.22-14/include/linux/sem.h发现了semun的定义

static int set_semvalue(void);
static void del_semvalue(void);
static int semaphore_p(void);
static int semaphore_v(void);
static int sem_id;

int main(int argc, char *argv[])
{
    int i;
    int pause_time;
    char op_char = 'Y';
    srand((unsigned int)getpid());

    sem_id = semget((key_t)1234, 1, 0666 | IPC_CREAT); 
    if (argc > 1) {
        if (!init_semvalue()) {
            fprintf(stderr, "Failed to initialize semaphore\n");
            exit(EXIT_FAILURE);
        }
        op_char = 'X';
        sleep(2);
    }

  for(i = 0; i < 10; i++) {
      if (!semaphore_p())    exit(EXIT_FAILURE);
      printf("%c", op_char);      fflush(stdout);
      pause_time = rand() % 3;
      sleep(pause_time);
      printf("%c", op_char);      fflush(stdout);
      if (!semaphore_v())    exit(EXIT_FAILURE);
      pause_time = rand() % 2;
      sleep(pause_time);
  }
  printf("\n%d - finished\n", getpid());
  if (argc > 1) {
      sleep(10);
      del_semvalue(); 
  }
  exit(EXIT_SUCCESS);
}

static int init_semvalue(void)
{
    union semun sem_union;
    sem_union.val = 1;
    if (semctl(sem_id, 0, SETVAL, sem_union) == -1) return(0);
    return(1);
}

static void del_semvalue(void)
{
    union semun sem_union;
    if (semctl(sem_id, 0, IPC_RMID, sem_union) == -1)
        fprintf(stderr, "Failed to delete semaphore\n");
}

static int semaphore_p(void)
{
    struct sembuf sem_b;
    sem_b.sem_num = 0;
    sem_b.sem_op = -1; /* P() */
    sem_b.sem_flg = SEM_UNDO;
    if (semop(sem_id, &sem_b, 1) == -1) {
        fprintf(stderr, "semaphore_p failed\n");
        return(0);
    }
    return(1);
}


static int semaphore_v(void)
{
    struct sembuf sem_b;
    sem_b.sem_num = 0;
    sem_b.sem_op = 1; /* V() */
    sem_b.sem_flg = SEM_UNDO;
    if (semop(sem_id, &sem_b, 1) == -1) {
        fprintf(stderr, "semaphore_v failed\n");
        return(0);
    }
    return(1);
}


启动2个进程:
$./sem1 1 &
$./sem1


伪随机数的使用
void srand(unsigned int seed);
int rand();

问题:
(1)  进程中的信号量和线程中的信号量有何区别?
(2)  进程有独立的数据段和代码段,为好还要信号量来同步?
答案:
(1)  在LINUX中进程和线程的区别比较小,主要是内存共享的区别,同步机制基本一样。  
(2)  数据段,代码段的独立并不意味着进程就是独立的,内存是共享方式的一种,文件,数据库,管道,有很多方式可以在进程之间传递和共享数据,只要有共享, 就存在冲突和需要协调的地方,因此才需要信号量,更进一步说,你的进程甚至可以是在两台机器上,两台机器之间数据的传递和交换虽说不会涉及同一系统下的进 程同步问题,但分布式系统也需要用消息或者什么方式来保持同步,因此独立是相对的,非孤立系统就需要同步。



共享内存
共享内存是一种高效的在不相干的多个进程间传输数据的方式。
共享内存是被IPC为一个进程创建的一个特定的地址空间。其他进程可以把该地址端attach到自己的地址空间。
所有的进程都可以访问那个内存地址,就好像已被malloc一样。如果1个进程写了共享内存,其他有权访问该共享内存的进程会立刻看到改变。

共享内存本身并不同步机制,这是程序员自己的责任。

需要注意的是:可访问内存的概念包含了物理内存和交换出去到磁盘的内存页。

共享内存的函数:
#include
int shmget(key_t key, size_t size, int shmflg);
key的作用跟信号量中的类似
size是共享内存的字节数;
shmflg:IPC_CREAT.权限标志位很有用。它可以创建能被创建者进程写但只能被其他进程读的这样一种共享内存。
调用成功的话,返回共享内存的标识符。

void *shmat(int shm_id, const void *shm_addr, int shmflg);
attach一个共享内存到本地进程。
shm_id是shmget返回的共享内存的标识符;
shm_addr是共享内存被attach到本地进程的地址,这个通常设定为NULL,由OS来自动选择合适的地址。
shmflg:2个可能的标志: SHM_RDONLY只读,SHM_RND控制共享内存的地址。
调用成功的话,返回共享内存的第一个字节的地址;否则,返回-1。

int shmdt(const void *shm_addr);
detach本地进程的共享内存。shm_addr是shmat返回的地址。

int shmctl(int shm_id, int command, struct shmid_ds *buf);
共享内存的控制函数比信号量要简单些,很感谢。
shm_id是shmget返回的共享内存的标识符。
command:
IPC_START 取得共享内存的状态
IPC_SET  改变共享内存的状态
IPC_RMID  删除共享内存
buf: 存储模式和权限信息,command相关。

struct shmid_ds {
    uid_t shm_perm.uid;
    uid_t shm_perm.gid;
    mode_t shm_perm.mode;
}

例子:
2个程序shm1,c, shm2.c
1个头文件定义共享内存的结构

shm_com.h:

#define TEXT_SZ 2048
struct shared_use_st {
    int written_by_you;
    char some_text[TEXT_SZ];
};

shm1.c:

#include
#include
#include
#include
#include
#include
#include
#include "shm_com.h"
int main()
{
    int running = 1;
    void *shared_memory = (void *)0;
    struct shared_use_st *shared_stuff;
    int shmid;
    srand((unsigned int)getpid());
    shmid = shmget((key_t)1234, sizeof(struct shared_use_st), 0666 | IPC_CREAT);
    if (shmid == -1) {
       fprintf(stderr, "shmget failed\n");
        exit(EXIT_FAILURE);
    }
    shared_memory = shmat(shmid, (void *)0, 0);
    if (shared_memory == (void *)-1) {
        fprintf(stderr, "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    printf("Memory attached at %X\n", (int)shared_memory);

    shared_stuff = (struct shared_use_st *)shared_memory;
    shared_stuff->written_by_you = 0;
    while(running) {
        if (shared_stuff->written_by_you) {
            printf("You wrote: %s", shared_stuff->some_text);
            sleep( rand() % 4 ); /* make the other process wait for us ! */
            shared_stuff->written_by_you = 0;
            if (strncmp(shared_stuff->some_text, "end", 3) == 0) {
               running = 0;
            }
        }
    }
 
    if (shmdt(shared_memory) == -1) {
        fprintf(stderr, "shmdt failed\n");
        exit(EXIT_FAILURE);
    }
    if (shmctl(shmid, IPC_RMID, 0) == -1) {
        fprintf(stderr, "shmctl(IPC_RMID) failed\n");
        exit(EXIT_FAILURE);
    }
    exit(EXIT_SUCCESS);
}
shm2.c:

#include
#include
#include

#include
#include
#include
#include
#include "shm_com.h"
int main()
{
    int running = 1;
    void *shared_memory = (void *)0;
    struct shared_use_st *shared_stuff;
    char buffer[BUFSIZ];
    int shmid;
    shmid = shmget((key_t)1234, sizeof(struct shared_use_st), 0666 | IPC_CREAT);
    if (shmid == -1) {
        fprintf(stderr, "shmget failed\n");
        exit(EXIT_FAILURE);
    }
    shared_memory = shmat(shmid, (void *)0, 0);
    if (shared_memory == (void *)-1) {
        fprintf(stderr, "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    printf("Memory attached at %X\n", (int)shared_memory);
    shared_stuff = (struct shared_use_st *)shared_memory;
    while(running) {
        while(shared_stuff->written_by_you == 1) {
            sleep(1);
            printf("waiting for client...\n");
        }
        printf("Enter some text:" );
        fgets(buffer, BUFSIZ, stdin);
        strncpy(shared_stuff->some_text, buffer, TEXT_SZ);
        shared_stuff->written_by_you = 1;
        if (strncmp(buffer, "end", 3) == 0) {
                running = 0;
        }
    }
    if (shmdt(shared_memory) == -1) {
        fprintf(stderr, "shmdt failed\n");
        exit(EXIT_FAILURE);
    }
    exit(EXIT_SUCCESS);
}

主要注意的是:本程序中用轮询符号变量来同步,这是低效的。真实的程序中要么用IPC消息,要么用信号量来同步。

消息队列
消息队列提供了一种高效的且容易理解的在进程间传递数据的方式。
注意消息的受限:MSGMAX - 单个消息的大小限制; MSGMNB - 消息队列的长度限制。

#include

int msgget(key_t key, int msgflg);
调用成功的话,返回消息的标识符。
msgflg: 通常设定为 0666|IPC_CREAT
 
int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg);
msgid msgget返回的消息标识符。
msg_ptr 消息的头指针。
msg_sz 消息的载荷大小(msg_ptr指向的块的大小)。
msgflg 控制当消息队列满的时候的情况。IPC_NOWAIT 当队列满,立刻返回(返回值:-1);否则,一直守候着空位机会。

int msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
msgid 消息标识符。
msg_ptr
msg_sz
msgtype 被设定为接收特定类型的消息。 0: 头一个消息; n: 第一个消息类型=n的消息; -n:第一个消息类型<=n的消息。
msgflg 控制当消息队列空的时候的情况。IPC_NOWAIT 当队列空,立刻返回(返回值:-1);否则,一直守候着消息到来。

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

struct msqid_ds {
uid_t msg_perm.uid;
uid_t msg_perm.gid
mode_t msg_perm.mode;
}

IPC_START
IPC_SET
IPC_RMID



自定义消息结构
msg_com.h:
#define MAX_TEXT 512
struct my_msg_st {
        long int my_msg_type;
        char some_text[MAX_TEXT];
};

消息接收器

#include
#include
#include
#include
#include
#include
#include
#include
#include "msg_com.h"

int main()
{
 
    int running = 1;
    int msgid;
    struct my_msg_st some_data;
    long int msg_to_receive = 0;
   
    msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
            fprintf(stderr,"msgget failed with error: %d\n", errno);
            exit(EXIT_FAILURE);
    }
   
    while(running) {
        if (msgrcv(msgid, (void *)&some_data, BUFSIZ, msg_to_receive, 0) == -1) {
                fprintf(stderr,"msgrcv failed with error: %d\n", errno);
                exit(EXIT_FAILURE);
        }
        printf("You wrote: %s", some_data.some_text);
        if (strncmp(some_data.some_text, "end", 3) == 0) {
                running = 0;
        }
    }
   
    if (msgctl(msgid, IPC_RMID, 0) == -1) {
        fprintf(stderr,"msgctl(IPC_RMID) failed\n");
        exit(EXIT_FAILURE);
    }
   
    exit(EXIT_SUCCESS);
}



消息发送器

#include
#include
#include
#include
#include
#include
#include
#include
#include "msg_com.h"

int main()
{
        int running = 1;
        struct my_msg_st some_data;
        int msgid;
        char buffer[BUFSIZ];
        msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
        if (msgid == -1) {
                fprintf(stderr, "msgget failed with error: %d\n", errno);
                exit(EXIT_FAILURE);
        }

        while(running) {
              printf("Enter some text: ");
              fgets(buffer, BUFSIZ, stdin);
              some_data.my_msg_type = 1;
              strcpy(some_data.some_text, buffer);
              if (msgsnd(msgid, (void *)&some_data, MAX_TEXT, 0) == -1) {
                      fprintf(stderr, "msgsnd failed\n");
                      exit(EXIT_FAILURE);
              }
              if (strncmp(buffer, "end", 3) == 0) {
                      running = 0;
              }
        }
}
相对于管道,消息并不需要进程提供同步方法。

IPC状态命令
ipcs -m|-s|-q
ipcrm -m|-s|-q







阅读(1537) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~