生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的
缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入
一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费
者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。
听起来好像蛮对的,无懈可击似的,但其实在实
现时会有一个竞争条件存在的。为了跟踪缓冲区中的消息数目,需要一个变量 count。如果缓冲区最多存放 N 个消息,则生产者的代码会首先检查
count 是否达到 N,如果是,则生产者休眠;否则,生产者向缓冲区中放入一个消息,并增加 count 的值。
消费者的代码也与此类似,首先检测 count 是否为 0,如果是,则休眠;否则,从缓冲区中取出消息并递减 count 的值。同时,每个进程也需要检查是否需要唤醒另一个进程。代码可能如下:
// 缓冲区大小
#define N 100
int count = 0; // 跟踪缓冲区的记录数
/* 生产者进程 */
void procedure(void)
{
int item; // 缓冲区中的数据项
while(true) // 无限循环
{
item = produce_item(); // 产生下一个数据项
if (count == N) // 如果缓冲区满了,进行休眠
{
sleep();
}
insert_item(item); // 将新数据项放入缓冲区
count = count + 1; // 计数器加 1
if (count == 1) // 表明插入之前为空,
{ // 消费者等待
wakeup(consumer); // 唤醒消费者
}
}
}
/* 消费者进程 */
void consumer(void)
{
int item; // 缓冲区中的数据项
while(true) // 无限循环
{
if (count == 0) // 如果缓冲区为空,进入休眠
{
sleep();
}
item = remove_item(); // 从缓冲区中取出一个数据项
count = count - 1; // 计数器减 1
if (count == N -1) // 缓冲区有空槽
{ // 唤醒生产者
wakeup(producer);
}
consume_item(item); // 打印出数据项
}
}
看
上去很美,哪里出了问题,这里对 count 的访问是有可能出现竞争条件的:缓冲区为空,消费者刚刚读取 count 的值为
0,而此时调度程序决定暂停消费者并启动执行生产者。生产者向缓冲区中加入一个数据项,count 加 1。现在 count 的值变成了
1.它推断刚才 count 为 0,所以此时消费者一定在休眠,于是生产者开始调用 wakeup(consumer)
来唤醒消费者。但是,此时消费者在逻辑上并没有休眠,所以 wakeup 信号就丢失了。当消费者下次运行时,它将测试先前读到的 count
值,发现为 0(注意,其实这个时刻 count 已经为 1 了),于是开始休眠(逻辑上)。而生产者下次运行的时候,count
会继续递增,并且不会唤醒 consumer 了,所以迟早会填满缓冲区的,然后生产者也休眠,这样两个进程就都永远的休眠下去了。
1,使用信号量解决生产者-消费者问题
首
先了解一下信号量吧,信号量是 E.W.Dijkstra 在 1965
年提出的一种方法,它是使用一个整型变量来累计唤醒的次数,供以后使用。在他的建议中,引入了一个新的变量类型,称为信号量(semaphore),一个
信号量的取值可以为 0(表示没有保存下来的唤醒操作)或者为正值(表示有一个或多个唤醒操作)。
并且设立了两种操作:down 和
up(分别为一般化后的 sleep 和 wakeup,其实也是一般教科书上说的 P/V 向量)。对一个信号量执行 down
操作,表示检查其值是否大于 0,如果该值大于 0,则将其值减 1(即用掉一个保存的唤醒信号)并继续;如果为 0,则进程休眠,而且此时 down
操作并未结束。另外,就是检查数值,修改变量值以及可能发生的休眠操作都作为单一的,不可分割的 原子操作 来完成。
下面开始考虑用信号量来解决生产者-消费者问题了,不过在此之前,再次分析一下这个问题的本质会更清晰点:问题的实质在于发给一个(尚)未休眠进程(如上的消费者进程在只判断了 count == 0 后即被调度出来,还未休眠)的 wakeup 信号丢失(如上的生产者进程在判断了 count == 1 后以为消费者进程休眠,而唤醒它)了。如果它没有丢失,则一切都会很好。
#define N 100 // 缓冲区中的槽数目
typedef int semaphore; // 信号量一般被定义为特殊的整型数据
semaphore mutex = 1; // 控制对临界区的访问
semaphore empty = N; // 计数缓冲区中的空槽数目
semaphore full = 0; // 计数缓冲区中的满槽数目
/* 生产者进程 */
void proceducer(void)
{
int item;
while(1)
{
item = procedure_item(); // 生成数据
down(&empty); // 将空槽数目减 1
down(&mutex); // 进入临界区
insert_item(item); // 将新数据放入缓冲区
up(&mutex); // 离开临界区
up(&full); // 将满槽的数目加 1
}
}
/* 消费者进程 */
void consumer(voi)
{
int item;
while(1)
{
down(&full); // 将满槽数目减 1
down(&mutex); // 进入临界区
item = remove_item(); // 从缓冲区中取出数据项
up(&mutex); // 离开临界区
up(&empty); // 将空槽数目加 1
consumer_item(item); // 处理数据项
}
}
该
解决方案使用了三个信号量:一个为 full,用来记录充满的缓冲槽的数目,一个为 empty,记录空的缓冲槽总数,一个为
mutex,用来确保生产者和消费者不会同时访问缓冲区。mutex 的初始值为
1,供两个或者多个进程使用的信号量,保证同一个时刻只有一个进程可以进入临界区,称为二元信号量(binary
semaphore)。如果每一个进程在进入临界区前都执行一个 down(...),在刚刚退出临界区时执行一个 up(...),就能够实现互斥。
另外,通常是将 down 和 up 操作作为系统调用来实现,而且 OS 只需要在执行以下操作时暂时禁止全部中断:测试信号量,更新信号量以及在需要时使某个进程休眠。
这里使用了三个信号量,但是它们的目的却不相同,其中 full 和 empty 用来同步(synchronization),而 mutex 用来实现互斥。
2,使用消息传递解决生产者-消费者问题
这种 IPC 方式使用两条原语 send 和 receive,也是系统调用。如:
send(dest, &msg) // 将消息 msg 发送到目标(进程)dest 中
receive(src, &msg) // 接收由 src 过来的 msg,如果没有消息可用,则可能阻塞接收者
消息传递系统会面临位于网络中不同机器上的通信进程的情形,所以会更加的复杂。如:消息可能被网络丢失,一般使用确认(ACK)消息。如果发送方在一定的时间段内没有收到确认消息,则重发消息。
如果消息本身被正确接收,但是返回的 ACK 消息丢失,发送方则重发消息,这样接收方就会收到两份同样的消息。一般使用在每条原始消息的头部嵌入一个连续的序号来解决这个问题。
另外,消息传递系统还需要解决进程命名的问题,在 send 和 receive 系统调用中指定的进程必须没有二义性的。还有其他的一些问题,如性能问题,身份认证等等,不过那个就会扯多了,还是看看如果解决这个生产者-消费者的问题吧:
#define N 100 // 缓冲区中的槽数目
/* 生产者进程 */
void proceducer(void)
{
int item;
message msg; // 消息缓冲区
while(1)
{
item = procedure_item(); // 生成数据
receive(consumer, &msg); // 等待消费者发送空的缓冲区
build_msg(&msg, item); // 创建待发送消息
send(consumer, &msg); // 发送数据项给消费者
}
}
/* 消费者进程 */
void consumer(voi)
{
int item, i;
message msg;
for(i=0; i send(producer, &msg); // 发送给生产者 N 个空缓冲区
while(1)
{
receive(producer, &msg); // 接收包含数据项的消息
item = extract_item(&msg); // 解析消息,并组装成数据项
send(proceduer, &msg); // 然后又将空缓冲区发送回生产者
consumer_item(item); // 处理数据项
}
}
在
这个解决方案中,共使用了 N 条消息,有点类似于上一个的共享内存缓冲区的 N 个槽,消费者进程这边首先通过一个 for 循环将
N条空消息发送给生产者。当生产者向消费者传递一个数据项时,是通过取走每一条接收到的空消息,然后送回填充了内容的消息给消费者的。通过这种方式,整个
消息传递系统中的总的消息数(包括空的消息 + 存了数据项的消息 == N)是不变的。
如果运行过程中,生产者进程的速度比消费者快,则所有的消息最终都会塞满,然后生产者进程就会等待消费者(即使调用 procedure 也是阻塞在 receive 处),直到消费者返回一条空的消息;反之亦然。
下面再来看一下消息传递方式的两种变体。一种是:为每一个进程分配一个唯一的地址,
让消息按照这个进程的地址进行编址。也就是 send 和 receive
调用的第一个参数指定为具体的进程地址。另一种是:引入信箱(mailbox,现在正在做的一个项目,就是这种方式),可以信箱就像一个盒子,里面装了很
多的信件,这个信件就是我们要传递的消息,当然信箱是有容量限制的(现在 yahoo 好像推出无限容量的,呵呵)。当使用信箱时,send 和
receive
系统调用中的地址参数就是信箱的地址,而不是进程的地址。当一个进程尝试向一个容量爆满的信箱发送消息时,它将会被挂起,直到信箱中有消息被取走。
阅读(1793) | 评论(0) | 转发(0) |