并发执行 下面我们将要编写一个检测两个线程是否并发执行的程序。因为我们还没有了解要有效完成这一任务所需要的线程同步的知识,所以这并不是一个高效完成在线程之间称之为池操作的程序。再一起说明,我们要利用这一事实,在一个进程内部的不同线程之间共享除了局部函数变量之外的所有变量。
试验--两个线程的同步执行
在这一部分,我们所创建的程序thread2.c,是对thread1.c进行了简单的修改而得来的。我们添加一个额外的文件域变量来测试哪一个线程正在运行:
int run_now = 1;
当主函数执行时我们将其设置为1,而当我们的新线程执行时我们将其设置为2。
在main函数时,在新线程创建之后,我们添加下面的代码:
int print_count1 = 0;
while(print_count1++ < 20) {
if (run_now == 1) {
printf("1");
run_now = 2;
}
else {
sleep(1);
}
}
如果run_now为1,我们输出1并将其设置为2。否则我们会进行简短的休眠并再次检测。我们一次一次的检测直到值为1。这称之为忙等待(busy wait),尽管我们通过在每次检测之间休眠一秒来放慢速度。我们在本章的稍后部分会看到完成这一任务的更好方法。
在thread_function中,这是我们新线程执行的地址,我们会进行相同的操作,但是值是相反的。
int print_count2 = 0;
while(print_count2++ < 20) {
if (run_now == 2) {
printf("2");
run_now = 1;
}
else {
sleep(1);
}
}
我们同时移除了传递的参数与返回的值,因为我们不在对他们感兴趣。
当我们运行这个程序时,我们会看到下面的输出。(我们也许会发现程序需要几秒来产生输出)
$ cc -D_REENTRANT thread3.c -o thread2 -lpthread
$ ./thread2
12121212121212121212
Waiting for thread to finish...
Thread joined
工作原理 每一个线程通过设置run_now变量来通知其他的线程运行,然后等待其他线程改变变量的值。这演示了线程的执行是在两个线程之间自动传递的,并且再一次说明两个线程共享run_now变量。
同步
在前面的部分中,我们看到两个线程同时执行,但是我们在其中进行切换的方法是笨拙而低效的。幸运的是,有一个特别设计的函数集合可以为我们提供更好的方法来控制线程的执行与访问临界区代码。
我们将会了解两个基本的方法:信号量,其作用类似于一段代码周围的守门人;互斥,其作用类似于一个互斥排他的设置来保护代码段。这两个方法是相似的,确实,其中一个可以使用另一个方法来实现。然而,有一些情况问题的语义会建议使用其中的一个。例如,控制访问某段一次只能有一个线程访问的共享内存,最自然的解决方法就是使用互斥。然而,控制访问一个作为一个整体的相同对象的集合,例如将五个可用电话线集合中的一个指定给一个线程就更适合计数信号量的方法。选择哪种方法依赖于我们的喜好以及最适合我们程序的机制。
使用信号量同步 有两个接口函数用于信号量:一个来自POSIX实时扩展并且用于线程,而另一个就是所知的System V信号量,后者通常用于进程同步。(我们会在本章的稍后讨论第二种信号量)这两个信号量彼此之间并不能进行交互,而且尽管非常相似,他们却使用不同的函数调用。
在这一部分我们来了解一下最简单的信号量类型,其值只为0或1的二进制信号量。还有一个更为通用的信号量,使用更多值的计数信号量。通常,信号量用于保护一段代码,从而在任何时刻只有一个执行线程可以运行他。对于这样的任务就需要二进制信号量。偶尔我们希望允许一定数量的线程来执行一段指定的代码;对于这样的情况,我们可以使用一个计数信号量。因为计数信号量并不常用,我们在这里并不会进行深入的讨论,但是我们需要指出,计数信号量只是二进制信号量的一个逻辑扩展而所需的实际函数调用是相同的。
信号量函数并不以pthread_开头,如大多数线程特定函数那样,而是以sem_开头。有四个用于线程中的基本信号量函数。他们都非常简单。
一个信号量是用sem_init函数来创建的,其声明如下:
#include
int sem_init(sem_t *sem, int pshared, unsigned int value);
这个函数初始化一个由sem所指向的信号量对象,设置其共享选项,并且为其指定一个初始整数值。pshared参数控制信号量类型。如果pshared的值为0,那么这个信号量对于当前进程而言是局部的。否则,此信号量可以在进程之间共享。在这里我们所感兴趣的只是不能在进程之间共享的信号量。在编写本书时,Linux并不支持这种共享,而且当为pshared传递一个非零值时会使得调用失败。
下面一对函数控制信号量的值,其声明如下:
#include
int sem_wait(sem_t * sem);
int sem_post(sem_t * sem);
这两个函数都以指向sem_init调用所初始化的信号量对象的指针为参数。
sem_post函数会自动将信号量的值增加1。这里的自动意味着如果两个线程同时试着将一个信号量的值增加1,那么他们彼此之间并不会互相影响,例如,如果两个程序同时读取一个值,增加这个值,并将这个将写入一个文件时就发生这种情况。信号量总是会正确的将其值增加2,因为有两个线程在尝试修改他。
sem_wait函数会自动的将信号的值减1,但是这个函数总是首先等待直到此信号量具有一个非零计数。所以,如果我们在一个其值为2的信号量上调用 sem_wait函数,线程就会继续执行,但是信号量的值会减少为1。如果在其值为0的信号量上调用sem_wait函数,这个函数就会等待直到有其他的函数增加这个值,从而使得信号量的值不再为0。如果有两个线程同时在sem_wait内等待同一个信号量变为非零,而这个信号量的值是由第三进程来增加的,那么这两个等待线程中只有一个可以减少这个信号量并继续执行,而另一个会继续等待。
在一个函数内的原子"测试与设置"能力是使得信号量如此具有价值的原因。还有另一个信号量函数,sem_trywait,这是sem_wait函数的非阻塞模式。我们在这里并不会进行深入的讨论,我们可以在手册中了解更为详细的内容。
最后一个信号量函数就是sem_destroy。这个函数会在我们完成时清理信号量。其声明如下:
#include
int sem_destroy(sem_t * sem);
再一次说明,这个函数以一个指向信号量的指针为参数并且清理他所具有的任何资源。如果我们试着销毁一个某个线程正在等待的信号量时,我们就会得到一个错误。
与大多数Linux函数类似,这些函数会在成功时返回0。
试验--线程信号量
下面的代码,thread3.c,也是基于thread1.c。因为进行了大量的修改,所以在这里我们进行完整的展示。
#include
#include
#include
#include
#include
#include
void *thread_function(void *arg);
sem_t bin_sem;
#define WORK_SIZE 1024
char work_area[WORK_SIZE];
int main()
{
int res;
pthread_t a_thread;
void *thread_result;
res = sem_init(&bin_sem,0,0);
if(res != 0)
{
perror("Semaphore initialization failed");
exit(EXIT_FAILURE);
}
res = pthread_create(&a_thread,NULL,thread_function,NULL);
if(res != 0)
{
perror("Thread creation failed");
exit(EXIT_FAILURE);
}
printf("Input some text. Enter 'end' to finish\n");
while(strncmp("end",work_area,3) != 0)
{
fgets(work_area,WORK_SIZE,stdin);
sem_post(&bin_sem);
}
printf("\nWaiting for thread to finish...\n");
res = pthread_join(a_thread,&thread_result);
if(res != 0)
{
perror("Thread join failed");
exit(EXIT_FAILURE);
}
printf("Thread joined\n");
sem_destroy(&bin_sem);
exit(EXIT_SUCCESS);
}
void *thread_function(void *arg)
{
sem_wait(&bin_sem);
while(strncmp("end",work_area,3) != 0)
{
printf("You input %d characters\n",strlen(work_area)-1);
sem_wait(&bin_sem);
}
pthread_exit(NULL);
}
第一个重要的修改就是包含了semaphore.h使得我们可以访问信号量函数。然而我们在创建新线程之前声明一个信号量以及一些变量并且初始化信号量。
sem_t bin_sem;
#define WORK_SIZE 1024
char work_area[WORK_SIZE];
int main() {
int res;
pthread_t a_thread;
void *thread_result;
res = sem_init(&bin_sem, 0, 0);
if (res != 0) {
perror(“Semaphore initialization failed”);
exit(EXIT_FAILURE);
}
注意,在这里我们信号量的值初始化为0。
在函数main中,在我们启动新线程之后,我们由键盘读取一些文本,存入我们的工作区域,然后使用sem_post函数增加信号量。
printf(“Input some text. Enter ‘end’ to finish\n”);
while(strncmp(“end”, work_area, 3) != 0) {
fgets(work_area, WORK_SIZE, stdin);
sem_post(&bin_sem);
}
在新线程中,我们等待信号量然后计算输入的字符数。
sem_wait(&bin_sem);
while(strncmp(“end”, work_area, 3) != 0) {
printf(“You input %d characters\n”, strlen(work_area) -1);
sem_wait(&bin_sem);
}
当信号量被设置时,我们等待键盘输入。当我们具有一些输入时,我们释放这个信号量,允许第二个线程在第一个线程再次读取之前计算字符数。
再一次说明,两个线程共享同一个work_area数组。我们忽略了某些错误检测,例如sem_wait的返回值,从而使得代码更简要。然而在生产代码中我们应总是检测返回的错误代码,除非我们充分的理由忽略这些检测。
下面我们来运行我们的程序:
$ cc -D_REENTRANT -I/usr/include/nptl thread3.c –o thread3 -L/usr/lib/nptl -
lpthread
$./thread3
Input some text. Enter ‘end’ to finish
The Wasp Factory
You input 16 characters
Iain Banks
You input 10 characters
end
Waiting for thread to finish...
Thread joined
工作原理
当我们初始化信号量时,我们将其值设置为0。所以,当线程函数启动时,sem_wait调用会阻塞并且等待信号量变为非零。
在main线程中,我们等待直到我们有一些文本,然后使用sem_post函数增加信号量,这会立即使得另一个线程由sem_wait返回并且开始执行。一旦他完成了字符数的计算,他就会再次调用sem_wait并且阻塞直到main线程再次调用sem_post增加这个信号量。
很容易忽视引起细小错误的设计错误。让我们简单的修改这个程序,thread4a.c,来表明由键盘输入的文本有时会自动的被可用的文本替换。我们将main函数修改成如下的样子:
printf(“Input some text. Enter ‘end’ to finish\n”);
while(strncmp(“end”, work_area, 3) != 0) {
if (strncmp(work_area, “FAST”, 4) == 0) {
sem_post(&bin_sem);
strcpy(work_area, “Wheeee...”);
} else {
fgets(work_area, WORK_SIZE, stdin);
}
sem_post(&bin_sem);
}
现在如果我们输入FAST,程序会调用sem_post允许字符计数器运行,但是立即使用一些不同的内容来更新work_area。
$ cc -D_REENTRANT thread4a.c -o thread4a -lpthread
$ ./thread4a
Input some text. Enter ‘end’ to finish
Excession
You input 9 characters
FAST
You input 7 characters
You input 7 characters
You input 7 characters
end
Waiting for thread to finish...
Thread joined
问题就在于我们的程序依赖于文本输入在另一个线程在主线程准备好向其发送更多的单词进行计算之前有时间完成单词的统计。当我们试着快速连续的为其指定两个不同的单词集合进行计数时(由键盘输入FAST然后自动的替换为Wheee...),对于第二个线程而言并没有时间执行。然而,信号量已经被增加了多次,所以计数器线程会继续统计单词,并且减少信号量的值,直到他再次变为零。
这个例子显示了我们在多线程程序中需要小心的考虑时间。可以通过使用另外一个信号量来使得main线程等待直到计数线程有机会完成其计数来修正这个问题。