Chinaunix首页 | 论坛 | 博客
  • 博客访问: 7888796
  • 博文数量: 701
  • 博客积分: 2150
  • 博客等级: 上尉
  • 技术积分: 13233
  • 用 户 组: 普通用户
  • 注册时间: 2011-06-29 16:28
个人简介

天行健,君子以自强不息!

文章分类

全部博文(701)

文章存档

2019年(2)

2018年(12)

2017年(76)

2016年(120)

2015年(178)

2014年(129)

2013年(123)

2012年(61)

分类: 架构设计与优化

2013-04-27 21:34:24

一、POE简介
perl的POE模块是一个事件驱动的有限状态机编程框架,
它用一个单进程模拟多线程的多任务执行模块,
并提供的非阻塞的IO操作(就像C语言下面的libevent库)。
它能干的事情很多,并且写起来代码很清晰,可读性非常好。
POE最重要的一点是,它尝试着把在事件驱动环境中编程的恼人细节给隐藏起来。


POE的组成部分包括:
states(状态),kernel(内核),session(任务),driver(驱动),filter,wheel和component等。
states: 就是有限状态机中的状态,
        对应着的就是处理这些状态的句柄--也就是函数。


kernel: 是POE的核心模块.
        POE的内核与OS的内核很相似: 它会跟踪后台的所有进程和数据,并调度运行的代码。
        可以用内核来设置POE进程的时钟,将要运行的状态进行排队,和执行各种低级服务,
        但在很多时候,并不需要直接和内核进行交互。


session:它等同于OS中的进程。
        session是从一个状态切换到另一个状态时运行的POE程序。
        它可以创建子session, 发送POE事件给其它session等。


        每个session都有一个名为head的哈希表,可以用来存储本session的私有数据,
        并且该session中的每个状态处理函数都可访问它。


        POE是一个很精简的多任务协作模型;
        所有的session都是在同个OS进程中执行,其本身上不是多线程或多进程的。
        因此,在POE编程中,尽量不要使用阻塞式的系统调用,否则会导致整个POE运行的阻塞。


driver: 是POE的I/O层的最低层。
        现在,POE的发布版本中只有一个driver -- POE::Driver::SysRW.
        它可以从文件句柄中读写数据.


filter: 它是将格式化的数据块转换成另一种格式的接口。
        例如,POE::Filter::HTTPD将HTTP 1.0的请求转换成HTTP::Request对象并返回;
        POE::Filter::Line将原始数据流转换成多行(就像Perl的<>操作)。


wheel:  它是处理任务的高级逻辑的可重用代码片断。
        它们以POE的方式将有用的代码做了封装。
        在POE中,通常使用wheel做的事情包括处理事件驱动的输入输出,易用的网络连接。
        Wheel通常使用Filter和Driver来通知和发送数据。


component: 它就是一个封装好了的可重用的session,可以被别的session控制。
        其它的session可以发送命令给它,也可以从它那接收事件,很像OS中的使用IPC进行进程通信。
        Component的例子包括,
        POE::Component::IRC, 用来创建基于POE的IRC客户端的接口;
        POE::Component::HTTP, Perl中由事件驱动的用户代码。


二、POE::Wheel简介
POE::Wheel是封装好了的用来执行特定任务的事件句柄包,
它负责管理激活对应事件处理句柄的事件监测器。
在POE::Wheel创建时,它会添加匿名的事件句柄给调用它的session。
因此,当session创建wheel时,就需要处理这些新的事件。


常用的Wheel有如下:
POE::Wheel::Curses        - Non-blocking input for Curses.
POE::Wheel::FollowTail    - Non-blocking file and FIFO monitoring.
POE::Wheel::ListenAccept  - Non-blocking server for existing sockets.
POE::Wheel::ReadLine      - Non-blocking console input, with full readline support.
POE::Wheel::ReadWrite     - Non-blocking stream I/O.
POE::Wheel::Run           - Non-blocking process creation and management.
POE::Wheel::SocketFactory - Non-blocking socket creation, supporting most protocols and modes.


三、POE::Wheel::Run简介
是一个非阻塞的进程创建和管理器。
因为POE本身不是多线程或多进程的,所以要创建子进程以并行处理时就要用到它啦。
父进程和子进程可以通过子进程的文件句柄STDIN, STDOUT和STDERR进行通信。


在父进程中,用POE::Wheel::Run对象表示子进程,
可以使用这个对象的PID()和kill()方法来查询和管理子进程。


父进程使用put()方法发送数据到子进程STDIN,
子进程的STDOUT和STDERR将会以事件方式发送给父进程。


POE::Wheel::Run对象同样可以在子进程关闭其输出文件句柄时通知父进程。
但父进程监视子进程退出更可靠的方式是使用:
 POE::Kernel的sig_child();
它会等待wheel的子进程退出并回收。
而且最好在所有的环境下都要用sig_child(),
不然,POE不会回收子进程。从而导致进程的泄漏,对于长时运行的程序来说,这是致命的。
对于这种情况,POE::Kernel在退出时会有warning提示。


