Chinaunix首页 | 论坛 | 博客
  • 博客访问: 80450
  • 博文数量: 31
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 340
  • 用 户 组: 普通用户
  • 注册时间: 2013-04-02 20:25
文章分类

全部博文(31)

文章存档

2015年(2)

2014年(29)

我的朋友

分类: Java

2014-09-13 20:50:15

  在NIO中,通过serversocket的bind函数将channel与socket地址绑定,最后将该channel的op_accept注册到selector中,算是可以接受客户端的连接了。在MINA中,对op_accept事件使用单独一个selector,并可以监听多个channle的op_accept事件。对已经绑定过的SocketAddress,MINA并不检查,若已经绑定过,则会抛出已绑定的异常,但并不影响其他的SocketAddress。
 MINA中开启监听分为两步,分别在不同的线程完成。
1. bind()
函数所在线程只是简单的将SocketAddress地址列表扔进AcceptorOperationFuture(表示一个开启监听的请求)队列,然后启动Acceptor线程。
2. 
Acceptor线程会不断的从AcceptorOperationFuture队列中poll一个AcceptorOperationFuture,创建实际的channel,并注册op_accept事件。

一. bind

   bind函数首先会判断一下handle是否为空,如果为空则抛出异常。接着通用bindInternal将请求打包成AcceptorOperationFuture,然后将监听成功的SocketAddress插入boundAddresses集合里面,最后如果是第一次启动监听则触发ServiceActivated事件。

点击(此处)折叠或打开

  1. protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
  2.         //1.打包成开启监听请求
  3.         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
  4.         //2.插入注册监听队列
  5.         registerQueue.add(request);
  6.         //3.启动Acceptor线程,实际启动监听的线程
  7.         startupAcceptor();
  8.         try {
  9.             //lock为值为1的信号量,锁住先等本次绑定请求处理完
  10.             lock.acquire();
  11.             //Acceptor线程可能初次启动,等10ms待启动完成
  12.             Thread.sleep(10);
  13.             //Acceptor线程一启动就阻塞于select函数(op_accept),需要客户端连接或
  14.             //手动唤醒。绑定监听需要手动唤醒让其处理绑定请求
  15.             wakeup();
  16.         } finally {
  17.             lock.release();
  18.         }

  19.         //future模式,等待accpetor线程对当前AcceptorOperationFuture请求处理完成
  20.         request.awaitUninterruptibly();

  21.         if (request.getException() != null) {
  22.             throw request.getException();
  23.         }

  24.        
  25.         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
  26.         //acceptor将成功的结果写到boundHandles里
  27.         for (H handle : boundHandles.values()) {
  28.             newLocalAddresses.add(localAddress(handle));
  29.         }

  30.         return newLocalAddresses;
  31.     }
  32.     
  33.     private void startupAcceptor() throws InterruptedException {
  34.         //selectable在构造函数中被设为true
  35.         if (!selectable) {
  36.             registerQueue.clear();
  37.             cancelQueue.clear();
  38.         }

  39.         //取得Acceptor线程引用,acceptorRef为原子引用,利用CAS保证同步
  40.         Acceptor acceptor = acceptorRef.get();

  41.         if (acceptor == null) {
  42.             //锁住,保证只有一个Acceptor线程
  43.             lock.acquire();
  44.             acceptor = new Acceptor();

  45.             if (acceptorRef.compareAndSet(null, acceptor)) {
  46.                 //这里表示Acceptor线程第一次启动,将lock的释放放入Acceptor线程里面
  47.                 executeWorker(acceptor);
  48.             } else {
  49.                 //在试图启动过程中,另外一个线程已经启动Acceptor。则Acceptor线程将不会执行release操作
  50.                 //所以在这里释放锁
  51.                 lock.release();
  52.             }
  53.         }
  54.     }

二. Acceptor线程

  该线程的功能是绑定监听,处理op_accept请求,解除绑定。
整个流程:

