Chinaunix首页 | 论坛 | 博客
  • 博客访问: 80413
  • 博文数量: 31
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 340
  • 用 户 组: 普通用户
  • 注册时间: 2013-04-02 20:25
文章分类

全部博文(31)

文章存档

2015年(2)

2014年(29)

我的朋友

分类: Java

2014-09-14 16:50:55

  mina为每个session映射到一个NioProcessor,NioProcessor是一个IO处理机,会开启一个线程Processor,也持有一个专门的Selector,将其拥有的Session的OP_READ注册到该Selector上。因为read返回-1表示断开连接,所以这里也需要处理断开事件。
代码跟踪如下:

点击(此处)折叠或打开

  1. private class Processor implements Runnable {
  2.      public void run() {
  3.      ...
  4.      for (;;) {
  5.           try {
  6.           ...
  7.          int selected = select(SELECT_TIMEOUT);
  8.           ...
  9.           if (selected > 0) {
  10.               //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
  11.               process();
  12.           }

  13.          ...
  14.          } catch (ClosedSelectorException cse) {    
  15.               break;
  16.          } catch (Throwable t) {
  17.               ...
  18.         }
  19.      }
  20.      ...
  21.     }
  22. }
  23. private void process() throws Exception {
  24.         for (Iterator<S> i = selectedSessions(); i.hasNext();) {
  25.             S session = i.next();
  26.             process(session);
  27.             i.remove();
  28.         }
  29.     }

  30.  private void process(S session) {
  31.         // Process Reads
  32.         if (isReadable(session) && !session.isReadSuspended()) {
  33.             read(session);
  34.         }

  35.         // Process writes
  36.         if (isWritable(session) && !session.isWriteSuspended()) {
  37.             // add the session to the queue, if it's not already there
  38.             if (session.setScheduledForFlush(true)) {
  39.                 flushingSessions.add(session);
  40.             }
  41.         }
  42.     }
关键代码在于read(session函数中):

点击(此处)折叠或打开

  1. private void read(S session) {
  2.         IoSessionConfig config = session.getConfig();
  3.         int bufferSize = config.getReadBufferSize();
  4.         //申请一个buffer,接受到数据存储在这里
  5.         IoBuffer buf = IoBuffer.allocate(bufferSize);
  6.         //是否处理分包--默认为true
  7.         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
  8.    
  9.         try {
  10.             int readBytes = 0;
  11.             int ret;
  12.             try {
  13.                 if (hasFragmentation) {
  14.                     //如果处理分包,则读到read返回0,或者buffer被填满
  15.                     //这样read一般被读两次
  16.                     while ((ret = read(session, buf)) > 0) {
  17.                         readBytes += ret;
  18.                         if (!buf.hasRemaining()) {
  19.                             break;
  20.                         }
  21.                     }
  22.                 } else {
  23.                     //不处理分包,直接读一次就行
  24.                     ret = read(session, buf);
  25.                     if (ret > 0) {
  26.                         readBytes = ret;
  27.                     }
  28.                 }
  29.             } finally {
  30.                 //buffer写完毕,转为读模式
  31.                 buf.flip();
  32.             }
  33.             if (readBytes > 0) {
  34.                 //读到的数据需要经过一个过滤链,然后有链尾节点转到handle处理
  35.                 IoFilterChain filterChain = session.getFilterChain();
  36.                 //触发数据接受事件
  37.                 filterChain.fireMessageReceived(buf);
  38.                 buf = null;
  39.                 //调整接受缓冲区大小,该缓冲区非socket的缓冲区
  40.                 if (hasFragmentation) {
  41.                     if (readBytes << 1 < config.getReadBufferSize()) {
  42.                         session.decreaseReadBufferSize();
  43.                     } else if (readBytes == config.getReadBufferSize()) {
  44.                         session.increaseReadBufferSize();
  45.                     }
  46.                 }
  47.             }
  48.             //处理连接断开事件
  49.             if (ret < 0) {
  50.                 scheduleRemove(session);
  51.             }
  52.         } catch (Throwable e) {
  53.             if (e instanceof IOException) {
  54.                 if (!(e instanceof PortUnreachableException)
  55.                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
  56.                         || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
  57.                     scheduleRemove(session);
  58.                 }
  59.             }
  60.             //抛出异常,则触发抛出异常事件
  61.             IoFilterChain filterChain = session.getFilterChain();
  62.             filterChain.fireExceptionCaught(e);
  63.         }
  64.     }

二. 连接断开

点击(此处)折叠或打开

  1. private void scheduleRemove(S session) {
  2.         removingSessions.add(session);
  3. }
op_read读返回-1,只是简单的将seesion加入removingSessions队列。处理逻辑在Processor线程:

点击(此处)折叠或打开

  1. private int removeSessions() {
  2.         int removedSessions = 0;

  3.         for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
  4.             SessionState state = getState(session);
  5.             
  6.             switch (state) {
  7.             case OPENED:
  8.                 // Try to remove this session
  9.                 if (removeNow(session)) {
  10.                     removedSessions++;
  11.                 }

  12.                 break;

  13.             case CLOSING:
  14.                 // Skip if channel is already closed
  15.                 break;

  16.             case OPENING:
  17.                 // Remove session from the newSessions queue and
  18.                 // remove it
  19.                 newSessions.remove(session);

  20.                 if (removeNow(session)) {
  21.                     removedSessions++;
  22.                 }

  23.                 break;

  24.             default:
  25.                 throw new IllegalStateException(String.valueOf(state));
  26.             }
  27.         }

  28.         return removedSessions;
  29.     }

  30. private boolean removeNow(S session) {
  31.         clearWriteRequestQueue(session);

  32.         try {
  33.             destroy(session);
  34.             return true;
  35.         } catch (Exception e) {
  36.             IoFilterChain filterChain = session.getFilterChain();
  37.             filterChain.fireExceptionCaught(e);
  38.         } finally {
  39.             clearWriteRequestQueue(session);
  40.             ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
  41.         }
  42.         return false;
  43.     }

  44. protected void destroy(NioSession session) throws Exception {
  45.         ByteChannel ch = session.getChannel();
  46.         SelectionKey key = session.getSelectionKey();
  47.         if (key != null) {
  48.             key.cancel();
  49.         }
  50.         ch.close();
  51.     }



阅读(1368) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~