分类: LINUX
2012-02-21 21:03:59
++++++APUE读书笔记-17高级进程通信-02基于流的管道++++++
2、基于流的管道
================================================
基于流的管道(简称“流管道”即stream pipe),是一个全双工的管道,父子进程之间通过单一的管道就可以进行双向的通信。
前面已经说过,Solaris系统支持stream pipes,而linux上面可以通过一个可选的软件包来提供这样的功能。
下图用两种方式展示了stream pipes:
+-----User Process----+ +---User Process-----+
| | | |
| fd[0] fd[1] | |fd[0] fd[1]|
+----^----------^-----+ +-^---------------^--+
\ / | |
-------- +-----|---------------|------+
| +->Stream Pipe<-+ |
+-----------Kernel-----------+
如果我们深入到Stream pipe内部,我们将会看到两个stream头部,每一个写队列(WQ)指向另外一个读队列(RQ)。对一端写入数据,将会放到另外一端读取队列的消息中去。
Stream pipe 的内部
+---------+ +---------+
| +----+ | | +----+ |
| | WQ |---------\ /----------| WQ | |
| +----+ | \ / | +----+ |
fd[0] | | \/ | | fd[1]
| +----+ | /\ | +----+ |
| | RQ |<---------/ \---------->| RQ | |
| +----+ | | +----+ |
+---------+ +---------+
Stream head Stream head
由于Stream pipe是一个流,我们可以将streams模块推送到管道的任何一段来处理写入管道的数据,后面会给出相应的图形。但是如果我们将一个模块推送到一端,我们不能从另外一端将它弹出来。如果我们想要移走它,那么我们需要从推送它的那一端将它移走。
含有Module的Stream pipe 的内部
+---------+ +---------+ +---------+
| +----+ | | +----+ | | +----+ |
| | WQ |--------------->| WQ |--------\ /-----------------| WQ | |
| +----+ | | +----+ | \ / | +----+ |
fd[0] | | | | \/ | | fd[1]
| +----+ | | +----+ | /\ | +----+ |
| | RQ |<-------------- | RQ |<--------- ------------------>| RQ | |
| +----+ | | +----+ | | +----+ |
+---------+ +---------+ +---------+
Stream head module Stream head
假设我们没有做任何事情,例如推送一个模块。一个stream pipe的表现好象是一个无stream的pipe,当然它还是支持streamio(7)中所说的多数的STREAMS的ioctl命令的。在后面的章节中,我们将会看到一个例子,这个了例子展示了当我们在文件系统中给定一个管道名称的时候,通过向streams pipe推送一个模块来提供唯一的连接。
举例
这里对前面的协作处理程序使用单个的streams pipe做了一个重新的实现。下面的代码给出了新的main函数,而add2协作处理程序和前面的一样。我们调用了一个新的函数s_pipe来创建一个单一的streams pipe(我们马上会给出这个函数的stream pipes和unix domain sockets两种实现)。
通过streams pipe来调用add2程序的例子
#include "apue.h"
static void sig_pipe(int); /* our signal handler */
int main(void)
{
int n;
int fd[2];
pid_t pid;
char line[MAXLINE];
if (signal(SIGPIPE, sig_pipe) == SIG_ERR)
err_sys("signal error");
if (s_pipe(fd) < 0) /* need only a single stream pipe */
err_sys("pipe error");
if ((pid = fork()) < 0) {
err_sys("fork error");
} else if (pid > 0) { /* parent */
close(fd[1]);
while (fgets(line, MAXLINE, stdin) != NULL) {
n = strlen(line);
if (write(fd[0], line, n) != n)
err_sys("write error to pipe");
if ((n = read(fd[0], line, MAXLINE)) < 0)
err_sys("read error from pipe");
if (n == 0) {
err_msg("child closed pipe");
break;
}
line[n] = 0; /* null terminate */
if (fputs(line, stdout) == EOF)
err_sys("fputs error");
}
if (ferror(stdin))
err_sys("fgets error on stdin");
exit(0);
} else { /* child */
close(fd[0]);
if (fd[1] != STDIN_FILENO &&
dup2(fd[1], STDIN_FILENO) != STDIN_FILENO)
err_sys("dup2 error to stdin");
if (fd[1] != STDOUT_FILENO &&
dup2(fd[1], STDOUT_FILENO) != STDOUT_FILENO)
err_sys("dup2 error to stdout");
if (execl("./add2", "add2", (char *)0) < 0)
err_sys("execl error");
}
exit(0);
}
static void sig_pipe(int signo)
{
printf("SIGPIPE caught\n");
exit(1);
}
父进程只使用fd[0]进行读写(向fd[0]写的内容反应到fd[1]中,从fd[0]读取的内容来自fd[1]),子进程只使用fd[1]进行读写。因为streams pipe的每个末端都是全双工的,所以父进程只是读取和写入fd[0];子进程将标准输入输出复制到fd[1],达到对fd[1]进行读写的目的(之前是使用了两个半双工管道对应两对文件描述符号,这里只使用一个管道对应一对文件描述符号)。下面的图就展示了结果文件描述符号的情况。注意这里的例子也可以使用不基于流的全双关工的pipes来实现,因为这个例子没有使用任何关于流的特性。
前面说过FreeBSD支持全双工pipes,但是这样的pipes不是基于流机制的。
协作处理程序中的文件描述符号布局
Parent Child(coprocess)
+----------+ +--------------+
| | /--------------->| stdin |
| fd[0] <-------|---------------->| fd[1] |
| | \--------------->| stdout |
+----------+ +--------------+
我们定义的s_pipe和标准的pipe函数类似。两个函数使用接收同样的参数,但是s_pipe返回的文件描述符号以读和写的方式打开。
基于流的s_pipe函数的例子
下面的代码展示了基于流的s_pipe函数的实现。这个实现只是调用了标准的pipe函数,创建全双工的管道。
#include "apue.h"
/*
* Returns a STREAMS-based pipe, with the two file descriptors
* returned in fd[0] and fd[1].
*/
int s_pipe(int fd[2])
{
return(pipe(fd));
}
有名stream pipes
一般来说,pipes只能在亲戚进程之间使用,一般是子进程从它们的父进程将pipes继承过来。前面我们看到过非相关进程可以使用FIFOs进行通信,但是这个只提供了单向的通信路径。streams机制为进程提供了一个方法,通过这个方法,进程可以给定文件系统中的一个pipe名字,这解决了单向的FIFOs问题。
我们可以使用fattach函数为Streamspipe提供一个文件系统中的名字。
#include
int fattach(int filedes, const char *path);
返回:如果成功返回0,如果错误返回1。
参数path必须引用一个已经存在的文件名称,并且调用的进程必须拥有这个文件并且具有对这个文件的写权限,或者以超级用户的身份运行。
一旦一个streams pipe被附加到一个文件系统上面的名称上面,那么相应的文件就变得不可访问了。任何打开这个名称的进程将获得相应的pipe的访问而不是那个文件。任何在pipe附加到该文件之前就打开这个文件的进程仍然可以继续对这个文件进行访问。实际上,这些进程一般不会知道现在这个名称正在引用一个不同的文件。
下图展示了一个附加到路径/tmp/pipe上面的pipe。这个pipe只有一个端被附加到文件系统上面的名字上面,另外一个端用来和打开这个被附加的文件的进程进行通信。尽管可以这样将任何类型的流文件描述符号附加到一个文件系统上面的名字上面,但是fattach函数一般用来为一个stream pipe提供一个名字而使用。
挂载到一个文件系统中的名字上面的pipe
+------------------+
| user process |
+--------------^---+
\
\
v
+------------------+ +------------------+
| stream head | | stream head | /tmp/pipe
+------------^-----+ +---^--------------+
\ pipe /
---------------------
一个进程可以调用fdetach来取消一个streams file和一个文件系统中文件名称之间的关联。
#include
int fdetach(const char *path);
返回:如果成功返回0,如果错误返回1。
在调用fdetach之后,任何通过打开这个路径访问相应的streams pipe的进程还是会仍然继续对这个stream进行访问,但是之后对这个路径打开进行访问的进程将会访问到这个路径对应的文件系统中的原始的文件。
单一连接
尽管我们可以将一个stream pipe的末端附加到文件系统上面的一个路径名字上面,我们在多个客户进程使用有名的streams pipe和服务进程进行通信的时候还是会遇到问题。来自一个客户进程的数据可能会干扰另外一个写入管道的客户进程的数据。尽管我们可以保证客户进程写的数据量少于PIPE_BUF字节这样写操作就是原子的了,我们没有办法向一个特定的客户进程反馈并且保证就是那个客户进程读取到了消息。因为有许多进程读取同一个管道,我们无法控制那个进程实际读取我们发送的数据。
connld的stream module解决了这个问题。在将一个streams pipe附加到一个文件系统中的名字之前,服务进程可以首先将一个connld 模块推送到被附加的管道的末端。这样如下图所示:
为单一连接设置connld
+-----------------+ +-----------------+
| server process | | client process |
+-----------^-----+ +-^---------------+
\ .
\ /tmp/pipe .
+v--------------+ +-------------v+
| stream head | | stream head |
+--------^------+ +------^-------+
| |
| |
\ +-------v------+
\ | CONNLD |
\ +------^-------+
\ pipe |
--------------------------+
在上面的图中,服务进程附加pipe的一个末端到/tmp/pipe上面,这里我们用虚线表示客户进程(client process)正在打开一个附加的streams pipe。一旦打开过程完毕了,那么我们就有了下图所示的情况:
使用connld创建单一连接
+----------------+ +---------------+
| server process | | client process|
+-^-------------^+ +--------------^+
/ \ \
/ /tmp/pipe ----------------\ \
+--------------v-+ +---------------+ +v-------------+ +-v-----------+
| stream head | | stream head | | stream head | | stream head|
+--------^-------+ +-------^-------+ +---------^----+ +------^------+
| | \ pipe /
\ | --------------------
\ +-------v-------+
\ | CONNLD |
\ +-------^-------+
\ pipe |
------------------+
客户进程不会为它打开的管道末端接收一个打开的文件描述符号。相反,操作系统创建一个新的管道然后返回给客户进程一个新的管道末端做为打开/tmp/pipe的结果。系统发送新管道的另外一个端给服务进程,发送的方式是通过将它的文件描述符号在已经存在的附加了文件的的管道上进行传输,这样就会导致在客户进程和服务进程上面只有一个单一的连接了。我们将会在后面看到使用stream pipes传输文件描述符号的机制。
fattach函数构建在mount系统调用之上,这个工具被当做被挂载了的streams。被挂载了的streams和connld模块的这些机制后来被SVR4纳入。
我们接下来开发三个函数,这些函数可以用来在非继承的进程之间创建单一的连接。这些函数模拟之前讲过的面向连接的套接字函数。我们使用streams pipes来做其底下的通信机制,后面我们会看到也可以使用UNIX domain sockets来实现这些函数。
#include "apue.h"
int serv_listen(const char *name);
返回:如果成功返回侦听的文件描述符号,如果错误返回负数。
int serv_accept(int listenfd, uid_t *uidptr);
返回:如果成功返回新的文件描述符号,如果错误返回负数。
int cli_conn(const char *name);
返回:如果成功返回文件描述符号,如果错误返回负数。
函数serv_listen在服务进程侦听客户进程向公共名称(文件系统中的路径)发送连接请求的时候,可以用于服务进程。客户进程将会在他们想要连接服务进程的时候使用这个名称。返回值就是streams pipe的服务端。代码如下:
#include "apue.h"
#include
#include
/* pipe permissions: user rw, group rw, others rw */
#define FIFO_MODE (S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)
/*
* Establish an endpoint to listen for connect requests.
* Returns fd if all OK, <0 on error
*/
int serv_listen(const char *name)
{
int tempfd;
int fd[2];
/*
* Create a file: mount point for fattach().
*/
unlink(name);
if ((tempfd = creat(name, FIFO_MODE)) < 0)
return(-1);
if (close(tempfd) < 0)
return(-2);
if (pipe(fd) < 0)
return(-3);
/*
* Push connld & fattach() on fd[1].
*/
if (ioctl(fd[1], I_PUSH, "connld") < 0) {
close(fd[0]);
close(fd[1]);
return(-4);
}
if (fattach(fd[1], name) < 0) {
close(fd[0]);
close(fd[1]);
return(-5);
}
close(fd[1]); /* fattach holds this end open */
return(fd[0]); /* fd[0] is where client connections arrive */
}
服务端使用serv_accept函数等待客户连接请求到达。当请求到达的时候,系统会自动创建一个新的streams pipe,然后这个函数返回一个末端给服务进程。另外,客户进程的有效用户ID被存放在uidptr指向的内存。代码如下:
#include "apue.h"
#include
/*
* Wait for a client connection to arrive, and accept it.
* We also obtain the client's user ID.
* Returns new fd if all OK, <0 on error.
*/
int serv_accept(int listenfd, uid_t *uidptr)
{
struct strrecvfd recvfd;
if (ioctl(listenfd, I_RECVFD, &recvfd) < 0)
return(-1); /* could be EINTR if signal caught */
if (uidptr != NULL)
*uidptr = recvfd.uid; /* effective uid of caller */
return(recvfd.fd); /* return the new descriptor */
}
一个客户调用cli_conn来连接到服务器。name参数指定的名称必须和服务端serv_listen中指定的名称一样。返回的时候,客户进程得到一个连接到服务端的连接。代码如下:
#include "apue.h"
#include
#include
/*
* Create a client endpoint and connect to a server.
* Returns fd if all OK, <0 on error.
*/
int cli_conn(const char *name)
{
int fd;
/* open the mounted stream */
if ((fd = open(name, O_RDWR)) < 0)
return(-1);
if (isastream(fd) == 0) {
close(fd);
return(-2);
}
return(fd);
}
我们仔细检测返回的文件描述符号是否引用了一个streams设备,以防服务进程没有启动但是这个文件名称仍然在文件系统中存在着。后面我们将会看到这三个函数是怎样被使用的。
参考: