欢迎光临我的博客
分类:
2010-08-19 13:32:29
当不同进程访问相同资源(比如文件)时,可能会产生冲突,这就需要有一种同步措施保证该过程的正确性。此外,一个进程有时也需要向其他进程发送或从其他进程接收数据。这些都属于进程间通信。由于进程的空间都是独立的,通过全局变量来实现进程间的通信是不可能的(现代进程的共享内存除外),这就需要内核提供一种特定的机制来实现它。
本版UNIX提供了三种进程间通信机制:管道(pipe)、文件(file)和第12章中所讲的信号(signal)。
管道是一种用于进程间数据传送的机制,顾名思义,数据在其中的传输就像水流在水管中流动一样。它实际上是一种特殊的文件,但和一般文件不同的是,它有两个句柄——读句柄和写句柄,而且只创建在根设备上。读取方(reader,也叫读取进程)使用读句柄从管道中读入数据,写入方(writer,也叫写入进程)使用写句柄向管道内写入数据。管道是单向的,读句柄只能够用于读数据,写句柄也只能用于写数据,如图13-1所示。如果需要实现双向传输,可创建两个管道。管道的数据访问严格按照先进先出(FIFO)原则,不支持seek文件定位操作。管道最长(数据容量)为4096个字节。
图13-1 管道示意图
UNIX中的管道又叫“无名管道”,因为它没有名称(尽管它实际上是文件)、只能通过句柄访问。现代操作系统(比如Linux)引入了“有名管道”的概念。“无名管道”只能用于有亲缘关系的进程(比如父子进程)之间通信,而“有名管道”可用于任意两个进程间通信。
)用户实现
函数原型:int pipe(int fd[2]);
功能描述:创建一个管道,句柄在fd中返回。其中fd[0]是读句柄,fd[1]是写句柄。创建成功返回0,失败返回-1。
参数说明:fd是出参,用于存放管道句柄。
---------------------选自光盘文件/usr/source/s/s5/pipe.---------------------
/ pipe -- C library
/ pipe(f)
/ int f[2];
1. .globl _pipe, cerror
2. pipe = 42.
3. _pipe:
4. mov r5,-(sp)
5. mov sp,r5
6. sys pipe
7. bec 1f
8. jmp cerror
9. 1:
10. mov 4(r5),r2
11. mov r0,(r2)+
12. mov r1,(r2)
13. clr r0
14. mov (sp)+,r5
15. rts pc
正由于fd是出参,所以本函数在调用内核实现时,未传入参数。因为在内核实现返回后,如果创建成功,则读句柄在r0中,写句柄在r1中,这样第10行把fd赋给r2,然后第11行把r0赋给fd[0],第12行把r1赋给fd[1]。最后返回0.
)内核实现
函数原型:void pipe();
功能描述:系统调用pipe的内核实现。
参数说明:无。
----------------------选自光盘文件/usr/sys/ken/pipe.c----------------------
/* Max allowable buffering per pipe.
* This is also the max size
of the
* file created to implement
the pipe.
* If this size is bigger
than 4096,
* pipes will be implemented
in LARGe
* files, which is probably
not good.
*/
#define PIPSIZ 4096
/* The sys-pipe entry.
* Allocate an inode on the
root device.
* Allocate 2 file structures.
* Put it all together with flags.
*/
1. pipe()
2. {
3. register *ip, *rf,
*wf;
4. int r;
5. ip = ialloc(rootdev);
6. if(ip == NULL)
7. return;
8. rf = falloc();
9. if(rf == NULL) {
10. iput(ip);
11. return;
12. }
13. r =
u.u_ar0[R0];
14. wf = falloc();
15. if(wf == NULL)
{
16. rf->f_count = 0;
17. u.u_ofile[r] = NULL;
18. iput(ip);
19. return;
20. }
21. u.u_ar0[R1] =
u.u_ar0[R0];
22. u.u_ar0[R0] =
r;
23. wf->f_flag
= FWRITE|FPIPE;
24. wf->f_inode
= ip;
25. rf->f_flag
= FREAD|FPIPE;
26. rf->f_inode
= ip;
27. ip->i_count
= 2;
28. ip->i_flag
= IACC|IUPD;
29. ip->i_mode
= IALLOC;
30. }
正如注释所言,之所以管道长度定义为4096,因为这是“小文件”的上限大小,否则管道就需要使用“大文件”实现,这可能不太方便,至少影响性能(“大文件”的访问总是要比“小文件”慢一些),且浪费磁盘空间。
pipe函数和文件的创建过程基本一样,所不同的是它需要分配两个句柄。
第5行首先在根设备上分配节点,如分配失败,则函数返回。
第8行分配读文件结构,用于管道读操作。同时读句柄在u.u_ar0[R0]中返回。如分配失败则第10行释放文件节点并返回。
如读文件结构和句柄分配成功,第14行继续分配写文件结构和句柄。如果分配失败,则第16~19行释放读文件结构和节点并返回。否则第21行把写句柄存放到u.u_ar0[R1]中,第22行把读句柄r存放到u.u_ar0[R0]中,这样在返回到用户实现中时,r1将是写句柄,r0将是读句柄。
第23~26行设置写/读文件结构。
第27行设置文件打开计数为2,因为它已被打开两次——读句柄和写句柄。
最后第29行标识节点已分配。
可以看出,管道类似于“无名文件”,需要通信双方同时打开它,一个写入数据、一个读取数据。
)用户实现
它和文件写入调用一样,具体内容请参见第12章。
)内核实现
前半部分和文件写入一样,write调用rdwr、rdwr判断出文件是管道时,调用writep。这和文件的writei不一样。
函数原型:void writep(file *fp);
功能描述:向管道中写入指定长度数据。数据地址是u.u_base(在用户空间),数据长度为u.u_count。数据被添加到管道(文件)的末端。
参数说明:fp是管道对应的写文件结构。
/* Write call directed to a pipe.
*/
1. writep(fp)
2. {
3. register *rp, *ip, c;
4. rp = fp;
5. ip = rp->f_inode;
6. c = u.u_count;
7. loop:
8. /* If all done, return.
9. */
10. plock(ip);
11. if(c == 0) {
12. prele(ip);
13. u.u_count = 0;
14. return;
15. }
16. /* If there are not both read and
write sides of the pipe active,
return error and signal too.
17. */
18. if(ip->i_count < 2) {
19. prele(ip);
20. u.u_error = EPIPE;
21. psignal(u.u_procp, SIGPIPE);
22. return;
23. }
24. /* If the pipe is full,
wait for reads to delete
and truncate it.
25. */
26. if(ip->i_size1 == PIPSIZ) {
27. ip->i_mode =| IWRITE;
28. prele(ip);
29. sleep(ip+1, PPIPE);
30. goto loop;
31. }
32. /*
Write what is possible and
loop back.
33. */
34. u.u_offset[0] = 0;
35. u.u_offset[1] = ip->i_size1;
36. u.u_count = min(c,
PIPSIZ-u.u_offset[1]);
37. c =- u.u_count;
38. writei(ip);
39. prele(ip);
40. if(ip->i_mode&IREAD) {
41. ip->i_mode =& ~IREAD;
42. wakeup(ip+2);
43. }
44. goto loop;
45. }
第10行首先调用plock锁定管道,因为在写管道的过程中,是不允许读管道的。
/* Lock a pipe.
* If its already locked,
* set the WANT bit and
sleep.
*/
1. plock(ip)
2. int *ip;
3. {
4. register *rp;
5. rp = ip;
6. while(rp->i_flag&ILOCK) {
7.
rp->i_flag =| IWANT;
8.
sleep(rp, PPIPE);
9. }
10. rp->i_flag =| ILOCK;
11. }
如果管道已被锁,则第7行设定IWANT标志,并挂起等待。
第10行获得锁,设定锁标记。
回到writep,第11行判断待写入字节数c是否为0,如为0则释放管道锁并返回。
第18行判断打开计数是否小于2,如果成立,则表明读取方已关闭,则写入也没有意义。所以出错,并在第21行发送SIGPIPE信号通知当前进程。
管道在刚创建后,引用计数是2,所以可以立刻执行写操作。
第26行判断管道是否已写满。如果已写满,则第27行设定IWRITE标记,表明写入方正在等待读取方读取数据。第29行挂起在ip+1上(即&inode.i_dev)。在它被读取方唤醒后,跳转到loop处重新执行刚才的过程。
第34~35行设定偏移为文件末,第36行计算实际允许传送的数据长度,以确保总长不超过PIPSIZ。
第37行计算剩余字节数。
第38行调用writei写入数据到文件,第39行调用prele释放管道锁,以使得读取方或第44行再次跳转到loop时可访问。
第40行判断读取方是否已经读完管道中所有数据,正在等待下一批数据。如果是,则清除IREAD标志,并唤醒读取方。
最后第44行跳转到loop继续传送下一批数据(如果c大于0)。
)用户实现
它和文件读取调用一样,具体内容请参见第12章。
)内核实现
前半部分和文件读取一样,read调用rdwr、rdwr判断出文件是管道时,调用readp。
函数原型:void readp(file *fp);
功能描述:从管道中读取指定长度数据。数据地址是u.u_base(在用户空间),数据长度为u.u_count。读取位置是上次读取后的偏移处。
参数说明:fp是管道对应的写文件结构。
/* Read call directed to a pipe.
*/
1. readp(fp)
2. int
*fp;
3. {
4. register
*rp, *ip;
5. rp = fp;
6. ip = rp->f_inode;
7. loop:
8. /* Very conservative
locking.
9. */
10. plock(ip);
11. /* If the head
(read) has caught up with
the tail
(write), reset both to 0.
12. */
13. if(rp->f_offset[1]
== ip->i_size1) {
14. if(rp->f_offset[1] != 0) {
15. rp->f_offset[1] = 0;
16. ip->i_size1 = 0;
17. if(ip->i_mode&IWRITE) {
18. ip->i_mode =&
~IWRITE;
19. wakeup(ip+1);
20. }
21. }
/* If there are not
both reader and
writer active, return without
satisfying read.
22. */
23. prele(ip);
24. if(ip->i_count < 2)
25. return;
26. ip->i_mode =| IREAD;
27. sleep(ip+2, PPIPE);
28. goto loop;
29. }
30. /* Read and
return
31. */
32. u.u_offset[0]
= 0;
33. u.u_offset[1]
= rp->f_offset[1];
34. readi(ip);
35. rp->f_offset[1]
= u.u_offset[1];
36. prele(ip);
37. }
和writep一样,readp在第10行首先锁定管道以保证操作的原子性。
如果第13行判断成立,表明读取方已经追上写入方,即已读完管道中所有数据,这时设置管道读偏移和数据长度均为0。并在第17行判断IWRITE是否设定。从writep可以看出,如IWRITE被设,则管道已被写满,写入方挂起。所以这里第18行清除IWRITE标志,并在第19行唤醒写入方。
由于管道中已无数据可供读取,所以读取方在第26行设定IREAD标志,表明正等待写入方写入新数据。同时第27行挂起在ip=2,也就是&inode.i_number上。挂起在何地址并不重要,只要和写入方挂起地址区分开即可。在此之前第24行判断管道打开计数是否小于2,从而确定写入方是否还打开管道,如果小于2,则写入方已关闭,这样再挂起等待也没有意义了,所以函数返回。
如果读取方被唤醒,则表明写入方已写入新数据,则第28行跳转到loop,重新开始读取。
如果管道中有数据可供读取,则第32~33行设定偏移为上次读完后的文件偏移,然后第34行调用readi读取数据到缓存u.u_base中。
第35行更新文件偏移rp->f_offset[1]为读后位置,因为readi并不会也无法更新它。这样下一次用户再读取管道时,就会从上次偏移处继续读取。最后释放管道锁。
和writep不同的是,readp在第32~35行读管道的过程中,并未判断管道打开计数ip->i_count是否小于2,这是因为只要管道中有数据,则写入方是否已关闭无关紧要。而writep在写管道时则要求有读取方,这是防止数据滞留在管道中不被读出。因为不管是读取方还是写入方在调用close关闭管道后,自己不能再打开它。
可以看出,管道是基于文件实现的。读取方拥有文件的读句柄,写入方拥有文件的写句柄,它们对同一个文件(也就是管道)进行访问。内核对这种访问加以控制,保证其同步并顺序进行。写入方每次都从管道末开始添加数据,也就是说,它的偏移量是inode.i_size1。读取方从上次读偏移处继续读取,即偏移量是file.f_offset[1]。管道读写过程甲的示意如图13-2所示。
图13-2 管道读写过程甲示意图
图13-2具体说明如下。
如果管道已被写满,即文件大小达到PIPSIZ(4096),写入方挂起,等待读取方把数据读完。每次读取方读完管道内数据后,都会判断写入方是否已挂起(IWRITE标志),如挂起则唤醒它,同时挂起等待写入方写入新数据。此时,管道读写过程乙的示意如图13-3所示。
图13-3 管道读写过程乙示意图
图13-3具体说明如下。
void main()
{
int fd[2];
int id;
char buf[256];
if(pipe(fd) == 0)
{
id = fork();
if(id == 0)
{
id =
read(fd[0], buf, 255);
buf[id] =
'\0';
printf("The
data received from pipe is: %s\n", buf);
close(fd[0]);
}
else
{
write(fd[1],
"This is the first data from pipe.", sizeof("This is the first
data from pipe."));
close(fd[1]);
}
}
}
在本例中,父进程创建管道,然后子进程继承了它。子进程读管道,而父进程写管道。每一方在操作完成后,都及时地关闭了管道。
该段代码在任何情况下都可以正常工作:
(1)如果子进程首先执行,则读管道。由于管道没数据,它会挂起直到父进程write操作完成。这时它被唤醒,读出其中数据;
(2)如果父进程首先执行,则写管道。虽然此时子进程没有调用read,但管道已被双方打开,即inode.i_count = 2,所以write调用成功。然后关闭管道,该操作没有问题是因为read在读管道时并不要求写入方也打开管道。
当然子进程和父进程的代码也可互换,即子进程写管道、父进程读管道。
需要注意的是,管道在关闭后无法再打开。比如子进程代码若在close(fd[0])后加上open(fd[0], 0); 或 read(fd[0], buf, 255); 都会失败。首先open调用出错,因为第一个参数是文件名而非句柄(返回值是句柄);其次句柄fd[0]在被关闭后,再调用read或write对它进行读写就出错了。所以只有在确定不再使用该管道时,才可关闭它。
另外,pipe调用必须在fork之前。