POE::Wheel::Run的对象和子进程的通信,默认地是基于行的。
在程序开发时,可以通过使用POE::Filter对象来覆盖
"StdinFilter","StdoutFilter","StdioFilter"和"StderrFilter"事件句柄,
实现更多样的数据处理方式。


四、示例程序
本例是一很简单的示例程序,主要是用来说明整个开发流程。
程序实现的是用子进程打开文件,读取三行,
并在父进程中显示出这三行的内容。
闲话少说,正式开始程序:


1. 主程序
1  #!/usr/bin/perl -w
2
3  ###############################################################################
4  # \File
5  #   test_poe_wheel_run.pl
6  # \Descript
7  #   test POE::Wheel::Run
8  # \Author
9  #   Hank
10 # \Created date
11 #   2013-4-25
12 ###############################################################################
13 use strict;
14 use POE qw(Wheel::Run
15     Filter::Reference);
16
17 my $filename = "test_file.ini";
18 my $line_num = 3;
19
20 our $task_pid = 1;
21 $SIG{TERM} = $SIG{INT} = sub {
22 kill KILL => $task_pid;
23 exit;
24 };
25
26 #
27 # Main process
28 #
29 POE::Session->create(
30   inline_states => {
31     _start      => \&start_task,
32     _stop       => \&handle_task_shutdown,
33     task_result => \&handle_task_result,
34     task_done   => \&handle_task_done,
35     task_debug  => \&handle_task_debug,
36     sig_child   => \&handle_sig_child,
37   },
38   args => [$filename, $line_num],
39 );
40
41 $poe_kernel->run();
42
43 exit;


Line14,15:
  导入要用的POE模块,qw()声明是Perl用来一次性加载多个POE模块的简写方式,
  它和下面的代码等同:
    use POE;
    use POE::Wheel::Run;
    use POE::Filter::Reference;


Line17,18:
  定义了要读取的文件和行数,这么写是为了示例如何进行参数传递。


Line20~24:
  这段代码在POE::Wheel::Run开发中很有必要,它的作用是在整个POE进程用
  Ctrl+c或退出时kill掉子进程。
  否则会造成子进程失去父亲,被系统的init接管。


Line29:
  POE的session创建函数。


Line30~37:
  向POE注册本程序要处理的状态,以及这些状态对应的状态处理句柄。
  以"_"开始的"_start","_stop"是POE::Kernel默认的状态,分别用于session的启动和销毁。
  sig_child注册的POE::Kernel对子进程退出后,对其回收的状态处理句柄。
  其它三个状态都是自定义状态及其处理句柄。


Line38:
  是将参数传递给session的启动状态对应的处理函数start_task().
  POE使用了一个传递参数的特别方式:
  它将数组@_封装了很多额外的参数 -- 分别是:
     当前内核, session的引用, 状态名, Heap的引用,以及ARG0 ~ ARG9;
  要访问它们,可以使用数组@_加下标KERNEL, SESSION, STATE, HEAP, ARG0 ~ ARG9。
  这样的设计是为了最大化的提高运行速度。
  所以,在POE中的参数或数据在状态处理函数间的传递有两种方式:
    HEAP哈希,或ARG0~ARG9.
  Line38用的就是后一种,[$filename, $line_num]对应赋值到ARG0和ARG1。


Line41~43:
  启动POE::Kernel,由此便建立了一个用来探测并分派事件的主循环。整个程序就运转起来了。
  且run方法只有在所有session返回之后才会停止循环。
  之后,我们调用一个表示程序结束的提示符的exit系统方法来表示程序被终止。


  POE轮次处理每一个事件,每次只有一个事件句柄被运行。
  当事件句柄运行的时候,POE::Kernel自身也将被中断,在事件句柄返回之前,没有事件被分派。
  当各个session的事件被传送到主程序事件队列后,位于队列头部的事件被首先处理,
  新来的事件将被放置在队列的尾部。以此保证队列的轮次处理。
  POE::Kernek的run方法在最后一个session停止之后返回。


整个主程序的执行顺序是如下:
先创建session,
session创建完成后就发送第一个事件_start;
之后启动POE的kernel。
此时事件队列中已有一个_start事件,所以进入_start事件对应的状态处理函数start_task()。


2. _start状态处理函数
44 #
45 # sub-functions
46 #
47 sub start_task {
48   my ($kernel, $heap) = @_[KERNEL, HEAP];
49   my ($fname, $lnum)  = @_[ARG0..ARG1];
50   my @params = ($fname, $lnum); 
51
52   my $task = POE::Wheel::Run->new(
53     Program      => sub { task_stuff(@params) },
54     StdoutEvent  => "task_result",
55     StderrEvent  => "task_debug",
56     CloseEvent   => "task_done",
57   );
58
59   $kernel->sig_child($task->PID, "sig_child");
60   $task_pid = $task->PID;
61
62   # Wheel events include the wheel's ID.
63   $heap->{children_by_wid}{$task->ID} = $task;
64
65   # Signal events include the process ID.
66   $heap->{children_by_pid}{$task->PID} = $task;
67
68   print("Child pid ", $task->PID," started as wheel ", $task->ID, ".\n");
69 }
Line49:
  取得Line38经ARG0,ARG1传递的参数。


