分类: LINUX
2012-02-10 21:00:10
++++++APUE读书笔记-15进程内部通信-04协作处理程序++++++
4、协作处理程序
================================================
unix系统中的filter是一个程序,这个程序从标准输入读取数据并且写入到标准输出。Filter一般会以线性的方式连接到shell的管道线中。当生成filter的标准输入,以及从filter的标准输出读取数据的程序是同一个程序的时候,filter就变成了一个协作处理程序(coprocesses)(也就是被协作者输入给协作者,协作者处理之后再送回给被协作者)。
Korn shell 提供协作处理程序;Bourne shell, Bourne-again shell,和C shell不提供将进程连接成为协作处理程序的方法。协作处理程序一般通过shell运行在后台,它的标准输入和输出一般通过管道连接到另外的程序上面。尽管初始化一个协作处理程序并将它的标准输入和输出连接到另外的进程上面的shell语法非常的奇怪,但是协作处理程序在c程序中还是非常有用的。
popen为我们提供了一个单向的管道,可以到达另外进程的标准输入或者从其标准输出而来,通过协作处理程序,我们可以拥有到另外进程的两个双向管道,一个通向其标准输入,一个来自其标准输出。我们写它的标准输入,让它处理相应的数据,然后从它的标准输出中读取数据。
例子
这里给出了一个协作处理程序的例子。进程创建两个pipes,一个是协作处理程序的标准输入,另外一个是协作处理程序的标准输出,下图描述了这个结构:
通过写它的标准输入以及读取它的标准输出来运行写作处理程序
+parent-------------+ +child(coprocesses)--+
| fd1[1] |<----pipe1------| stdin |
| fd2[0] |-----pipe2----->| stdout |
+-------------------+ +--------------------+
下面一个简单的协作处理程序从它的标准输入读取两个数字,然后计算两个数字的和,然后将结果写入到它的标准输出中(一般的协作处理程序会做更多的工作,这里只是一个简单的例子,用来学习如何使用管道连接协作处理程序)。
int main(void)
{
int n, int1, int2;
char line[MAXLINE];
while ((n = read(STDIN_FILENO, line, MAXLINE)) > 0) {
line[n] = 0; /* null terminate */
if (sscanf(line, "%d%d", &int1, &int2) == 2) {/*从标准输入读取两个整数*/
sprintf(line, "%d\n", int1 + int2);/*对输入进行计算处理*/
n = strlen(line);
if (write(STDOUT_FILENO, line, n) != n)
err_sys("write error");
} else {
if (write(STDOUT_FILENO, "invalid args\n", 13) != 13)/*将处理结果写入标准输出*/
err_sys("write error");
}
}
exit(0);
}
我们将这个程序编译,生成一个add2的可执行文件。
下面代码的程序,在从标准输入读取两个数字之后,会启动这个add2为协作处理程序,然后把从协作处理程序得到的值写到它的标准输出。
static void sig_pipe(int signo)
{/*我们的信号处理程序*/
printf("SIGPIPE caught\n");
exit(1);
}
int main(void)
{
int n, fd1[2], fd2[2];
pid_t pid;
char line[MAXLINE];
if (signal(SIGPIPE, sig_pipe) == SIG_ERR)
err_sys("signal error");
if (pipe(fd1) < 0 || pipe(fd2) < 0)/*创建管道*/
err_sys("pipe error");
if ((pid = fork()) < 0) {
err_sys("fork error");
} else if (pid > 0) {
close(fd1[0]);
close(fd2[1]);
while (fgets(line, MAXLINE, stdin) != NULL) {/*父进程从标准输入读取数据*/
n = strlen(line);
if (write(fd1[1], line, n) != n)/*父进程将标准输入读取的数据写入到发送管道中*/
err_sys("write error to pipe");
if ((n = read(fd2[0], line, MAXLINE)) < 0)/*父进程从接收管道中读取数据*/
err_sys("read error from pipe");
if (n == 0) {
err_msg("child closed pipe");
break;
}
line[n] = 0; /* null terminate */
if (fputs(line, stdout) == EOF)/*父进程将从接收管道读取的数据写入到标准输出中*/
err_sys("fputs error");
}
if (ferror(stdin))
err_sys("fgets error on stdin");
exit(0);
} else { /* child */
close(fd1[1]);
close(fd2[0]);
if (fd1[0] != STDIN_FILENO) {
if (dup2(fd1[0], STDIN_FILENO) != STDIN_FILENO)/*子进程将标准输入重新定向到管道*/
err_sys("dup2 error to stdin");
close(fd1[0]);
}
if (fd2[1] != STDOUT_FILENO) {
if (dup2(fd2[1], STDOUT_FILENO) != STDOUT_FILENO)/*子进程将标准输出重新定向到管道*/
err_sys("dup2 error to stdout");
close(fd2[1]);
}
if (execl("./add2", "add2", (char *)0) < 0)/*子进程执行协作处理程序*/
err_sys("execl error");
}
exit(0);
}
这里,我们创建两个管道,然后父子进程关闭其无用的管道端。我们需要使用两个管道,一个用来作为协作处理程序的标准输入,一个用来做为协作处理程序的标准输出。子进程然后在启动协作处理程序之前调用dup2将管道描述符号重定向到标准输入和标准输出。
如果我们编译并且运行上面的程序,那么它会正常工作。但是,如果我们在等待输入期间杀掉了add2协作处理程序然后输入了两个数字,这个时候会产生写管道没有读取端的信号(导致信号处理程序被调用)。
之前说过并不是所有的系统的pipe函数都提供全双工的管道,后面我们给出了另外一个版本的例子,它使用一个单个的全双工管道代替两个半双工管道,这需要系统支持全双工管道。
例子:
在前面的add2协作处理程序中,我们使用了底层的I/O(unix的系统调用):read和write。如果我们使用标准I/O库重新实现这个协作处理程序会怎样呢?下面的代码给出了这样的版本:
int main(void)
{
int int1, int2;
char line[MAXLINE];
while (fgets(line, MAXLINE, stdin) != NULL) {
if (sscanf(line, "%d%d", &int1, &int2) == 2) {
if (printf("%d\n", int1 + int2) == EOF)
err_sys("printf error");
} else {
if (printf("invalid args\n") == EOF)
err_sys("printf error");
}
}
exit(0);
}
如果我们使用上面的这段代码作为协作处理程序,当启动协作处理程序的时候,就不会工作了。因为,默认来说标准I/O库是有缓存的。当这段程序被启动的时候,第一个fgets使用标准I/O库分配一个缓存,使用已有的缓存类型,然后从标准输入(这个标准输入实际被重新定向到管道上了)中读取。因为标准输入已经被重新定向成为了一个管道,默认来说标准I/O库函数的缓存是"fully buffered"(也就是缓存满了才会flush),类似标准输出也会有同样的动作。这样,add2程序从它的标准输入上读取就会阻塞,而调用popen的程序从它的管道中读取(也就是add2的标准输出)的时候也会阻塞,最终导致了死锁。
这里,我们可以控制运行的协作处理程序,我们可以在上面代码中的while循环之前加入如下的语句:
if (setvbuf(stdin, NULL, _IOLBF, 0) != 0)
err_sys("setvbuf error");
if (setvbuf(stdout, NULL, _IOLBF, 0) != 0)
err_sys("setvbuf error");
这些语句导致fgets变成行缓存的了(输入一个完整的一行也就是包含回车的行),同时printf会再遇到一个换行符号的时候刷新输出。
如果我们无法修改向管道进行输出的程序,那么就需要其他的技术了,例如,如果我们使用awk(一个文本处理工具)作为一个协作处理程序,用来替代这里的add2,那么只要写一个如下的脚本:
#! /bin/awk -f
{ print $1 + $2 }
这样这个程序也不会正常的工作,原因和前面那段代码一样,因为它使用的是标准I/O的缓存了。但是这里,我们无法改变awk的工作方式(也就是说,除非我们有awk的代码,否则我们无法修改awk的代码,设置其缓存类型)。这个时候我们可以在启动协作处理程序(awk)的时候让它以为自己的标准输入和输出是一个终端,这样会导致协作处理程序中的标准I/O库函数使用行缓存来处理输入/输出流。类似类似我们使用setvbuf,我们使用伪终端来做这个工作,后面会讲到。
参考: