Chinaunix首页 | 论坛 | 博客
  • 博客访问: 83788
  • 博文数量: 31
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 340
  • 用 户 组: 普通用户
  • 注册时间: 2013-04-02 20:25
文章分类

全部博文(31)

文章存档

2015年(2)

2014年(29)

我的朋友

分类: Java

2014-09-14 14:18:37

  总的来说,在Acceptor线程的Selector收到OP_ACCEPT事件后,会创建一个NioSession,一个Session表示一个连接。之后它会选择一个NioProcessor处理读写事件。NioProcessor可表示一个IO处理机,用于处理read跟write事件。Mina启动时会创建一个包含ncpus+1个NioProcessor对象的对象池,每个NioProcessor会启动一个Processor线程来处理读写事件。

1. Acceptor线程中处理
OP_ACCEPT:

点击(此处)折叠或打开

  1. private void processHandles(Iterator<H> handles) throws Exception {
  2.         while (handles.hasNext()) {
  3.             H handle = handles.next();
  4.             handles.remove();
  5.             //创建一个Session, processor是一个对象池
  6.             S session = accept(processor, handle);
  7.             if (session == null) {
  8.                 continue;
  9.             }
  10.             initSession(session, null, null);
  11.             //为该session选择一个NioSession
  12.             session.getProcessor().add(session);
  13.         }
  14.     }
2. 取得对应的NioProcessor

点击(此处)折叠或打开

  1. private IoProcessor<S> getProcessor(S session) {
  2.              //取得对应的NioProcessor
  3.      IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
  4.             //还没设置
  5.      if (processor == null) {
  6.      if (disposed || disposing) {
  7.           throw new IllegalStateException("A disposed processor cannot be accessed.");
  8.      }
  9.      //取余取得对应的NioProcessor
  10.      processor = pool[Math.abs((int) session.getId()) % pool.length];

  11.      if (processor == null) {
  12.           throw new IllegalStateException("A disposed processor cannot be accessed.");
  13.      }
  14.      session.setAttributeIfAbsent(PROCESSOR, processor);
  15.      return processor;
  16. }
3. Processor线程

点击(此处)折叠或打开

  1. private class Processor implements Runnable {
  2.         public void run() {
  3.             assert (processorRef.get() == this);
  4.             int nSessions = 0;
  5.             //用于处理Idle事情
  6.             lastIdleCheckTime = System.currentTimeMillis();
  7.             for (;;) {
  8.                 try {
  9.                     //调用有超时select,用于处理Idle事情。因为idle事件select无法触发
  10.                     long t0 = System.currentTimeMillis();
  11.                     int selected = select(SELECT_TIMEOUT);
  12.                     long t1 = System.currentTimeMillis();
  13.                     long delta = (t1 - t0);
  14.                     //select奇怪的返回了(非事件触发,非其他线程wakeup,非超时)
  15.                     if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
  16.                         //客服端断开了,服务断channel未取消导致select返回
  17.                         //isBrokenConnection取消掉这些channel
  18.                         if (isBrokenConnection()) {
  19.                             LOG.warn("Broken connection");
  20.                             wakeupCalled.getAndSet(false);
  21.                             continue;
  22.                         } else {
  23.                             LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
  24.                             // Ok, we are hit by the nasty epoll
  25.                             // spinning.
  26.                             // Basically, there is a race condition
  27.                             // which causes a closing file descriptor not to be
  28.                             // considered as available as a selected channel, but
  29.                             // it stopped the select. The next time we will
  30.                             // call select(), it will exit immediately for the same
  31.                             // reason, and do so forever, consuming 100%
  32.                             // CPU.
  33.                             // We have to destroy the selector, and
  34.                             // register all the socket on a new one.
  35.                             
  36.                             //nio的一个bug.还是无故返回,重新注册一个selecotr
  37.                             registerNewSelector();
  38.                         }
  39.                         wakeupCalled.getAndSet(false);
  40.                         continue;
  41.                     }

  42.                     //处理op_accept
  43.                     nSessions += handleNewSessions();
  44.                     //拥塞控制
  45.                     updateTrafficMask();
  46.                     //处理读写事件
  47.                     if (selected > 0) {
  48.                         process();
  49.                     }

  50.                     // Write the pending requests
  51.                     long currentTime = System.currentTimeMillis();
  52.                     flush(currentTime);

  53.                     //删除断开的连接
  54.                     nSessions -= removeSessions();
  55.                     //触发IDLE事件
  56.                     notifyIdleSessions(currentTime);
  57.                     //没有任何连接,退出线程
  58.                     if (nSessions == 0) {
  59.                         processorRef.set(null);

  60.                         if (newSessions.isEmpty() && isSelectorEmpty()) {
  61.                             // newSessions.add() precedes startupProcessor
  62.                             assert (processorRef.get() != this);
  63.                             break;
  64.                         }

  65.                         assert (processorRef.get() != this);

  66.                         if (!processorRef.compareAndSet(null, this)) {
  67.                             // startupProcessor won race, so must exit processor
  68.                             assert (processorRef.get() != this);
  69.                             break;
  70.                         }

  71.                         assert (processorRef.get() == this);
  72.                     }

  73.                     // Disconnect all sessions immediately if disposal has been
  74.                     // requested so that we exit this loop eventually.
  75.                     if (isDisposing()) {
  76.                         for (Iterator<S> i = allSessions(); i.hasNext();) {
  77.                             scheduleRemove(i.next());
  78.                         }

  79.                         wakeup();
  80.                     }
  81.                 } catch (ClosedSelectorException cse) {
  82.                     // If the selector has been closed, we can exit the loop
  83.                     break;
  84.                 } catch (Throwable t) {
  85.                     ExceptionMonitor.getInstance().exceptionCaught(t);

  86.                     try {
  87.                         Thread.sleep(1000);
  88.                     } catch (InterruptedException e1) {
  89.                         ExceptionMonitor.getInstance().exceptionCaught(e1);
  90.                     }
  91.                 }
  92.             }

  93.             try {
  94.                 synchronized (disposalLock) {
  95.                     if (disposing) {
  96.                         doDispose();
  97.                     }
  98.                 }
  99.             } catch (Throwable t) {
  100.                 ExceptionMonitor.getInstance().exceptionCaught(t);
  101.             } finally {
  102.                 disposalFuture.setValue(true);
  103.             }
  104.         }
  105.     }