Line50:
  将变量打包到数组,以传给子进程。


Line52~57:
  创建POE::Wheel::Run对象,
  并将要处理的事件映射到session的state,以事件发生时触发相应的状态处理函数。
Line53:
  是指定子进程将要执行程序,子进程是以exec()方式运行.
  If Program holds a scalar, its value will be executed as exec($program). 
     Shell metacharacters are significant, per exec(SCALAR) semantics.
  If Program holds an array reference, it will executed as exec(@$program). 
     As per exec(ARRAY), shell metacharacters will not be significant.
  If Program holds a code reference, that code will be called in the child process  


Line59:
  指定进程退出时用来回收的事件。


Line62~66:
  将子进程的进程号和Wheel的ID以哈希的方式存储在HEAP中,以用于本session的其它状态处理函数。




3. _stop状态处理函数
71 sub handle_task_shutdown {
72   my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
73   my $task_id = $_[ARG0];
74
75   ## delete all wheels.
76   delete $heap->{wheel};
77
78   ## clear your alias
79   #$kernel->alias_remove($heap->{alias});
80
81   ## clear all alarms you might have set
82   #$kernel->alarm_remove_all();
83
84   return;
85 }
如代码所示,在session退出时进行各种资源的回收。


4. 子进程STDOUT,STDERR及进程结束事件触发的状态处理函数
87 sub handle_task_result {
88   my ($stdout_line, $wheel_id) = @_[ARG0, ARG1];
89   my $child = $_[HEAP]{children_by_wid}{$wheel_id};
90
91   print "pid ", $child->PID, " STDOUT: $stdout_line\n";
92 }
93
94 sub handle_task_debug {
95   my $result = $_[ARG0];
96 }
97
98 sub handle_task_done {
99   my ($kernel, $heap, $task_id) = @_[KERNEL, HEAP, ARG0];
100  delete $heap->{task}->{$task_id};
101}
Line87~92:
  StdoutEvent由子进程的打印到STDOUT触发。
  它带有两个参数:
    ARG0: 子进程写到STDOUT的信息;
    ARG1: 读取这个输出的Wheel的ID号;


Line94~96:
  StderrEvent和StdoutEvent的处理方式一样,在子进程写STDERR时触发。
  它带有两个参数:
    ARG0: 子进程写到STDERR的信息;
    ARG1: 读取这个输出的Wheel的ID号;


Line98~101:
  CloseEvent在子进程关闭它最后一次打开的文件句柄时触发,
  但它不是子进程结束时的信号。对于就种情况使用sig_child()来处理。
   
  在CloseEvent发生之后,就不可能再触发ErrorEvent或StdoutEvent。
  它带有一个参数:
    ARG0: Wheel的ID号,可以用来在一个session管理多个子进程时分开处理。


5. 子进程结束时session对子进程资源的回收
103sub handle_sig_child {
104  my ($heap, $sig, $pid, $exit_val) = @_[HEAP, ARG0, ARG1, ARG2];
105  my $child = delete $heap->{children_by_pid}{$pid};
106
107  return unless defined $child;
108  delete $heap->{children_by_wid}{$child->ID};
109  print "PID $$: Child-pid $pid exited\n";
110}
sig_child()是在特定子进程PROCESS_ID退出后,触发相应事件以进行处理的方便方式。
它带有多个参数:
my ($heap, $sig, $pid, $exit_val) = @_[HEAP, ARG0, ARG1, ARG2];


一个session可以注册多个sig_child()句柄,但是每个子进程对应的只有一个。
且它不会返回任何有意义的值。




6. 子进程任务执行程序体
112#
113# task instance
114#
115sub task_stuff {
116  my ($file, $line) = @_[0..1];
117
118  if ( !open(TASKFILE, "$file")){
119    print STDERR "Cann't open the file $file\n";
120    exit;
121  }
122
123  my $index = 0;
124  foreach my $task (){
125    print "$task";
126    $index++;
127    last if ($index == $line);
128  }
129  close (TASKFILE);
130}
这部分就和普通编程别无二致了:
获取参数,打开文件,读取三行,关闭文件。


只是Line119,line125不会在子进程中直接打印输出,
它们分别触发StdoutEvent 和StderrEvent事件,
从而调用相应的状态处理函数handle_task_result()和handle_task_debug()。


更多的事件和状态处理函数的解析可以看官方的文档:




《象》曰:天行健,君子以自强不息。
【白话】《象辞》说:天道运行周而复始,永无止息,谁也不能阻挡,君子应效法天道,自立自强,不停地奋斗下去。


阅读(2615) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~