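Filters are the essence of MINA. They sit between the I/O layer and the business layer and link the two, as the original figure illustrates (I/O layer -> filter chain -> business layer):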
I. Events
Filters mainly handle the following events:
// Connection created
void sessionCreated(IoSession session);
// sessionCreated: runs in the I/O thread; do only simple configuration here
// sessionOpened: may run in another thread
// Connection opened; for TCP this hardly differs from sessionCreated, it is simply invoked after sessionCreated
void sessionOpened(IoSession session);
// Connection closed
void sessionClosed(IoSession session);
// Idle event, fired from the Processor
void sessionIdle(IoSession session, IdleStatus status);
// An exception occurred
void exceptionCaught(IoSession session, Throwable cause);
// A message was received
void messageReceived(IoSession session, Object message);
// Called after a message has been written
void messageSent(IoSession session, WriteRequest writeRequest);
// Called before a message is written
void filterWrite(IoSession session, WriteRequest writeRequest);
// session.close was called
void filterClose(IoSession session);
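II. Interfaces and classes
MINA's implementation of the chain-of-responsibility pattern is quite involved and spans the following interfaces and classes:
1. IoFilter/IoFilterAdapter: the filters that actually handle events
2. NextFilter: used only to forward an event on to an IoFilter
3. Entry/EntryImpl: wraps a filter and acts as a node of the linked list that forms the filter chain
4. IoFilterChain/DefaultIoFilterChain: represents a filter chain (a linked list made of Entry nodes)
5. IoFilterChainBuilder/DefaultIoFilterChainBuilder: helps create filter chains
Internally the builder maintains a List<Entry>. When a session is created, buildFilterChain is called to build the actual linked list, so every session holds its own IoFilterChain.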
public void buildFilterChain(IoFilterChain chain) throws Exception {
    for (Entry e : entries) {
        chain.addLast(e.getName(), e.getFilter());
    }
}
6. HeadFilter/TailFilter: the head and tail filters
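To make item 5 concrete, here is a minimal, self-contained sketch of the idea. It does not use MINA's classes: `BuilderSketch`, `SketchChain`, and `SketchBuilder` are hypothetical names, and filters are reduced to plain strings. It only shows that a service-level builder records entries and that each session replays them into its own chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BuilderSketch {
    /** Hypothetical stand-in for a per-session filter chain: an ordered list of names. */
    static class SketchChain {
        final List<String> names = new ArrayList<>();

        void addLast(String name) {
            names.add(name);
        }
    }

    /** Hypothetical stand-in for the service-level builder: it only records entries. */
    static class SketchBuilder {
        private final List<String> entries = new ArrayList<>();

        void addLast(String name) {
            if (entries.contains(name)) {
                throw new IllegalArgumentException("Other filter is using the same name: " + name);
            }
            entries.add(name);
        }

        /** Replays the recorded entries into the chain of one session. */
        void buildFilterChain(SketchChain chain) {
            for (String name : entries) {
                chain.addLast(name);
            }
        }
    }

    /** Builds the chains of two sessions from one shared builder. */
    static List<SketchChain> demo() {
        SketchBuilder builder = new SketchBuilder();
        builder.addLast("codec");
        builder.addLast("logger");

        SketchChain first = new SketchChain();
        SketchChain second = new SketchChain();
        builder.buildFilterChain(first);
        builder.buildFilterChain(second);
        second.addLast("extra"); // changes only the second session's chain
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        for (SketchChain chain : demo()) {
            System.out.println(chain.names);
        }
    }
}
```

Because every session gets its own copy of the chain, a filter added to one session's chain later does not show up in any other session.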
III. Implementation
Tracing messageReceived:
1. When a session is created, a default filter chain is created with it
protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
    super(service);
    this.channel = channel;
    this.processor = processor;
    // Every session gets its own filter chain
    filterChain = new DefaultIoFilterChain(this);
}

public DefaultIoFilterChain(AbstractIoSession session) {
    if (session == null) {
        throw new IllegalArgumentException("session");
    }

    this.session = session;
    // The chain starts out with two linked nodes: head <-> tail
    head = new EntryImpl(null, null, "head", new HeadFilter());
    tail = new EntryImpl(head, null, "tail", new TailFilter());
    head.nextEntry = tail;
}
At this point the filter chain contains only the head and tail nodes, as in the original figure: head <-> tail
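The constructor above can be mirrored in a few lines of plain Java. This is a simplified sketch with hypothetical names (`SentinelChainSketch`, `Node`) in which a node carries only a name instead of a filter; it shows that a freshly created chain already holds two linked sentinel nodes.

```java
public class SentinelChainSketch {
    /** Hypothetical simplified node; it stores a name where MINA's EntryImpl stores a filter. */
    static class Node {
        Node prev;
        Node next;
        final String name;

        Node(Node prev, Node next, String name) {
            this.prev = prev;
            this.next = next;
            this.name = name;
        }
    }

    final Node head;
    final Node tail;

    SentinelChainSketch() {
        // Same order as in the constructor above: head first, then tail, then link them
        head = new Node(null, null, "head");
        tail = new Node(head, null, "tail");
        head.next = tail;
    }

    /** Walks the list from head to tail and joins the node names. */
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append(" <-> ");
            }
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(new SentinelChainSketch().describe());
    }
}
```

With both ends always present, later insertions never have to handle an empty list: there is always a predecessor and a successor to link against.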
2. Adding the codec filter ProtocolCodecFilter
Step 1:
// Application code: register the codec filter on the service
server.getFilterChain().addLast(
        "codec",
        new ProtocolCodecFilter(new EchoCodecFactory()));

// DefaultIoFilterChainBuilder
public synchronized void addLast(String name, IoFilter filter) {
    register(entries.size(), new EntryImpl(name, filter));
}

private void register(int index, Entry e) {
    if (contains(e.getName())) {
        throw new IllegalArgumentException("Other filter is using the same name: " + e.getName());
    }
    // entries is just a List
    entries.add(index, e);
}
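Step 2: the previous step only recorded ProtocolCodecFilter in the builder's list; it did not add the filter to any chain. That happens when the processor adds a new session: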
// Processor: called when a new session is added
private boolean addNow(S session) {
    ...
    IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
    chainBuilder.buildFilterChain(session.getFilterChain());

    IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
    listeners.fireSessionCreated(session);
    ...
}

// DefaultIoFilterChainBuilder: copy the recorded entries into the session's chain
public void buildFilterChain(IoFilterChain chain) throws Exception {
    for (Entry e : entries) {
        chain.addLast(e.getName(), e.getFilter());
    }
}

// DefaultIoFilterChain: insert right before the tail
public synchronized void addLast(String name, IoFilter filter) {
    checkAddable(name);
    register(tail.prevEntry, name, filter);
}

private void register(EntryImpl prevEntry, String name, IoFilter filter) {
    EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry, name, filter);

    try {
        filter.onPreAdd(this, name, newEntry.getNextFilter());
    } catch (Exception e) {
        throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':' + filter + " in " + getSession(), e);
    }

    // Link the new entry between prevEntry and its former successor
    prevEntry.nextEntry.prevEntry = newEntry;
    prevEntry.nextEntry = newEntry;
    name2entry.put(name, newEntry);

    try {
        filter.onPostAdd(this, name, newEntry.getNextFilter());
    } catch (Exception e) {
        deregister0(newEntry);
        throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':' + filter + " in " + getSession(), e);
    }
}
The result, as in the original figure: head <-> codec <-> tail
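The insertion performed by register can be reproduced in isolation. The following is a minimal sketch with hypothetical names (`InsertSketch`, `Node`, `Hook`), not MINA's code: it links a new node in front of the tail and, like the excerpt's call to deregister0, takes the node out again if the post-add hook throws. The pre-add hook is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

public class InsertSketch {
    /** Hypothetical callback standing in for a filter's onPostAdd hook. */
    interface Hook {
        void onPostAdd(String name) throws Exception;
    }

    static class Node {
        Node prev;
        Node next;
        final String name;

        Node(Node prev, Node next, String name) {
            this.prev = prev;
            this.next = next;
            this.name = name;
        }
    }

    private final Node head = new Node(null, null, "head");
    private final Node tail = new Node(head, null, "tail");
    private final Map<String, Node> name2node = new HashMap<>();

    InsertSketch() {
        head.next = tail;
    }

    /** Inserts a node right before the tail, so it becomes the last user filter. */
    void addLast(String name, Hook hook) {
        if (name2node.containsKey(name)) {
            throw new IllegalArgumentException("Other filter is using the same name: " + name);
        }
        Node prev = tail.prev;
        Node node = new Node(prev, prev.next, name);
        prev.next.prev = node;
        prev.next = node;
        name2node.put(name, node);

        try {
            hook.onPostAdd(name);
        } catch (Exception e) {
            // Roll back: unlink the node so a failed hook leaves the chain unchanged
            node.prev.next = node.next;
            node.next.prev = node.prev;
            name2node.remove(name);
            throw new IllegalStateException("onPostAdd(): " + name, e);
        }
    }

    /** Walks the list from head to tail and joins the node names. */
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append(" <-> ");
            }
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        InsertSketch chain = new InsertSketch();
        chain.addLast("codec", name -> { });
        System.out.println(chain.describe());
    }
}
```

Inserting relative to `tail.prev` rather than `head` is what makes addLast append: each new filter ends up directly in front of the tail, behind all filters added earlier.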
3. Firing messageReceived
// The Processor thread receives data, which triggers OP_READ
// select returns and process is called, which eventually reaches read(S)
private void read(S session) {
    ...
    if (readBytes > 0) {
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireMessageReceived(buf);
        ...
    }
    ...
}

// DefaultIoFilterChain
public void fireMessageReceived(Object message) {
    if (message instanceof IoBuffer) {
        session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis());
    }
    // Start at the head filter
    Entry head = this.head;
    callNextMessageReceived(head, session, message);
}

private void callNextMessageReceived(Entry entry, IoSession session, Object message) {
    try {
        IoFilter filter = entry.getFilter();
        NextFilter nextFilter = entry.getNextFilter();
        filter.messageReceived(nextFilter, session, message);
    } catch (Throwable e) {
        fireExceptionCaught(e);
    }
}

// messageReceived of the head filter
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
    // The head simply passes the event on to nextFilter
    nextFilter.messageReceived(session, message);
}

// messageReceived inside the NextFilter
public void messageReceived(IoSession session, Object message) {
    // Take the next node directly; after the head, that is the codec filter
    Entry nextEntry = EntryImpl.this.nextEntry;
    callNextMessageReceived(nextEntry, session, message);
}

// ...and so on down the chain, until the tail is reached

// messageReceived of the tail filter
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
    AbstractIoSession s = (AbstractIoSession) session;
    if (!(message instanceof IoBuffer)) {
        s.increaseReadMessages(System.currentTimeMillis());
    } else if (!((IoBuffer) message).hasRemaining()) {
        s.increaseReadMessages(System.currentTimeMillis());
    }

    try {
        // The tail hands the event to the IoHandler; the filter chain ends here
        session.getHandler().messageReceived(s, message);
    } finally {
        if (s.getConfig().isUseReadOperation()) {
            s.offerReadFuture(message);
        }
    }
}
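The call sequence traced above can be condensed into a small self-contained model. This is a sketch under simplifying assumptions, not MINA's implementation: `ChainSketch`, `Filter`, `Next`, and `Entry` are hypothetical stand-ins, messages are plain strings, and the tail appends to a list instead of calling an IoHandler.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainSketch {
    /** Stand-in for NextFilter: forwards the event to the following entry. */
    interface Next {
        void messageReceived(String message);
    }

    /** Stand-in for IoFilter: handles the event and decides whether to forward it. */
    interface Filter {
        void messageReceived(Next next, String message);
    }

    /** Stand-in for EntryImpl: a list node that owns its filter and its own Next. */
    static class Entry {
        Entry prevEntry;
        Entry nextEntry;
        final Filter filter;
        // Looks up nextEntry at call time, so later insertions are picked up automatically
        final Next next = message -> callNext(nextEntry, message);

        Entry(Filter filter) {
            this.filter = filter;
        }
    }

    /** Everything that reached the end of the chain (the "handler" in this sketch). */
    final List<String> handled = new ArrayList<>();

    // The head only forwards; the tail delivers to the handler and ends the chain
    private final Entry head = new Entry((next, msg) -> next.messageReceived(msg));
    private final Entry tail = new Entry((next, msg) -> handled.add(msg));

    ChainSketch() {
        head.nextEntry = tail;
        tail.prevEntry = head;
    }

    /** Inserts a filter right before the tail. */
    void addLast(Filter filter) {
        Entry entry = new Entry(filter);
        Entry prev = tail.prevEntry;
        entry.prevEntry = prev;
        entry.nextEntry = tail;
        prev.nextEntry = entry;
        tail.prevEntry = entry;
    }

    /** Entry point of the event: always starts at the head. */
    void fireMessageReceived(String message) {
        callNext(head, message);
    }

    private static void callNext(Entry entry, String message) {
        entry.filter.messageReceived(entry.next, message);
    }

    public static void main(String[] args) {
        ChainSketch chain = new ChainSketch();
        // First filter: transforms the message, then forwards it
        chain.addLast((next, msg) -> next.messageReceived(msg.trim()));
        // Second filter: drops empty messages by not calling next
        chain.addLast((next, msg) -> {
            if (!msg.isEmpty()) {
                next.messageReceived(msg.toUpperCase());
            }
        });
        chain.fireMessageReceived("  hello ");
        chain.fireMessageReceived("   ");
        System.out.println(chain.handled);
    }
}
```

The point of the separate `Next` object is that a filter never sees its neighbours: it can forward, transform, or swallow an event, but only the entry knows which node comes next.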