4. 注册OP_READ到selector

点击(此处)折叠或打开

  1. private int handleNewSessions() {
  2.         int addedSessions = 0;

  3.         for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
  4.             if (addNow(session)) {
  5.                 // A new session has been created
  6.                 addedSessions++;
  7.             }
  8.         }

  9.         return addedSessions;
  10.     }

  11.  private boolean addNow(S session) {
  12.         boolean registered = false;

  13.         try {
  14.             init(session);
  15.             registered = true;

  16.             // Build the filter chain of this session.
  17.             IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
  18.             chainBuilder.buildFilterChain(session.getFilterChain());

  19.             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
  20.             // in AbstractIoFilterChain.fireSessionOpened().
  21.             // Propagate the SESSION_CREATED event up to the chain
  22.             IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
  23.             listeners.fireSessionCreated(session);
  24.         } catch (Throwable e) {
  25.             ExceptionMonitor.getInstance().exceptionCaught(e);

  26.             try {
  27.                 destroy(session);
  28.             } catch (Exception e1) {
  29.                 ExceptionMonitor.getInstance().exceptionCaught(e1);
  30.             } finally {
  31.                 registered = false;
  32.             }
  33.         }

  34.         return registered;
  35.     }

  36.  protected void init(NioSession session) throws Exception {
  37.         SelectableChannel ch = (SelectableChannel) session.getChannel();
  38.         ch.configureBlocking(false);
  39.         session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
  40.     }
Select一直返回:
1. 注册了op_write事件,而处理完后未取消,可能一直触发op_write
2. 一方断开连接后,另外一方会收到OP_READ并且read返回-1,此时需要把channel显示close掉。否则会一直返回OP_READ
3.上面提到的bug, linux下。可升级jdk
4....

阅读(1235) | 评论(0) | 转发(0) |
0

上一篇:8. MINA--开启监听

下一篇:10. 处理OP_READ

给主人留下些什么吧!~~