续(五)
第三章. 基于NIO的异步I/O
在原始数据吞吐量并不比可伸缩的、高效利用资源的方式下对数以千计的并行连接的处理能力更重要的场景里,异步I/O可能是更合适的。异步I/O可以说是更加的复杂并且当面临处理大量消息载荷时通常需要特殊的关注。
3.1. 于其它I/O框架的不同
像其它框架那样解决详细的问题,但确有某些不同的特性:
* 简化,优化数据量密集的协议,例如HTTP。
* 高效的内存管理:数据消费者只能够读取它能够处理的那么多的输入数据,而不需要分配更多的内存。
* 在可能的地方直接访问NIO隧道。
3.2. I/O反应堆
HttpCore NIO基于Doug Lea所描述的反应堆模式。I/O反应堆的目的是对I/O事件做出反馈还有分派事件通知给单个的I/O会话。I/O反应堆的主旨是要从经典阻塞I/O模型所强加的一个线程一个连接模型中脱离出来。IOReactor接口代表了一个实现了反应堆模式的虚对象。本质上,IOReactor实现了对NIO java.nio.channels.Selector功能的封装。
I/O反应堆通常雇佣少量的分派线程(往往只有一个)去分派I/O事件通知到大量的(往往多达数千)I/O会话或连接。每个CPU核心又一个分派线程是通常被推荐的。
*****************************************************************
* *
* IOReactorConfig config = IOReactorConfig.DEFAULT; *
* IOReactor ioreactor = new DefaultConnectingIOReactor(config); *
* *
*****************************************************************
3.2.1. I/O分派者
IOReactor的实现利用IOEventDispatch接口去通知客户端的事件等待一个特定的会话。IOEventDespatch的所有的方法都是在I/O反应堆的一个分派线程上被执行的。因此,当I/O反应堆无法对其它事件做出反馈时,发生在事件方法中的处理过程不要阻塞分派线程很久是很重要的。
***********************************************************
* *
* IOReactor ioreactor = new DefaultConnectingIOReactor(); *
* IOEventDispatch eventDispatch = <...> *
* ioreactor.execute(eventDispatch); *
* *
***********************************************************
通常I/O事件被定义在IOEventDispatch接口中:
* connected:当一个新的会话被创建时触发
* inputReady:当会话等待输入时触发
* outputReady:当会话等待输出时触发
* timeout:当会话超时时触发
* disconnected:当会话被终结时触发
3.2.2. 关闭I/O反应堆
I/O反应堆的关闭是一个负责的过程并且通常可能要耗费一些时间才能完成。I/O反应堆将尝试温柔地终结所有的活动I/O会话并且在接近指定的宽限期内迅速的处理线程。如果任何I/O会话未能正确关闭,I/O反应堆将会强制关闭剩余的会话。
*********************************************
* *
* IOReactor ioreactor = <...> *
* long gracePeriod = 3000L; // milliseconds *
* ioreactor.shutdown(gracePeriod); *
* *
*********************************************
IOReactor#shutdown(long)方法可以从任何线程中安全地被调用。
3.2.3. I/O会话
IOSession接口代表了两端之间一系列逻辑相关的数据交换。IOSession封装了NIO java.nio.channels.SelectionKey和java.nio.channels.SocketChannel。与IOSession有关系的隧道可以用于从会话中读取数据和写入数据到会话中。
***********************************************************************
* *
* IOSession iosession = <...> *
* ReadableByteChannel ch = (ReadableByteChannel) iosession.channel(); *
* ByteBuffer dst = ByteBuffer.allocate(2048); *
* *
***********************************************************************
3.2.4. I/O会话状态管理
I/O会话是不绑定到一个执行线程上的,因此可以使用线程上下文去存储一个会话的状态。一个指定会话的所有细节应当被会话自身所存储。
**********************************************************
* *
* IOSession iosession = <...> *
* Object someState = <...> *
* iosession.setAttribute("state", someState); *
* ... *
* IOSession iosession = <...> *
* Object currentState = iosession.getAttribute("state"); *
* *
**********************************************************
请注意,如果一些会话被共享对象所使用,那么对这些共享对象的访问就应当是线程安全的。
3.2.5. I/O会话时间遮罩
通过设置其事件遮罩,可以声明一个特别的I/O会话对一个特别的I/O事件类型感兴趣。
*************************************************************************
* *
* IOSession iosession = <...> *
* iosession.setEventMask(SelectionKey.OP_READ | SelectionKey.OP_WRITE); *
* *
*************************************************************************
也可以单独地设定OP_READ和OP_WRITE标记。
***********************************************
* *
* IOSession iosession = <...> *
* iosession.setEvent(SelectionKey.OP_READ); *
* iosession.clearEvent(SelectionKey.OP_READ); *
* *
***********************************************
如果没有设置相应的兴趣标记,那么事件通知将不会发生。
3.2.6. I/O会话缓冲区
为了在返回到消费者之前转换输入/输出数据或写入到底层通道,I/O会话经常需要维护一些内部的I/O缓冲区。HttpCore NIO中的内存管理是基于数据消费者仅能够读取它能够处理的那么多的数据而不需要分配更多的内存这样的原则的。那意味着一些输入数据可能经常会遗留在内部或外部的会话缓冲区之一里而尚未读取。I/O反应堆可以查询这些会话缓冲区的状态,还要确保消费者得到正确的通知,当更多的数据被存储到这些会话缓冲区之一里时。因此,一旦消费者能够处理剩余数据就允许它读取它。I/O会话可以使用SessionBufferStatus接口来意识到外部缓冲区的状态。
3.2.7. I/O会话关闭
允许后话以有序地的方式关闭可以通过调用IOSession#close()温柔地关闭一个I/O会话,或通过调用IOSession#shutdown()强制关闭底层通道。两种方法之间的区别对于那些涉及到一些终止会话握手顺序的类型类型是最重要的,例如,SSL/TLS连接。
3.2.8. 监听I/O反应堆
ListeningIOReactor代表了一个I/O反应堆在一个或若干个端口上有监听到埠连接的能力。
**************************************************************************
* *
* ListeningIOReactor ioreactor = <...> *
* ListenerEndpoint ep1 = ioreactor.listen(new InetSocketAddress(8081) ); *
* ListenerEndpoint ep2 = ioreactor.listen(new InetSocketAddress(8082)); *
* ListenerEndpoint ep3 = ioreactor.listen(new InetSocketAddress(8083)); *
* // Wait until all endpoints are up *
* ep1.waitFor(); *
* ep2.waitFor(); *
* ep3.waitFor(); *
* *
**************************************************************************
一旦完成初始化一个断点它就开始接收到埠连接并且传播I/O活动通知给IOEventDispatch实例。
如果愿意的话可以在运行时获取一组已注册的端点,可以在运行时查询端点的状态并且可以关闭它。
*************************************************************
* *
* ListeningIOReactor ioreactor = <...> *
* Set eps = ioreactor.getEndpoints(); *
* for (ListenerEndpoint ep: eps) { *
* // Still active? *
* System.out.println(ep.getAddress()); *
* if (ep.isClosed()) { *
* // If not, has it terminated due to an exception? *
* if (ep.getException() != null) { *
* ep.getException().printStackTrace(); *
* } *
* } else { *
* ep.close(); *
* } *
* } *
* *
*************************************************************
3.2.9. 连接I/O反应堆
ConnectionIOReactor代表一个有与远程主机建立连接能力的I/O反应堆。
********************************************************
* *
* ConnectingIOReactor ioreactor = <...> *
* SessionRequest sessionRequest = ioreactor.connect( *
* new InetSocketAddress("", 80), *
* null, null, null); *
* *
********************************************************
打开一个到远程主机的连接通常会是一个耗费时间的过程,并且可能需要一段时间来完成。依靠SessionRequest接口可以监视和控制会话初始化的过程。
********************************************************
* *
* // Make sure the request times out if connection *
* // has not been established after 1 sec *
* sessionRequest.setConnectTimeout(1000); *
* // Wait for the request to complete *
* sessionRequest.waitFor(); *
* // Has request terminated due to an exception? *
* if (sessionRequest.getException() != null) { *
* sessionRequest.getException().printStackTrace(); *
* } *
* // Get hold of the new I/O session *
* IOSession iosession = sessionRequest.getSession(); *
* *
********************************************************
SessionRequest的实现被期望是线程安全的。通过从另一个线程里调用IOSession#cancel(),会话请求可以在任意时刻被终止。
****************************************
* *
* if (!sessionRequest.isCompleted()) { *
* sessionRequest.cancel(); *
* } *
* *
****************************************
可以传递若干个可选参数给ConnectionIOReactor#connect()方法以对会话的初始化过程施加更好的控制。
一个非Null的本地套接字地址参数可以被用于绑定套接字到一个指定的本地地址上。
********************************************************
* *
* ConnectingIOReactor ioreactor = <...> *
* SessionRequest sessionRequest = ioreactor.connect( *
* new InetSocketAddress("", 80), *
* new InetSocketAddress("192.168.0.10", 1234), *
* null, null); *
* *
********************************************************
可以提供一个附加对象,将被用于在初始化时添加到新会话的上下文中。该对象能被用于传递初始化过程状态给协议处理者。
*************************************************************
* *
* SessionRequest sessionRequest = ioreactor.connect( *
* new InetSocketAddress("", 80), *
* null, new HttpHost(""), null); *
* *
* IOSession iosession = sessionRequest.getSession(); *
* HttpHost virtualHost = (HttpHost) iosession.getAttribute( *
* IOSession.ATTACHMENT_KEY); *
* *
*************************************************************
不需要等待、锁定当前执行线程的情况下能够异步地对会话请求的完成做出反馈常常是令人满意的。可以选择性地提供一个SessionRequestCallback接口的实现去获取与会话有关的事件通知,例如请求完成、取消、失败或超时。
***************************************************************************
* *
* ConnectingIOReactor ioreactor = <...> *
* *
* SessionRequest sessionRequest = ioreactor.connect( *
* new InetSocketAddress("", 80), null, null, *
* new SessionRequestCallback() { *
* *
* public void cancelled(SessionRequest request) { *
* } *
* *
* public void completed(SessionRequest request) { *
* System.out.println("new connection to " + *
* request.getRemoteAddress()); *
* } *
* *
* public void failed(SessionRequest request) { *
* if (request.getException() != null) { *
* request.getException().printStackTrace(); *
* } *
* } *
* *
* public void timeout(SessionRequest request) { *
* } *
* } *
* ); *
* *
***************************************************************************
未完待续。。。
阅读(2136) | 评论(0) | 转发(0) |