Chinaunix首页 | 论坛 | 博客
  • 博客访问: 80188
  • 博文数量: 31
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 340
  • 用 户 组: 普通用户
  • 注册时间: 2013-04-02 20:25
文章分类

全部博文(31)

文章存档

2015年(2)

2014年(29)

我的朋友

分类: Java

2014-09-15 13:19:18

   过滤器为Mina之精髓,处IO层与业务层之间,起承上启下之用。可用下图形象表之:

一. 事件

  主要处理以下事件:

点击(此处)折叠或打开

  1.     //连接建立
  2.     void sessionCreated(IoSession session);
  3.     //sessionCreated:IO线程,做简单配置
  4.     //sessionOpened:可能其他线程
  5.     //连接建立,在TCP与sessionCreated无甚区别,只调用于sessionCreated后
  6.     void sessionOpened(IoSession session);
  7.     //连接断开
  8.     void sessionClosed(IoSession session);
  9.     //IDLE事件,在Processor中触发
  10.     void sessionIdle(IoSession session, IdleStatus status);
  11.     //异常
  12.     void exceptionCaught(IoSession session, Throwable cause);
  13.     //接收到消息
  14.     void messageReceived(IoSession session, Object message);
  15.     //消息write之后
  16.     void messageSent(IoSession session, WriteRequest writeRequest);
  17.     //消息write之前
  18.     void filterWrite(IoSession session, WriteRequest writeRequest);
  19.     //session.close被调用
  20.     void filterClose(IoSession session);

二. 接口与类

   Mina中对责任链模式的实现极复杂,涉及到以下接口与类:
1. IoFilter/IoFilterAdapter:实际处理事件的filter
2. NextFilter:仅用于转发到IoFilter
3. Entry/EntryImpl:包装
filter,在过滤器链中充当一个链表的节点
4. IoFilterChain/DefaultIoFilterChain:代表一个过滤器链(有Entry组成的链表
5. IoFilterChainBuilder/DefaultIoFilterChainBuilder:辅助创建过滤器链
   其内部维护一个有List<Entry>,在session被创建时调用buildFilterChain创建实际的链表。所以每个Session都持有一个IoFilterChain.

点击(此处)折叠或打开

  1. public void buildFilterChain(IoFilterChain chain) throws Exception {
  2.         for (Entry e : entries) {
  3.             chain.addLast(e.getName(), e.getFilter());
  4.         }
  5. }
6. HeadFilter/TailFilter:头尾过滤器

三. 实现

   跟踪messageReceived:
1. 当一个Session创建的时候,会生成一个默认的过滤器链

点击(此处)折叠或打开

  1. protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
  2.         super(service);
  3.         this.channel = channel;
  4.         this.processor = processor;
  5.         filterChain = new DefaultIoFilterChain(this);
  6. }
  7. public DefaultIoFilterChain(AbstractIoSession session) {
  8.         if (session == null) {
  9.             throw new IllegalArgumentException("session");
  10.         }

  11.         this.session = session;
  12.         head = new EntryImpl(null, null, "head", new HeadFilter());
  13.         tail = new EntryImpl(head, null, "tail", new TailFilter());
  14.         head.nextEntry = tail;
  15. }
此时,过滤器链如下图:

2. 加入编码过滤器ProtocolCodecFilter
第一步:

点击(此处)折叠或打开

  1. server.getFilterChain().addLast(
  2.                 "codec",
  3.                 new ProtocolCodecFilter(new EchoCodecFactory()));

  4. public synchronized void addLast(String name, IoFilter filter) {
  5.         register(entries.size(), new EntryImpl(name, filter));
  6. }

  7. private void register(int index, Entry e) {
  8.      if (contains(e.getName())) {
  9.         throw new IllegalArgumentException("Other filter is using the same name: " + e.getName());
  10.      }
  11.      //entries只是个List
  12.      entries.add(index, e);
  13. }
 第二步:上一步并没有把ProtocolCodecFilter加入到链中