点击(此处)折叠或打开

  1. public void run() {
  2.          assert (acceptorRef.get() == this);

  3.          int nHandles = 0;

  4.          //第一次启动,因为startupAcceptor函数的原因在这里释放锁
  5.          lock.release();
  6.          //selectable见上
  7.          while (selectable) {
  8.              try {
  9.                  //阻塞于OP_ACCEPT,该事件在下面的registerHandles的注册
  10.                  //这个函数会阻塞,所以绑定监听需要手动wakeup
  11.                  int selected = select();
  12.                  //处理监听请求,注册OP_ACCEPT,注意这里是+=表示累积的监听数
  13.                  nHandles += registerHandles();
  14.                  //没有任何socket在监听
  15.                  if (nHandles == 0) {
  16.                      //企图终止线程,将acceptorRef设为null
  17.                      acceptorRef.set(null);
  18.                      //若bind请求跟unbind请求为空,终止线程
  19.                      if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
  20.                          assert (acceptorRef.get() != this);
  21.                          break;
  22.                      }
  23.                      //否则恢复线程,恢复过程中可能重新启动了一个Acceptor线程
  24.                      if (!acceptorRef.compareAndSet(null, this)) {
  25.                          assert (acceptorRef.get() != this);
  26.                          break;
  27.                      }
  28.                      assert (acceptorRef.get() == this);
  29.                  }

  30.                  if (selected > 0) {
  31.                      //处理OP_ACCEPT
  32.                      processHandles(selectedHandles());
  33.                  }
  34.                  //处理unbind
  35.                  nHandles -= unregisterHandles();
  36.              } catch (ClosedSelectorException cse) {
  37.                  // If the selector has been closed, we can exit the loop
  38.                  break;
  39.              } catch (Throwable e) {
  40.                  ExceptionMonitor.getInstance().exceptionCaught(e);

  41.                  try {
  42.                      //为什么?
  43.                      Thread.sleep(1000);
  44.                  } catch (InterruptedException e1) {
  45.                      ExceptionMonitor.getInstance().exceptionCaught(e1);
  46.                  }
  47.              }
  48.          }

  49.          //处理应用关闭事件
  50.          if (selectable && isDisposing()) {
  51.              selectable = false;
  52.              try {
  53.                  if (createdProcessor) {
  54.                      processor.dispose();
  55.                  }
  56.              } finally {
  57.                  try {
  58.                      synchronized (disposalLock) {
  59.                          if (isDisposing()) {
  60.                              destroy();
  61.                          }
  62.                      }
  63.                  } catch (Exception e) {
  64.                      ExceptionMonitor.getInstance().exceptionCaught(e);
  65.                  } finally {
  66.                      disposalFuture.setDone();
  67.                  }
  68.              }
  69.          }
  70.      }
bind事件:

点击(此处)折叠或打开

  1. private int registerHandles() {
  2.         //无限循环,有两个退出口
  3.         for (;;) {
  4.             //非阻塞取得bind请求
  5.             AcceptorOperationFuture future = registerQueue.poll();
  6.             //没请求,返回0退出
  7.             if (future == null) {
  8.                 return 0;
  9.             }

  10.             Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
  11.             List<SocketAddress> localAddresses = future.getLocalAddresses();

  12.             try {
  13.                 for (SocketAddress a : localAddresses) {
  14.                     //创建channel,绑定op_accept(open函数)
  15.                     H handle = open(a);
  16.                     newHandles.put(localAddress(handle), handle);
  17.                 }
  18.                 //boundHandles表示全局的已bind的socket地址
  19.                 boundHandles.putAll(newHandles);
  20.                 //future模式,设置请求处理完成
  21.                 future.setDone();
  22.                 //返回结果
  23.                 return newHandles.size();
  24.             } catch (Exception e) {
  25.                 //异常有open触发,保存下来
  26.                 future.setException(e);
  27.             } finally {
  28.                 //原子操作,调用的是channel的close.就算调用socket.close也不会直接关闭
  29.                 if (future.getException() != null) {
  30.                     for (H handle : newHandles.values()) {
  31.                         try {
  32.                             close(handle);
  33.                         } catch (Exception e) {
  34.                             ExceptionMonitor.getInstance().exceptionCaught(e);
  35.                         }
  36.                     }

  37.                     // 唤醒什么??
  38.                     wakeup();
  39.                 }
  40.             }
  41.         }
  42.     }
unbind:

点击(此处)折叠或打开

  1. private int unregisterHandles() {
  2.         int cancelledHandles = 0;
  3.         for (;;) {
  4.             AcceptorOperationFuture future = cancelQueue.poll();
  5.             if (future == null) {
  6.                 break;
  7.             }

  8.             // close the channels
  9.             for (SocketAddress a : future.getLocalAddresses()) {
  10.                 H handle = boundHandles.remove(a);

  11.                 if (handle == null) {
  12.                     continue;
  13.                 }

  14.                 try {
  15.                     close(handle);
  16.                     wakeup(); // wake up again to trigger thread death
  17.                 } catch (Throwable e) {
  18.                     ExceptionMonitor.getInstance().exceptionCaught(e);
  19.                 } finally {
  20.                     cancelledHandles++;
  21.                 }
  22.             }

  23.             future.setDone();
  24.         }

  25.         return cancelledHandles;
  26.     }



阅读(1232) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~