Chinaunix首页 | 论坛 | 博客
  • 博客访问: 542816
  • 博文数量: 260
  • 博客积分: 10435
  • 博客等级: 上将
  • 技术积分: 1939
  • 用 户 组: 普通用户
  • 注册时间: 2009-11-24 14:50
文章分类

全部博文(260)

文章存档

2011年(22)

2010年(209)

2009年(29)

我的朋友

分类: Java

2010-12-21 23:37:37

Apache Mina是一个能够帮助用户开发高性能和高伸缩性网络应用程序的框架。它通过Javanio技术基于TCP/IPUDP/IP协议提供了抽象的、事件驱动的、异步的API

如下的特性:

1  基于Java nioTCP/IPUDP/IP实现

基于RXTX的串口通信(RS232

VM 通道通信

2、通过filter接口实现扩展,类似于Servlet filters

3low-level(底层)和high-level(高级封装)的api

       low-level:使用ByteBuffers

       High-level:使用自定义的消息对象和解码器

4Highly customizable(易用的)线程模式(MINA2.0 已经禁用线程模型了):

       单线程

       线程池

       多个线程池

5、基于java5 SSLEngineSSLTLSStartTLS支持

6、负载平衡

7、使用mock进行单元测试

8jmx整合

9、基于StreamIoHandler的流式I/O支持

10IOC容器的整合:SpringPicoContainer

11、平滑迁移到Netty平台

 

 2.0.x 中对 Spring  IoC 的集成进行了简化,添加了基于   JMX 远程管理支持,使用基于 Java Annotation 的全新 API 大大简化了状态机编程,新的基于 Apache APR 的基础 I/O 组件促进了进一步的效率提升(据官方评测, APR 的效率较之 Sun NIO 要高出约 10%)。

 

 

异步 I/O 模型大体上可以分为两种,反应式( Reactive )模型和前摄式( Proactive )模型:

传统的 select / epoll / kqueue 模型,以及 Java NIO 模型,都是典型的反应式模型,即应用代码对 I/O 描述符进行注册,然后等待 I/O 事件。当某个或某些 I/O 描述符所对应的 I/O 设备上产生 I/O 事件(可读、可写、异常等)时,系统将发出通知,于是应用便有机会进行 I/O 操作并避免阻塞。由于在反应式模型中应用代码需要根据相应的事件类型采取不同的动作,最常见的结构便是嵌套的 if {...} else {...}   switch ,并常常需要结合状态机来完成复杂的逻辑。

前摄式模型则恰恰相反。在前摄式模型中,应用代码主动地投递异步操作而不管 I/O 设备当前是否可读或可写。投递的异步 I/O 操作被系统接管,应用代码也并不阻塞在该操作上,而是指定一个回调函数并继续自己的应用逻辑。当该异步操作完成时,系统将发起通知并调用应用代码指定的回调函数。在前摄式模型中,程序逻辑由各个回调函数串联起来:异步操作 A 的回调发起异步操作 B 的回调再发起异步操作 C ,以此往复。 Mina 便是一个前摄式的异步 I/O 框架。

前摄式模型相较于反射式模型往往更加难以编程。然而在具有原生异步 I/O 支持的操作系统中(例如支持 IO Completion Port  Win32 系统),采用前摄式模型往往可以取得比反应式模型更佳的效率。在没有原生异步 I/O 支持的系统中,也可以使用传统的反应式 API 对前摄式模型予以模拟。在现代的软硬件系统中,使用 epoll  kqueue 的前摄式模型实现同样可以轻松解决  问题。前摄式模型的一个显著优势是在实现复杂逻辑的时候不需要借助于状态机。因为状态机已经隐含在由回调串联起来的异步操作链当中了。可以参考  ,这是一个相当优秀的跨平台 C++ 前摄式 I/O 模型实现。


下图是 MINA 的架构图,



 

在图中的模块链中,IoService 便是应用程序的入口,相当于我们前面代码中的 IoAccepter,IoAccepter 便是 IoService 的一个扩展接口。IoService 接口可以用来添加多个 IoFilter,这些 IoFilter 符合责任链模式并由 IoProcessor 线程负责调用。而 IoAccepter 在 ioService 接口的基础上还提供绑定某个通讯端口以及取消绑定的接口。在上面的例子中,我们是这样使用 IoAccepter 的:

IoAcceptor acceptor = new SocketAcceptor();

相当于我们使用了 Socket 通讯方式作为服务的接入,当前版本的 MINA 还提供了除 SocketAccepter 外的基于数据报文通讯的 DatagramAccepter 以及基于管道通讯的 VmPipeAccepter。另外还包括串口通讯接入方式,目前基于串口通讯的接入方式已经在最新测试版的 MINA 中提供。你也可以自行实现 IoService 接口来使用自己的通讯方式。

而在上图中最右端也就是 IoHandler,这便是业务处理模块。相当于前面例子中的 HelloHandler 类。在业务处理类中不需要去关心实际的通讯细节,只管处理客户端传输过来的信息即可。编写 Handler 类就是使用 MINA 开发网络应用程序的重心所在,相当于 MINA 已经帮你处理了所有的通讯方面的细节问题。为了简化 Handler 类,MINA 提供了 IoHandlerAdapter 类,此类仅仅是实现了 IoHandler 接口,但并不做任何处理。

一个 IoHandler 接口中具有如下一些方法(摘自 MINA 的 API 文档):

void exceptionCaught(IoSession session, Throwable cause) 
当接口中其他方法抛出异常未被捕获时触发此方法
void messageReceived(IoSession session, Object message) 
当接收到客户端的请求信息后触发此方法.
void messageSent(IoSession session, Object message) 
当信息已经传送给客户端后触发此方法.
void sessionClosed(IoSession session) 
当连接被关闭时触发,例如客户端程序意外退出等等.
void sessionCreated(IoSession session) 
当一个新客户端连接后触发此方法.
void sessionIdle(IoSession session, IdleStatus status) 
当连接空闲时触发此方法.
void sessionOpened(IoSession session) 
当连接后打开时触发此方法,一般此方法与 sessionCreated 会被同时触发

前面我们提到 IoService 是负责底层通讯接入,而 IoHandler 是负责业务处理的。那么 MINA 架构图中的 IoFilter 作何用途呢?答案是你想作何用途都可以。但是有一个用途却是必须的,那就是作为 IoService 和 IoHandler 之间的桥梁。IoHandler 接口中最重要的一个方法是 messageReceived,这个方法的第二个参数是一个 Object 型的消息,总所周知,Object 是所有 Java 对象的基础,那到底谁来决定这个消息到底是什么类型呢?答案也就在这个 IoFilter 中。在前面使用的例子中,我们添加了一个 IoFilter 是 new ProtocolCodecFilter(new TextLineCodecFactory()),这个过滤器的作用是将来自客户端输入的信息转换成一行行的文本后传递给 IoHandler,因此我们可以在 messageReceived 中直接将 msg 对象强制转换成 String 对象。

而如果我们不提供任何过滤器的话,那么在 messageReceived 方法中的第二个参数类型就是一个 byte 的缓冲区,对应的类是 org.apache.mina.common.ByteBuffer。虽然你也可以将解析客户端信息放在 IoHandler 中来做,但这并不是推荐的做法,使原来清晰的模型又模糊起来,变得 IoHandler 不只是业务处理,还得充当协议解析的任务。

MINA自身带有一些常用的过滤器,例如LoggingFilter(日志记录)、BlackListFilter(黑名单过滤)、CompressionFilter(压缩)、SSLFilter(SSL加密)等。



客户端通信过程 
1.通过SocketConnector同服务器端建立连接 
2.链接建立之后I/O的读写交给了I/O Processor线程,I/O Processor是多线程的 
3.通过I/O Processor读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议 
4.最后IoFilter将数据交给Handler进行业务处理,完成了整个读取的过程 
5.写入过程也是类似,只是刚好倒过来,通过IoSession.write写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过I/O Processor将数据写出到socket通道 
IoFilterChain作为消息过滤链 
1.读取的时候是从低级协议到高级协议的过程,一般来说从byte字节逐渐转换成业务对象的过程 
2.写入的时候一般是从业务对象到字节byte的过程 
IoSession贯穿整个通信过程的始终 

整个过程可以用一个图来表现 
 
消息箭头都是有NioProcessor-N线程发起调用,默认情况下也在NioProcessor-N线程中执行 

类图 
 

Connector 
作为连接客户端,SocketConector用来和服务器端建立连接,连接成功,创建IoProcessor Thread(不能超过指定的processorCount),Thread由指定的线程池进行管理,IoProcessor 利用NIO框架对IO进行处理,同时创建IoSession。连接的建立是通过Nio的SocketChannel进行。 

NioSocketConnector connector = new NioSocketConnector(processorCount); 
ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));建立一个I/O通道 

Acceptor 
作为服务器端的连接接受者,SocketAcceptor用来监听端口,同客户端建立连接,连接建立之后的I/O操作全部交给IoProcessor进行处理 
IoAcceptor acceptor = new NioSocketAcceptor(); 
acceptor.bind( new InetSocketAddress(PORT) ); 
Protocol 
利用IoFilter,对消息进行解码和编码,如以下代码通过 MyProtocolEncoder 将java对象转成byte串,通过MyProtocalDecoder 将byte串恢复成java对象 

Java代码 
  1. connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalFactory()));  
  2. ......  
  3. public class MyProtocalFactory implements ProtocolCodecFactory {  
  4.  ProtocolEncoderAdapter encoder = new MyProtocolEncoder();  
  5.  ProtocolDecoder decoder = new MyProtocalDecoder() ;  
  6.  public ProtocolDecoder getDecoder(IoSession session) throws Exception {  
  7.   return decoder;  
  8.  }  
  9.  public ProtocolEncoder getEncoder(IoSession session) throws Exception {  
  10.   return encoder;  
  11.  }  
  12. }  
  13. ......  
  14. public class MyProtocalDecoder extends ProtocolDecoderAdapter  {  
  15.   
  16.  public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)  
  17.    throws Exception {  
  18.     
  19.   int  id  = in.getInt();  
  20.   int  len = in.getInt();  
  21.   byte[]  dst = new byte[len];  
  22.     
  23.   in.get(dst);  
  24.     
  25.   String name = new String(dst,"GBK");  
  26.     
  27.   Item item = new Item();  
  28.   item.setId(id);  
  29.   item.setName(name);  
  30.   out.write(item);  
  31.  }  
  32. }  
  33. ......  
  34. public class MyProtocolEncoder extends ProtocolEncoderAdapter {  
  35.   
  36.  public void encode(IoSession session, Object message,  
  37.    ProtocolEncoderOutput out) throws Exception {  
  38.   Item item = (Item)message;  
  39.   int byteLen = 8 + item.getName().getBytes("GBK").length ;  
  40.   IoBuffer buf = IoBuffer.allocate(byteLen);  
  41.   buf.putInt(item.getId());  
  42.   buf.putInt(item.getName().getBytes("GBK").length);  
  43.   buf.put(item.getName().getBytes("GBK"));  
  44.   buf.flip();  
  45.   out.write(buf);  
  46.     
  47.  }  
  48. }  

handler 
具体处理事件,事件包括:sessionCreated、sessionOpened、sessionClosed、sessionIdle、exceptionCaught、messageReceived、messageSent。 
connector.setHandler(new MyHandler());MyHandler继承IoHandlerAdapter类或者实现IoHandler接口.事件最终由IoProcessor线程发动调用。 
Processor 
I/O处理器、允许多线程读写,开发过程中只需要指定线程数量,Processor通过Nio框架进行I/O的续写操作,Processor包含了Nio的Selector的引用。这点也正是mina的优势,如果直接用Nio编写,则需要自己编写代码来实现类似Processor的功能。正因为I/O Processor是异步处理读写的,所以我们有时候需要识别同一个任务的消息,比如一个任务包括发送消息,接收消息,反馈消息,那么我们需要在制定消息格式的时候,消息头里能包含一个能识别是同一个任务的id。 
I/O Porcessor线程数的设置 :如果是SocketConnector,则可以在构造方法中指定,如:new SocketConnector(processorCount, Executors.newCachedThreadPool());如果是SocketAcceptor,也是一样的:SocketAcceptor acceptor = new SocketAcceptor(ProcessorCount, Executors.newCachedThreadPool()); 
processorCount为最大Porcessor线程数,这个值可以通过性能测试进行调优,默认值是cpu核数量+1(Runtime.getRuntime().availableProcessors() + 1)。 
比较奇怪的是,每个IoProcessor在创建的时候会本地自己和自己建立一个连接? 

IoSession 
IoSession是用来保持IoService的上下文,一个IoService在建立Connect之后建立一个IoSession(一个连接一个session),IoSession的生命周期从Connection建立到断开为止 
IoSession做两件事情: 
1.通过IoSession可以获取IoService的所有相关配置对象(持有对IoService,Processor池,SocketChannel,SessionConfig和IoService.IoHandler的引用) 
2.通过IoSession.write 是数据写出的入口 

关于线程 
 
ThreadModel 1.x版本的mina还有线程模式选项在2.x之后就没有了 
1.x版本指定线程模式 
SocketConnectorConfig cfg = new SocketConnectorConfig(); 
cfg.setThreadModel(ThreadModel.MANUAL); 

MINA有3种worker线程 
Acceptor、Connector、I/O processor 线程 
Acceptor Thread 
一般作为服务器端链接的接收线程,实现了接口IoService,线程的数量就是创建SocketAcceptor 的数量 
Connector Thread 
一般作为客户端的请求建立链接线程,实现了接口IoService,维持了一个和服务器端Acceptor的一个链接,线程数量就是创建SocketConnector 的数量 

