基于嵌入式操作系统VxWorks的多任务并发程序设计(4)
――任务间通信
作者:宋宝华 e-mail:21cnbao@21cn.com 出处:软件报
VxWorks提供了多种任务间通信方式,主要有:
(1)共享内存(Shared Memory),用于简单的数据共享;
由于VxWorks的所有任务存在于单一的线性地址空间,所以任务间可共享数据,全局变量、线性队列、环形队列、链表、指针都可被具有不同上下文的任务访问。
(2)信号量(Semaphore),用于互斥和同步;
信号量是VxWorks所提供的最快速的任务间通信机制,它主要用于解决任务间共享资源的互斥访问和同步。针对问题的不同类型,VxWorks提供了三种信号量:二进制(binary)信号量、互斥(mutual exclusion)信号量、计数(counting)信号量。二进制信号量和互斥信号量只能处于0和1两种状态,可以看作计数值为1的计数信号量。二进制信号量可用于同步与互斥,而互斥信号量只能用于互斥,但其支持优先级继承机制。
(3)消息队列(Message queues)和管道(Pipe),单CPU内任务间的信息传送;
消息机制使用一个被各有关进程共享的消息队列,任务之间经由这个消息队列发送和接收消息。管道则是受驱动器pipeDrv管理的虚拟I/O设备,它提供了一种灵活的消息传送机制。任务能调用标准的I/O函数打开、读出、写入管道。
(4)套接字(Socket)和远程过程调用(RPC),用于网络间任务消息传送;
与其它操作系统一样,Vxworks和网络协议的接口靠套接字来实现,套接字可实现运行在VxWorks系统或其它系统之间任务的信息传送。远程过程调用允许任务调用另一运行VxWorks或其它系统的主机上的过程。
(5)信号(Signals),用于异常处理(Exception handling)。
7.任务间通信
7.1 信号量
VxWorks主要提供如下API进行信号量的创建、获取和释放:
(1)semBCreate( ):分配并初始化一个二进制信号量,函数原型为:
SEM_ID semBCreate
(
int options, /*信号量选项*/
SEM_B_STATE initialState /*信号量初始化状态值*/
) ;
(2)semMCreate( ):分配并初始化一个互斥信号量,函数原型为:
SEM_ID semBCreate
(
int options, /*信号量选项*/
SEM_B_STATE initialState /*信号量初始化状态值*/
);
(3)semCCreate( ):分配并初始化一个计数信号量,函数原型为:
SEM_ID semCCreate
(
int options, /*信号量选项*/
int initialCount /*信号量初始计数值*/
) ;
当一个信号量被创建时,它的队列(queue)类型需要被确定。等待信号量的任务队列可以以优先级顺序 (SEM_Q_PRIORITY)或者先到先得方式(SEM_Q_FIFO)排列。
(4)semDelete( ):删除一个自由的信号量,函数原型为:
STATUS semDelete
(
SEM_ID semId /*要删除的信号量ID号*/
);
(5)semTake( ):占有一个信号量,函数原型为:
STATUS semTake
(
SEM_ID semId /*所要得到的信号量ID号*/
int timeout /*等待时间*/
);
(6)semGive( ):释放一个信号量,函数原型为:
STATUS semGive
(
SEM_ID semId /*所给出的信号量ID号*/
);
(7)semFlush( ):解锁所有等待信号量的任务,函数原型为:
STATUS semFlush
(
SEM_ID semId /*要解锁的信号量ID号*/
);
7.1.1 二进制信号量
例1:二进制信号量
/* includes */
#include "vxWorks.h"
#include "taskLib.h"
#include "semLib.h"
#include "stdio.h"
/* function prototypes */
void taskOne(void);
void taskTwo(void);
/* globals */
#define ITER 10
SEM_ID semBinary;
int global = 0;
void binary(void)
{
int taskIdOne, taskIdTwo;
/* create semaphore with semaphore available and queue tasks on FIFO basis */
semBinary = semBCreate(SEM_Q_FIFO, SEM_FULL);
/* Note 1: lock the semaphore for scheduling purposes */
semTake(semBinary, WAIT_FOREVER);
/* spawn the two tasks */
taskIdOne = taskSpawn("t1", 90, 0x100, 2000, (FUNCPTR)taskOne, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
taskIdTwo = taskSpawn("t2", 90, 0x100, 2000, (FUNCPTR)taskTwo, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
}
void taskOne(void)
{
int i;
for (i = 0; i < ITER; i++)
{
semTake(semBinary, WAIT_FOREVER); /* wait indefinitely for semaphore */
printf("I am taskOne and global = %d......................\n", ++global);
semGive(semBinary); /* give up semaphore */
}
}
void taskTwo(void)
{
int i;
semGive(semBinary); /* Note 2: give up semaphore(a scheduling fix) */
for (i = 0; i < ITER; i++)
{
semTake(semBinary, WAIT_FOREVER); /* wait indefinitely for semaphore */
printf("I am taskTwo and global = %d----------------------\n", --global);
semGive (semBinary); /* give up semaphore */
}
}
上述程序通过semTake和semGive保护每次使用printf输出一串信息时不被间断。
要创建一个发挥互斥作用的二进制信号量一般使用semBCreat(xxx, SEM_FULL)调用,其中的SEM_FULL暗示该信号量用于任务间的互斥(最开始二进制信号量可获得)。对临界区域(critical region)的访问需以semTake和semGive加以保护:
semTake (semMutex, WAIT_FOREVER);
. . /*critical region, only accessible by a single task at a time*/
semGive (semMutex);
要创建一个发挥同步作用的二进制信号量一般使用semBCreat(xxx, SEM_EMPTY) 调用,其中的SEM_EMPTY 暗示该信号量用于任务间的同步(即最开始二进制信号量不可获得)。
二进制信号量使用最广泛的一种情况是中断与任务间通信。中断服务程序一般以二进制信号量“通知”对应的任务进行中断后的处理工作,例如:
SEM_ID syncSem;/* ID of sync semaphore */
myTask(void)
{
semTake(syncSem, WAIT_FOREVER); /* wait for event to occur */
printf("my Task got the semaphore\n");
... /* process event */
}
eventInterruptSvcRout(void)
{
semGive(syncSem); /* let my Task process event */
...
}
7.1.2 互斥信号量
互斥信号量可以看作一种特殊的二进制信号量,其支持普通二进制信号量不支持的一些特性,提供优先级继承、安全删除和回归能力。互斥信号量的使用方法和二进制信号量基本类似,但有如下区别:
(1)仅仅被用做互斥,不能提供同步机制;
(2)只能被使用它的任务释放;
(3)中断服务程序(ISR)不能释放它;
(4)不能使用函数semFlush( );
(5)支持使用二进制信号量进行互斥时所不支持的优先级“翻转”。
任务的优先级翻转是指高优先级任务因等待低优先级任务占用的互斥资源而被较低优先级(高于低优先级但低于高优先级)的任务不断抢占的情况。VxWorks操作系统提供优先级继承机制对优先级翻转进行预防。占用互斥资源但优先级较低的任务被暂时地提高到等待该资源的最高优先级任务的优先级。这样,中等优先级的任务将无法抢占原本低优先级的任务,使得低优先级任务能尽快执行,释放出优先级较高的任务所需要的资源。
为了使互斥信号量支持优先级继承支持,我们在调用semMCreate时应使用SEM_Q_PRIORITY和SEM_INVERSION_SAFE选项。互斥信号量提供互斥也需要对临界区域进行保护:
semTake (semMutex, WAIT_FOREVER);
. . //critical region, only accessible by a single task at a time .
semGive (semMutex);
8.消息队列和管道
VxWorks主要提供如下API进行消息队列的创建、读取和传递:
msgQCreate( ):创建斌初始化一个消息队列,函数原型为:
MSG_Q_ID msgQCreate
(
int maxMsgs, /*队列所能容纳的最大消息数目*/
int maxMsgLength, /*每一消息的最大长度*/
int options /*消息入列方式*/
);
msgQDelete( ):终止并释放一个消息队列,函数原型为:
STATUS msgQDelete
(
MSG_Q_ID msgQId /*要删除的消息队列ID号*/
);
msgQSend( ):发送一个消息到消息队列,函数原型为:
STATUS msgQSend
(
MSG_Q_ID msgQId, /*所发向的消息队列名*/
char * buffer, /*消息包所在缓冲区指针*/
UINT nBytes, /*消息包长度*/
int timeout, /*等待的时间长度*/
int priority /*优先级*/
);
msgQReceive( ):从消息队列接受一个消息,函数原型为:
int msgQReceive
(
MSG_Q_ID msgQId, /*接收消息的消息队列ID号*/
char * buffer, /*接收消息的缓冲区指针*/
UINT maxNBytes, /*缓冲区长度*/
int timeout /*等待时间*/
);
下面我们以著名的生产者/消费者问题作为例子来说明消息队列的用途:
例2:消息队列
/* includes */
#include "vxWorks.h"
#include "taskLib.h"
#include "msgQLib.h"
#include "sysLib.h"
#include "stdio.h"
/* globals */
#define CONSUMER_TASK_PRI 99 /* Priority of the consumer task */
#define PRODUCER_TASK_PRI 98 /* Priority of the producer task */
#define TASK_STACK_SIZE 5000 /* stack size for spawned tasks */
struct msg { /* data structure for msg passing */
int tid; /* task id */
int value; /* msg value */
};
LOCAL MSG_Q_ID msgQId; /* message queue id */
LOCAL int numMsg = 8; /* number of messages */
LOCAL BOOL notDone; /* Flag to indicate the completion */
/* function prototypes */
LOCAL void producerTask(); /* producer task */
LOCAL void consumerTask(); /* consumer task */
/* user entry */
void msgQDemo()
{
notDone = TRUE; /* initialize the global flag */
/* Create the message queue*/
if ((msgQId = msgQCreate(numMsg, sizeof(struct msg), MSG_Q_FIFO)) == NULL)
{
perror("Error in creating msgQ");
}
/* Spawn the producerTask task */
if (taskSpawn("tProducerTask", PRODUCER_TASK_PRI, 0, TASK_STACK_SIZE, (FUNCPTR)producerTask, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) == ERROR)
{
perror("producerTask: Error in spawning demoTask");
}
/* Spawn the consumerTask task */
if (taskSpawn("tConsumerTask", CONSUMER_TASK_PRI, 0, TASK_STACK_SIZE, (FUNCPTR)consumerTask, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) == ERROR)
{
perror("consumerTask: Error in spawning demoTask");
}
/* polling is not recommended. But used to make this demonstration simple*/
while (notDone)
taskDelay(sysClkRateGet());
if (msgQDelete(msgQId) == ERROR)
{
perror("Error in deleting msgQ");
}
}
/* producerTask :发送消息给consumerTask */
void producerTask(void)
{
int count;
int value;
struct msg producedItem; /* producer item - produced data */
printf("producerTask started: task id = %#x \n", taskIdSelf());
/* Produce numMsg number of messages and send these messages */
for (count = 1; count <= numMsg; count++)
{
value = count * 10; /* produce a value */
/* Fill in the data structure for message passing */
producedItem.tid = taskIdSelf();
producedItem.value = value;
/* Send Messages */
if ((msgQSend(msgQId, (char*) &producedItem, sizeof(producedItem),
WAIT_FOREVER, MSG_PRI_NORMAL)) == ERROR)
{
perror("Error in sending the message");
}
else
printf("ProducerTask: tid = %#x, produced value = %d \n", taskIdSelf(),
value);
}
}
/* consumerTask:获取(消费)消息 */
void consumerTask(void)
{
int count;
struct msg consumedItem; /* consumer item - consumed data */
printf("\n\nConsumerTask: Started - task id = %#x\n", taskIdSelf());
/* consume numMsg number of messages */
for (count = 1; count <= numMsg; count++)
{
/* Receive messages */
if ((msgQReceive(msgQId, (char*) &consumedItem, sizeof(consumedItem),
WAIT_FOREVER)) == ERROR)
{
perror("Error in receiving the message");
}
else
printf("ConsumerTask: Consuming msg of value %d from tid = %#x\n",
consumedItem.value, consumedItem.tid);
}
notDone = FALSE; /* set the global flag to FALSE to indicate completion*/
}
程序运行输出:
producerTask started: task id = 0x
ProducerTask: tid = 0x
ProducerTask: tid = 0x
ProducerTask: tid = 0x
ProducerTask: tid = 0x
ProducerTask: tid = 0x
ProducerTask: tid = 0x
ProducerTask: tid = 0x
ProducerTask: tid = 0x
ConsumerTask: Started - task id = 0x
ConsumerTask: Consuming msg of value 10 from tid = 0x
ConsumerTask: Consuming msg of value 20 from tid = 0x
ConsumerTask: Consuming msg of value 30 from tid = 0x
ConsumerTask: Consuming msg of value 40 from tid = 0x
ConsumerTask: Consuming msg of value 50 from tid = 0x
ConsumerTask: Consuming msg of value 60 from tid = 0x
ConsumerTask: Consuming msg of value 70 from tid = 0x
ConsumerTask: Consuming msg of value 80 from tid = 0x
我们以WINDVIEW捕获上述程序运行时的状态,
从程序运行的输出结果和上图可以看出,生产者“生产”了numMsg个(即8个)消息,消费者也“消费”了numMsg个消息。
Trackback: