全部博文(73)
分类: LINUX
2009-04-23 16:25:08
Pthread之多命令驱动线程模型
在多线程应用中,我们经常会碰到这种情况:
1.
父线程要求子线程执行不同的任务
2.
子线程执行的任务又都是长时间任务,需要要单独的线程里面执行
3.
子线程的任务并不需要都同时执行
4.
需要让一个线程动态添加执行的任务
这时,一般采用的方法有如下两种:
1.
由父线程为每一个任务创建一个线程,所有的线程都在等待状态,父线程发送信号唤醒必要的线程。(当然,如果你在必要的时候才创建线程,也没有不可以,但本质没有什么区别)
2.
由父线程创建一个线程,由这一个线程去执行不同的任务
3.
让程序能够动态的添加自己执行的任务、可以用改变函数指印的方法来完成。这种方法的要求实则是建一个代理函数,这个函数封装了对底层任务的修改。
现面我们主要介绍一下第二种思想在一个简单应用,先看看目录结构:
dongq@DongQ_Lap ~/workspace/test $ tree multicommand
multicommand
|-- Makefile
|-- childthread.h --子程序,用来处理任务
|-- childthread.c --子程序用来处理任务的实现
`-- multicommand.c --主程序,用来发送命令,显示结果
0 directories, 3 files
首先看看头文件:
#include
#include
#include
#include
#include
//
#define SCAN 1
#define INSERT 2
#define LOOKUP 3
//定义线程的状态
#define RUNING 0
#define WAITING 1
#define EXIT 2
typedef struct __command
{
int command;
int sleepTime;
int *data;
} Command; //用来传递给线程,用来指定不同的动作,以及操作的数据
typedef struct __thread_struct
{
int status;
pthread_mutex_t mutex;
pthread_cond_t cond;
void (* pthr_init)( void );
void (* pthr_wake)( void );
void (* pthr_exit)( void );
void *(* pthr_func)( void *);
} Thread_struct; //用来对线程的基本属性进行描述
Thread_struct test_thread;
Command request;
int data1[ 10 ];
int data2[ 10 ];
int data3[ 10 ];
void pthr_register( void ); //用于注册线程的基本属性的注册
再来看看主程序,它支持SCAN,INSERT,LOOKUP,即读、写、查三种要主功能,每种功能对应
一个数据集合用来表示数据的变化 。
#include "childthread.h"
int main ( int argc, char *argv[] )
{
//数据的初始化
pthread_t thread;
pthread_attr_t thread_attr;
char input = ' ';
int j = 0;
memset( data1, 0, sizeof( data1 )
);
memset( data2, 0, sizeof( data2 )
);
memset( data3, 0, sizeof( data3 )
);
//对线程的属性进行初始化
test_thread.status = EXIT;
if( pthread_mutex_init(
&test_thread.mutex, NULL) )
{
printf( "Mutex
initialization failure!!!\n" );
}
if ( pthread_cond_init(
&test_thread.cond, NULL ))
{
printf( "Condition
variable initialization failure!!!\n" );
}
if ( pthread_attr_init(
&thread_attr ) )
{
printf( "Thread
Attribute initialization failure!!!\n" );
}
if ( pthread_attr_setdetachstate(
&thread_attr, PTHREAD_CREATE_DETACHED ) )
{
printf( "Set thread
detach state failure!!!\n" );
}
pthr_register(); //值得注意的地方:) 进行函数的主删
//创建子函数
if ( pthread_create ( &thread,
NULL, test_thread.pthr_func, NULL ) )
{
printf( "Create child
thread error!!!!\n" );
}
//线程初化
test_thread.pthr_init();
printf( "Please input
command:" );
input = getchar();
//从用户处得到指令,设置线程詷度的数据,并唤回线程
while( 1 )
{
if ( test_thread.status !=
RUNING )
{
switch ( input )
{
case 's':
printf(
"Send SCAN signal to child thread\n" );
request.command
= SCAN;
request.sleepTime
= 3 ;
request.data = data1;
break;
case 'i':
printf(
"Send INSERT signal to child thread\n" );
request.command
= INSERT;
request.sleepTime
= 2 ;
request.data
= data2;
break;
case 'l':
printf(
"Send LOOKUP signal to child thread\n" );
request.command
= LOOKUP;
request.sleepTime
= 4 ;
request.data
= data3;
break;
case 10:
break;
case 'q':
test_thread.pthr_exit();
break;
default:
printf(
"Please input 's', 'i', 'l', 'q' command!!!\n" );
break;
}
if ( input == 'q' )
{
break;
}
//唤醒线程
test_thread.pthr_wake();
fsync( 1 );
}
//打印执行结果
while ( test_thread.status
== WAITING )
{
for ( j = 0; j <
10 ; j++ )
{
printf(
"Request.data[ %d ] = %d \n", j, request.data[ j ] );
}
printf(
"Please input command:" );
input = getchar();
break;
}
}
return 0;
}
最后,我们来看看最最重要的childthread.c,看看到底,一切是怎么样结合在一起的:
#include "childthread.h"
//通过封装,可以执行一些代码,以满足自己的一些特定要求
void pthr_init( void )
{
printf( "Check running
envriment.\n" );
}
void pthr_wake( void )
{
pthread_cond_signal(
&test_thread.cond );
}
void pthr_exit( void )
{
printf( "Check depend thread
and so on.\n" );
}
//用于处理不同的任务
void run_command( int action, int sleepTime )
{
int i = 0;
sleep( sleepTime );
printf( "Run %d\n",
action );
request.command = 0;
for ( i = 0; i < 10; i++ )
{
request.data[ i ] = action;
}
return;
}
//用于命令的分发
void *dispatchCommand_thread (void *
arg)
{
printf( "Running Child
Thread.\n" );
while( 1 )
{
test_thread.status =
WAITING;
pthread_mutex_lock(
&test_thread.mutex );
pthread_cond_wait(
&test_thread.cond, &test_thread.mutex );
pthread_mutex_unlock(
&test_thread.mutex );
test_thread.status =
RUNING;
printf( "Dispatch
command and deal with.\n" );
while( 1 )
{
if( request.command
!= 0 )
{
switch(
request.command )
{
case
SCAN:
run_command(
request.command, request.sleepTime );
break;
case
INSERT:
run_command(
request.command, request.sleepTime );
break;
case
LOOKUP:
run_command(
request.command, request.sleepTime );
break;
default:
printf(
"Command error!!\n" );
break;
}
}
else
{
break;
}
}
}
test_thread.status = EXIT;
pthread_exit( NULL );
}
//用于注册子线程的函数
void pthr_register( void )
{
test_thread.pthr_init = pthr_init;
test_thread.pthr_wake = pthr_wake;
test_thread.pthr_exit = pthr_exit;
test_thread.pthr_func =
dispatchCommand_thread;
}
上面的代码,只保证在正常的状态下,正常的对自己关注的线程使用方式进行演示,不保证在非法操作中能够恢复:)