Mina的SocketAcceptor和SocketConnector均是继承了BaseIoService,是对IoService的两种不同的实现 
I/O processor Thread 
作为I/O真正处理的线程,存在于服务器端和客户端,用来处理I/O的读写操作,线程的数量是可以配置的,默认最大数量是CPU个数+1 
服务器端:在创建SocketAcceptor的时候指定ProcessorCount 
SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 
客户端:在创建SocketConnector 的时候指定ProcessorCount 
SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 
I/O Processor Thread,是依附于IoService,类似上面的例子SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());是指SocketConnector这个线程允许CPU+1个I/O Processor Thread 
NioProcessor虽然是多线程,但是对与一个连接的时候业务处理只会使用一个线程进行处理(Processor线程对于一个客户端连接只使用一个线程NioProcessor-n)如果handler的业务比较耗时,会导致NioProcessor线程堵塞 ,在2个客户端同时连接上来的时候会创建第2个(前提是第1个NioProcessor正在忙),创建的最大数量由Acceptor构造方法的时候指定。如果:一个客户端连接同服务器端有很多通信,并且I/O的开销不大,但是Handler处理的业务时间比较长,那么需要采用独立的线程模式,在FilterChain的最后增加一个ExecutorFitler
acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); 
这样可以保证processor和handler的线程是分开的,否则:客户端发送3个消息,而服务器对于每个消息要处理10s左右,那么这3个消息是被串行处理,在处理第一个消息的时候,后面的消息将被堵塞,同样反过来客户端也有同样的问题。 

客户端Porcessor堵塞测试情况: 
1.以下代码在建立连接后连续发送了5个消息(item) 
 
Java代码 
  1. ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));  
  2.                 future.awaitUninterruptibly();  
  3.                 session = future.getSession();  
  4.                 Item item = new Item();  
  5.                 item.setId(12345);  
  6.                 item.setName("hi");  
  7.                 session.write(item);  
  8.                 session.write(item);  
  9.                 session.write(item);  
  10.                 session.write(item);  
  11.                 session.write(item);  



2.在handle的messageSent方法进行了延时处理,延时3秒 

 
Java代码 
  1. public void messageSent(IoSession session, Object message) throws Exception {  
  2.       Thread.sleep(3000);  
  3.       System.out.println(message);  
  4.         
  5.   }  


3.测试结果 
5个消息是串行发送,都由同一个IoPorcessor线程处理 
             
Java代码 
  1. session.write(item);  
  2.               session.write(item);  
  3.               session.write(item);  
  4.               session.write(item);  
  5.               session.write(item);  

服务器端每隔3秒收到一个消息。因为调用是由IoProcessor触发,而一个connector只会使用一个IoProcessor线程 

4.增加ExecutorFilter,ExecutorFilter保证在处理handler的时候是独立线程 
connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); 
5.测试结果 
4个session.wirte变成了并行处理,服务器端同时收到了5条消息 



最近花了点时间研究了一下nio,及其开源框架MINA,现把心得总结如下:

1:传统socket:阻塞式通信

每建立一个Socket连接时,同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程的无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况。


 2.nio:非阻塞通讯模式

2.1NIO 设计背后的基石:反应器模式,用于事件多路分离和分派的体系结构模式。 
反应器模式的核心功能如下:
将事件多路分用 
将事件分派到各自相应的事件处理程序

NIO 的非阻塞 I/O 机制是围绕 选择器和 通道构建的。Channel 类表示服务器和客户机之间的一种通信机制。Selector 类是 Channel 的多路复用器。 Selector 类将传入客户机请求多路分用并将它们分派到各自的请求处理程序。
通道(Channel 类):表示服务器和客户机之间的一种通信机制。 
选择器(Selector类):是 Channel 的多路复用器。

Selector 类将传入的客户机请求多路分用并将它们分派到各自的请求处理程序。简单的来说:NIO是一个基于事件的IO架构,最基本的思想就是:有事件我通知你,你再去做你的事情.而且NIO的主线程只有一个,不像传统的模型,需要多个线程以应对客户端请求,也减轻了JVM的工作量。
当Channel注册至Selector以后,经典的调用方法如下:
       nio中取得事件通知,就是在selector的select事件中完成的。在selector事件时有一个线程向操作系统询问,selector中注册的Channel&&SelectionKey的键值对的各种事件是否有发生,如果有则添加到selector的selectedKeys属性Set中去,并返回本次有多少个感兴趣的事情发生。如果发现这个值>0,表示有事件发生,马上迭代selectedKeys中的SelectionKey,
根据Key中的表示的事件,来做相应的处理。实际上,这段说明表明了异步socket的核心,即异步socket不过是将多个socket的调度(或者还有他们的线程调度)全部交给操作系统自己去完成,异步的核心Selector,不过是将这些调度收集、分发而已。

j

阅读(2745) | 评论(0) | 转发(0) |
0

上一篇:java osgi

下一篇:apache mina 2

给主人留下些什么吧!~~