点击(此处)折叠或打开

  1. private boolean addNow(S session) {
  2.             ...
  3.             IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
  4.             chainBuilder.buildFilterChain(session.getFilterChain());
  5.   
  6.             IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
  7.             listeners.fireSessionCreated(session);
  8.             ...
  9.     }

  10. public void buildFilterChain(IoFilterChain chain) throws Exception {
  11.         for (Entry e : entries) {
  12.             chain.addLast(e.getName(), e.getFilter());
  13.         }
  14.     }

  15. public synchronized void addLast(String name, IoFilter filter) {
  16.         checkAddable(name);
  17.         register(tail.prevEntry, name, filter);
  18.     }
  19. private void register(EntryImpl prevEntry, String name, IoFilter filter) {
  20.         EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry, name, filter);

  21.         try {
  22.             filter.onPreAdd(this, name, newEntry.getNextFilter());
  23.         } catch (Exception e) {
  24.             throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':' + filter + " in " + getSession(), e);
  25.         }

  26.         prevEntry.nextEntry.prevEntry = newEntry;
  27.         prevEntry.nextEntry = newEntry;
  28.         name2entry.put(name, newEntry);

  29.         try {
  30.             filter.onPostAdd(this, name, newEntry.getNextFilter());
  31.         } catch (Exception e) {
  32.             deregister0(newEntry);
  33.             throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':' + filter + " in " + getSession(), e);
  34.         }
  35.     }
结果如图:

3. messageReceived触发

点击(此处)折叠或打开

  1. //Processor线程收到数据,触发OP_READ
  2.      //select返回调用process函数,最后到read(S)函数
  3.      private void read(S session) {
  4.          ...
  5.          if (readBytes > 0) {
  6.              IoFilterChain filterChain = session.getFilterChain();
  7.              filterChain.fireMessageReceived(buf);
  8.              ...
  9.          }
  10.          ...
  11.      }

  12. public void fireMessageReceived(Object message) {
  13.         if (message instanceof IoBuffer) {
  14.             session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis());
  15.         }
  16.         //取Head过滤器
  17.         Entry head = this.head;
  18.         callNextMessageReceived(head, session, message);
  19.     }
  20. private void callNextMessageReceived(Entry entry, IoSession session, Object message) {
  21.         try {
  22.             IoFilter filter = entry.getFilter();
  23.             NextFilter nextFilter = entry.getNextFilter();
  24.             filter.messageReceived(nextFilter, session, message);
  25.         } catch (Throwable e) {
  26.             fireExceptionCaught(e);
  27.         }
  28.     }
  29. //head的messageReceived
  30. public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
  31.         //head只是简单转给nextFilter
  32.         nextFilter.messageReceived(session, message);
  33.     }
  34. //nextFilter里面的messageReceived
  35. public void messageReceived(IoSession session, Object message) {
  36.         //直接取下一个节点,这里head的下一个是编码过滤器
  37.         Entry nextEntry = EntryImpl.this.nextEntry;
  38.         callNextMessageReceived(nextEntry, session, message);
  39.     }
  40. ..如此循环,最后到了tail

  41. public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
  42.          AbstractIoSession s = (AbstractIoSession) session;
  43.          if (!(message instanceof IoBuffer)) {
  44.              s.increaseReadMessages(System.currentTimeMillis());
  45.          } else if (!((IoBuffer) message).hasRemaining()) {
  46.              s.increaseReadMessages(System.currentTimeMillis());
  47.          }

  48.          try {
  49.              //tail把事件转给Handle,过滤器链结束
  50.              session.getHandler().messageReceived(s, message);
  51.          } finally {
  52.              if (s.getConfig().isUseReadOperation()) {
  53.                  s.offerReadFuture(message);
  54.              }
  55.          }
  56.      }

阅读(1826) | 评论(0) | 转发(0) |
0

上一篇:10. 处理OP_READ

下一篇:12.mina-write

给主人留下些什么吧!~~