一.前言
至于POE的应用,我不想多说什么,因为需要使用状态机的地方太多。
举一个极端的例子,windows下的perl-tk对于多线程的支持极不稳定,如果在其中加入一个大数据量的处理应用,
结果往往会是一个无法动弹的程序。这时除了使用POE,也许没有更好的解决办法了。
另外,python中有叫twisted的类似框架,被广泛地应用在网络服务中,
具体的使用方法可以参考Oreilly出版的《Python Twisted Network Programming Essentials》。
某种意义上,也说明了状态机的重要性。
二.基本原理与概念
在详细说明POE服务器的建立步骤之前,需要对POE的原理做一个大致的了解。
在这个阶段中,我们将描述这方面的不同概念,以及它们在POE中的应用。
1. 事件与事件句柄
POE 是一个为网络工作和并行任务服务的事件驱动框架。
首先作为前提,需要掌握事件和事件驱动编程的意义。
在抽象意义上,事件就是真实世界中发生的一件事情。
比如说:早晨打铃、面包从烤机里弹出、茶煮好了等。而在用户界面上最常见的事件则是鼠标移动、按钮点击和键盘敲打等等。
具体到程序软件事件,则往往是一些抽象的事件。
也就是说,它不仅包括了发送给程序的外部活动,而且也包括了一些在操作系统内部运行的事件。
比如说,计时器到点了,socket建立了连接,下载完成等。
在事件驱动程序中,中心分配器的作用是将事件分配给不同的处理程序。
这些处理程序就是事件句柄,顾名思义,它们的任务就是处理相应事件。
POE的事件句柄之间的关系是合作性质的。
没有两个句柄会同时运行,每一个句柄在被激发运行期间将独占程序。
通过尽可能快地返回来保证程序的其它部分得以顺畅运行,这就是事件句柄之间的合作方式。
2. POE程序的组成部分
最简单的POE程序包括两个模块和一些用户代码:它们分别是POE::Kernel,POE::Session以及一些事件句柄。
a. POE::Kernel:
POE::Kernel提供了基于事件的操作系统核心服务。
包括I/O事件、警报和其它计时事件、信号事件和一些不被人意识到的事件。
POE::Kernel提供不同的方法对这些事件进行设置,比如select_read(), delay()和sig()。
POE::Kernel还能够跟踪事件发生源和与之相关的任务之间的关系。
之所以能够这么做,是因为当事件发生时,它将跟踪哪个任务被激活了。
于是它便知道了哪个任务调用方法来使用了这些资源,而这些都是自动完成的。
POE::Kernel也知道何事需将任务销毁。它检测任务以确定是否还有事件需要处理,或者是哪个事需要释放占用的资源。当任务没有事件可以触发的时候,POE::Kernel就自动销毁该资源。
POE::Kernel会在最后一个session停止以后终止运行。
b. POE::Session:
POE::Session实例就是上面所讲的由POE::Kernel管理的“任务”。(以下的章节中为了便于识别将使用“session”)
每一个session都有一个自己私有的存储空间,叫“heap”。存储在当前session的heap中的数据很难被一个外部session得到。
每一个session还拥有自己的资源和事件句柄。这些资源为拥有它们的session生成事件,而事件只被指派到其所处的session中。
举例说明,有多个session都可以设置相同的警报,并且任何一个都能接受其所请求的计时事件。
但所有其他session不会在意发生在它们之外的事情。
c. 事件句柄:
事件句柄就是Perl程序。
它们因为使用了POE::Kernel传递的参数而不同于一般的perl程序。
POE::Kernel是通过@_来传递参数。该数组的前七个成员定义了发生该事件的session的上下文。
它包括了一个指向POE::Kernel运行实例的引用、事件自身的名字、指向私有heap的引用以及指向发出事件的session的引用。
@_中剩下的成员属于事件自身,其中的具体内容依照被指派的事件类型而定。
举例说明:对于I/O事件,包括两个参数:一个是缓冲文件句柄,另一个是用来说明采取何种行为(input、output或者异常)的标记。
POE不强求程序员为每一个事件句柄分配所有的参数,要不然这将变成一件非常烦人的工作,因为它们中的一些参数是不常被用到的。
而POE::Session会自动为@_输出剩余的常量,这样就能使我们相对比较轻松地将注意力放在重要的参数上,而让POE来处理不必需的参数。
POE还允许改变参数的顺序和数量,而不会对程序造成影响。
比如说,KERNEL,HEAP和ARG0分别是 POE::Kernel实例、当前session的堆栈和事件的第一个用户参数。
它们可以一个个直接从@_被导出。
my $kernel = $_[KERNEL];
my $heap = $_[HEAP];
my $thingy = $_[ARG0];
或者一次性以队列片段的形式赋值给程序参数。
my ( $kernel, $heap, $thingy ) = @_[KERNEL, HEAP, ARG0];
当然在事件句柄中我们也可以直接使用$_[KERNEL],$_[HEAP]和$_[AG0]。
但是因为诸如ARG0的参数很难从字面上知道它在事件中代表的真实意义, 所以我们不提倡直接使用这种做法。
三.简单的POE例子
现在大致知道了POE编程的概念,我们将举若干例子来了解它到底是怎么运行的。
1. 一个单session的例子
简单的POE程序包括三个部分:
一个用来加载模块和配置条件的前端,
初始化并且运行一个或者多个session的主体
和用来描述事件句柄的具体程序。
a. 前端:
#!/usr/bin/perl
use warnings;
use strict;
use POE;
引入POE模块的过程的背后隐藏了一些细节,
事实上这么做还加载了一些诸如POE::Kernel和POE:Sesson的模块并相应地做了一些初始化,而通常在每个POE程序中我们都会用到这些隐藏模块。
在POE::Kernel第一次被引入时,它生成了一个将贯穿整个程序POE::Kernel实例。
POE::Session会根据不同的事件输出默认常量给事件句柄的某些参数,如:KERNEL,HEAP,ARG0等等。
所以一个简单的use POE为程序做了大量的初始化工作。
b. 主体session:
当所有的条件都准备好之后,为了保证POE::Kernel的有效运行,我们必须建立至少一个session。不然的话,运行程序意味着无事可做。
在这个例子里,我们将建立一个包含_start,_stop和count这三个事件的任务。
POE::Session将每个事件与一个句柄联系在一起。
POE::Session->create{
Inline_states => {
_start => \&session_start,
_stop => \&session_stop,
count => \&session_count,
}
};
前两个事件是由POE::Kernel自身所提供的。它们分别表示该session的启动和销毁。
最后一个事件是用户自定义事件,它被用于程序的逻辑之中。
我们之所以没有为session保留一个引用的原因是因为该session会自动被注册到POE::Kernel中,并接收它的管理,
而我们在程序是很少直接使用该session的。
事实上,保存一个session的应用是存在危险的。
因为如果存在显式的引用,Perl将不会自动销毁session对象或者重新为其分配内存。
接着我们启动POE::Kernel,由此便建立了一个用来探测并分派事件的主循环。
在此示例程序中,为了使运行结果更加明确,我们将注明POE::Kernel运行的开始处和结束点。
print “Starting POE::Kernel.\n”;
POE::Kernel->run();
print “POE::Kernle’s run method returned.\n”;
exit;
Kernel的run方法只有在所有session返回之后才会停止循环。
之后,我们调用一个表示程序结束的提示符的exit系统方法来表示程序被终止,而在实际的应用中这么做是不必要的。
c. 事件句柄:
下面我们来了解一下事件句柄的应用,首先从_start开始。
_start的句柄将在sesson初始化完成之后开始运行,session在其自身的上下文中使用它来实现输入引导。
比如初始化heap中的值,或者分配一些必要的资源等等。
在该句柄中我们建立了一个累加器,并且发出了“count”事件以触发相应的事件句柄。
sub session_start {
print "Session ", $_[SESSION]->ID, " has started.\n";
$_[HEAP]->{count} = 0;
$_[KERNEL]->yield("count");
}
一些熟悉线程编程的人可能会对yield方法在这里的使用产生困惑。
事实上,它并不是用来中止session运行的,而是将一个事件放入fifo分派队列的末尾处,
当队列中在其之前的事件被处理完毕之后,该事件将被触发以运行相应的事件句柄。
这个概念在多任务环境下更容易被理解。我们可以通过在调用yield方法之后立即返回的办法,来清晰地体现yield方法的行为。
接下来的是_stop句柄。POE::Kernel将在所有session再无事件可触发之后,并且是在自身被销毁之前调用它。
sub session_stop {
print "Session ", $_[SESSION]->ID, " has stopped.\n";
}
在_stop中设置一个事件是无用的。
销毁session的过程本身包括清理与之相关的资源,而事件就是资源的组成部分。
所以对于所有在_stop中的事件,在其能够被分派之前都是将被清理的。
最后讲一下count事件句柄。
该函数用来增加heap中的累加器计数,并打印累加结果。我们可以使用一个while来完成这件工作,
但是用yield方法一来可以使得程序更短小精悍,二来还能够加深对POE事件处理原理的理解。
sub session_count {
my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
my $session_id = $_[SESSION]->ID;
my $count = ++$heap->{count};
print "Session $session_id has counted to $count.\n";
$kernel->yield("count") if $count < 10;
}
该函数的最后一句表示:
只要累加器计数未超过10,session将再yield一个count事件。
因为不断地触发了session_count句柄,使得当前session可以继续得以生存而不会被POE::Kernel清理。
当计数器到10时,便不再调用yield命令,session也将停止。
一旦POE::Kernel检测到该session再没有事件句柄可被激发,便在调用_stop事件句柄之后将其清理销毁。
以下是运行的结果:
Session 2 has started.
Starting POE::Kernel.
Session 2 has counted to 1.
Session 2 has counted to 2.
Session 2 has counted to 3.
Session 2 has counted to 4.
Session 2 has counted to 5.
Session 2 has counted to 6.
Session 2 has counted to 7.
Session 2 has counted to 8.
Session 2 has counted to 9.
Session 2 has counted to 10.
Session 2 has stopped.
POE::Kernel's run() method returned.
对于该结果有几点需要解释一下。
A. 为什么在运行结果中session的id是2?
因为通常情况下,POE::Kernel是最先被创建的,它的id号会是1。接下来创建session的id号依次被累加。
B. 为什么_start事件在kernel运行之前?
因为当运行POE::Session->create时就会分派_start事件,所以_start事件句柄的激发是在POE::Kernel运行之前的。
C. 为什么_strart事件之后的count事件没有立即处理?
第一个count事件句柄并没有被立即处理。这是因为该事件被Kernel放入了分派队列之中。
D. Session停止的原因有哪些?
导致session停止的原因除了再没有事件可触发而之外,外部的终止信号也可以用来停止session。
2.多任务的POE例子
可以将以上的这个计数程序做成多任务的形式,使每一个session将在其自身的heap中保存累加器。
各个session的事件被依次传送到POE::Kernel的事件队列中,并以先进先出的形式进行处理,以保证这些事件将轮流被执行。
为了演示这个结果,我们将复制以上程序中的session部分,其它部分保持原样不变。
for ( 1 .. 2 ) {
POE::Session->create(
inline_states => {
_start => \&session_start,
_stop => \&session_stop,
count => \&session_count,
}
);
}
以下便是修改后的程序的运行结果:
Session 2 has started.
Session 3 has started.
Starting POE::Kernel.
Session 2 has counted to 1.
Session 3 has counted to 1.
Session 2 has counted to 2.
Session 3 has counted to 2.
Session 2 has counted to 3.
Session 3 has counted to 3.
Session 2 has counted to 4.
Session 3 has counted to 4.
Session 2 has counted to 5.
Session 3 has counted to 5.
Session 2 has counted to 6.
Session 3 has counted to 6.
Session 2 has counted to 7.
Session 3 has counted to 7.
Session 2 has counted to 8.
Session 3 has counted to 8.
Session 2 has counted to 9.
Session 3 has counted to 9.
Session 2 has counted to 10.
Session 2 has stopped.
Session 3 has counted to 10.
Session 3 has stopped.
POE::Kernel's run() method returned.
每一个session是在自身heap中保存计数数据的,这与我们建立的session实例数量无关。
POE轮次处理每一个事件,每次只有一个事件句柄被运行。
当事件句柄运行的时候,POE::Kernel自身也将被中断,在事件句柄返回之前,没有事件被分派。
当各个session的事件被传送到主程序事件队列后,位于队列头部的事件被首先处理,新来的事件将被放置在队列的尾部。
以此保证队列的轮次处理。
POE::Kernek的run方法在最后一个session停止之后返回。
四.回声服务器
最后我们将用IO::Select建立一个非派生的回声服务器,然后再利用多个抽象层的概念将它移植到POE上。
1. 一个简单的select()服务器
这个非派生的服务器的原型来自于《Perl Cookbook》中的17.13章节。
为了保持简洁并且也是为了更方便于移植到POE上,对其做了一些修改。
同时为了增加可读性,还给该服务器设定一些小的目的和功能。
首先,需要引入所需的模块并初始化一些数据结构。
#!/usr/bin/perl
use warnings;
use strict;
use IO::Socket;
use IO::Select;
use Tie::RefHash;
my %inbuffer = ();
my %outbuffer = ();
my %ready = ();
tie %ready, "Tie::RefHash";
接下来,我们要建立一个服务器socket。为了不阻塞单进程的服务器,这个socket被设置为非阻塞状态。
my $server = IO::Socket::INET->new
( LocalPort => 12345,
Listen => 10,
) or die "can't make server socket: $@\n";
$server->blocking(0);
然后建立主循环。
我们制造一个IO::Socket对象用以监视socket上的活动。
无论何时,当有一个事件发生在socket上,都会有相应的程序来处理它。
my $select = IO::Select->new($server);
while (1) {
foreach my $client ( $select->can_read(1) ) {
handle_read($client);
}
foreach my $client ( keys %ready ) {
foreach my $request ( @{ $ready{$client} } ) {
print "Got request: $request";
$outbuffer{$client} .= $request;
}
delete $ready{$client};
}
foreach my $client ( $select->can_write(1) ) {
handle_write($client);
}
}
exit;
以上的主循环对整个程序做了一个大致的总结。
下面是用于处理socket不同行为的几个函数:
第一个函数用来处理可读状态的socket。
如果这个准备就绪的socket是服务器的socket,我们再接收一个新的连接,并将它注册到IO::Socket对象中。
如果这是一个存在输入数据的客户端socket,我们读取数据并对其进行处理,并将处理的结果添加到%ready数据结构中。
主循环会捕获在%ready中的数据,并将它们回传给客户端。
sub handle_read {
my $client = shift;
if ( $client == $server ) {
my $new_client = $server->accept();
$new_client->blocking(0);
$select->add($new_client);
return;
}
my $data = "";
my $rv = $client->recv( $data, POSIX::BUFSIZ, 0 );
unless ( defined($rv) and length($data) ) {
handle_error($client);
return;
}
$inbuffer{$client} .= $data;
while ( $inbuffer{$client} =~ s/(.*\n)// ) {
push @{ $ready{$client} }, $1;
}
}
接下来是一个处理可写状态的socket的函数。
等待被发送到客户端的数据将被写到这个socket中,之后被从输出缓冲中删除。
sub handle_write {
my $client = shift;
return unless exists $outbuffer{$client};
my $rv = $client->send( $outbuffer{$client}, 0 );
unless ( defined $rv ) {
warn "I was told I could write, but I can't.\n";
return;
}
if ( $rv == length( $outbuffer{$client} ) or
$! == POSIX::EWOULDBLOCK
) {
substr( $outbuffer{$client}, 0, $rv ) = "";
delete $outbuffer{$client} unless length $outbuffer{$client};
return;
}
handle_error($client);
}
最后我们需要一个程序来处理客户socket在读取和发送数据时产生的错误。
它会为发生错误的socket做一些清理工作,并保证它们被正确关闭。
sub handle_error {
my $client = shift;
delete $inbuffer{$client};
delete $outbuffer{$client};
delete $ready{$client};
$select->remove($client);
close $client;
}
短短130行代码,我们就有了一个简单的回声服务器。不算太坏,但是我们可以做得更好。
2. 将服务器移植到POE上
为了把IO::Socket服务器移植到POE上,需要使用到某些POE的底层特征。
为了详细说明的需要,我们竟可能地不省略细节,而最终的程序也将保留其中的大部分代码。
事实上,以上的IO::Socket服务器本身就是由事件驱动的,在其中包含了一个用于检测并分派之间的主循环,配以处理这些事件的相应事件句柄。
从这一点上来说,与POE的原理和架构有异曲同工的意思。
新的服务器程序需要一个如下所示的POE空框架。用于具体功能实现的代码将被添加到这个框架之中。
#!/usr/bin/perl
use warnings;
use strict;
use POSIX;
use IO::Socket;
use POE;
POE::Session->create
( inline_states =>
{
}
);
POE::Kernel->run();
exit;
在继续完成接下来的程序之前,为了勾勒出程序的大致框架结构,必须明确哪些事件的出现是必要的。
1) 服务器启动,完成初始化。
2) 服务器socket准备就绪,可以接收连接。
3) 客户socket处于可读取状态,服务器读取数据并对其进行处理。
4) 客户socket处于可写状态,服务器对其写入一些数据。
5) 客户socket发生错误,需要将其关闭。
一旦知道了需要做些什么,就可以建立这些事件的名称,并为这些事件编写相应的事件处理句柄,从而快速地完成POE::Session的构造。
POE::Session->create
( inline_states =>
{ _start => \&server_start,
event_accept => \&server_accept,
event_read => \&client_read,
event_write => \&client_write,
event_error => \&client_error,
}
);
现在是真正将IO::Select代码移植过来的时候了!
和IO::Select服务器相同,需要为客户socket提供输入和输出缓冲,
而且由于这两个缓冲对socket句柄的重要性并且不存在冲突,它们将被保持为程序的全局变量。
另外,在这里将不再使用%ready哈希表。
my %inbuffer = ();
my %outbuffer = ();
紧接着是引入IO::Select的程序片段,因为每一段都是上面指定的事件所触发的,因此这些片段将被移植入相应事件的处理句柄中。
在_start事件的处理句柄中,需要建立一个服务器的监听socket,并用select_read为其分配一个事件发生器。
句柄中用到的POE::Kernel模块中的select_read方法接收两个参数:
第一个是需要监视的socket,
第二个是当该socket处于可读状态时所触发的处理句柄。
sub server_start {
my $server = IO::Socket::INET->new
( LocalPort => 12345,
Listen => 10,
Reuse => "yes",
) or die "can't make server socket: $@\n";
$_[KERNEL]->select_read( $server, "event_accept" );
}
注意一点,我们并没有保存服务器socket。
因为POE::Kernel会对其进行跟踪,并将其作为一个参数传递给event_accept事件句柄。
只有在需要特殊用途的情况下,我们才会保存一个该socket的拷贝。
再回顾POE::Session构造器,事件event_accept会激发server_accept事件句柄。
该句柄接收一个新的客户socket,并对其分配一个监视器。
sub server_accept {
my ( $kernel, $server ) = @_[ KERNEL, ARG0 ];
my $new_client = $server->accept();
$kernel->select_read( $new_client, "event_read" );
}
之后我们在client_read句柄中处理处理来自客户的数据。
当新连接的客户socket处于可读状态时,句柄被触发。
该句柄中的第一个客户参数即为所连接的客户socket,因此我们无需再为其保留一个拷贝。
在内容上,句柄client_read与IO::Select服务器中的handle_read几乎一样。
而由于handle_read的accept部分的内容被移植到了server_accept句柄中,相应地就不再需要%ready哈希表了。
如果接收过程发生错误,客户socket通过POE::Kernel的yield方法将被传送到event_error句柄中。
因为该yield方法是根据程序员的具体要求发送事件的,所以需要在yield中将发生错误的客户socket作为某个参数,
而此socket在处理该event_error的事件句柄client_error中被赋值给$_[ARG0]。
接着,如果在client_read中发现输出缓存存在数据,
我们将检测以保证当该客户socket处于可写状态时,及时触发事件处理句柄,将数据发送出去。
sub client_read {
my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];
my $data = "";
my $rv = $client->recv( $data, POSIX::BUFSIZ, 0 );
unless ( defined($rv) and length($data) ) {
$kernel->yield( event_error => $client );
return;
}
$inbuffer{$client} .= $data;
while ( $inbuffer{$client} =~ s/(.*\n)// ) {
$outbuffer{$client} .= $1;
}
if ( exists $outbuffer{$client} ) {
$kernel->select_write( $client, "event_write" );
}
}
在用于发送数据的事件句柄中,第一个客户参数依然是一个可用的socket。
在该句柄中,如果输出缓存为空,则停止检测并迅速返回。
否则,我们将试图将缓冲内的数据全部发出。如果所有数据均发送成功,该缓冲将被销毁。
与client_read类似,client_write中也有相应的错误处理句柄。
sub client_write {
my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];
unless ( exists $outbuffer{$client} ) {
$kernel->select_write($client);
return;
}
my $rv = $client->send( $outbuffer{$client}, 0 );
unless ( defined $rv ) {
warn "I was told I could write, but I can't.\n";
return;
}
if ( $rv == length( $outbuffer{$client} ) or
$! == POSIX::EWOULDBLOCK
) {
substr( $outbuffer{$client}, 0, $rv ) = "";
delete $outbuffer{$client} unless length $outbuffer{$client};
return;
}
$kernel->yield( event_error => $client );
}
最后说明一下在以上两个句柄中被用到的错误处理句柄。
我们首先删除了客户socket的输入输出缓存,再关闭建立在该socket上的所有监视,最后保证该socket被成功关闭。
如此这般便有效地关闭了来自于客户的连接。
sub client_error {
my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];
delete $inbuffer{$client};
delete $outbuffer{$client};
$kernel->select($client);
close $client;
}
移植成功!
阅读(2150) | 评论(0) | 转发(0) |