总的来说,在Acceptor线程的Selector收到OP_ACCEPT事件后,会创建一个NioSession,一个Session表示一个连接。之后它会选择一个NioProcessor处理读写事件。
NioProcessor可表示一个IO处理机,用于处理read跟write事件。Mina启动时会创建一个包含ncpus+1个NioProcessor对象的对象池,每个NioProcessor会启动一个Processor线程来处理读写事件。
1. Acceptor线程中处理OP_ACCEPT:
-
private void processHandles(Iterator<H> handles) throws Exception {
-
while (handles.hasNext()) {
-
H handle = handles.next();
-
handles.remove();
-
//创建一个Session, processor是一个对象池
-
S session = accept(processor, handle);
-
if (session == null) {
-
continue;
-
}
-
initSession(session, null, null);
-
//为该session选择一个NioSession
-
session.getProcessor().add(session);
-
}
-
}
2. 取得对应的NioProcessor
-
private IoProcessor<S> getProcessor(S session) {
-
//取得对应的NioProcessor
-
IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
-
//还没设置
-
if (processor == null) {
-
if (disposed || disposing) {
-
throw new IllegalStateException("A disposed processor cannot be accessed.");
-
}
-
//取余取得对应的NioProcessor
-
processor = pool[Math.abs((int) session.getId()) % pool.length];
-
-
if (processor == null) {
-
throw new IllegalStateException("A disposed processor cannot be accessed.");
-
}
-
session.setAttributeIfAbsent(PROCESSOR, processor);
-
return processor;
-
}
3. Processor线程
-
private class Processor implements Runnable {
-
public void run() {
-
assert (processorRef.get() == this);
-
int nSessions = 0;
-
//用于处理Idle事情
-
lastIdleCheckTime = System.currentTimeMillis();
-
for (;;) {
-
try {
-
//调用有超时select,用于处理Idle事情。因为idle事件select无法触发
-
long t0 = System.currentTimeMillis();
-
int selected = select(SELECT_TIMEOUT);
-
long t1 = System.currentTimeMillis();
-
long delta = (t1 - t0);
-
//select奇怪的返回了(非事件触发,非其他线程wakeup,非超时)
-
if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
-
//客服端断开了,服务断channel未取消导致select返回
-
//isBrokenConnection取消掉这些channel
-
if (isBrokenConnection()) {
-
LOG.warn("Broken connection");
-
wakeupCalled.getAndSet(false);
-
continue;
-
} else {
-
LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
-
// Ok, we are hit by the nasty epoll
-
// spinning.
-
// Basically, there is a race condition
-
// which causes a closing file descriptor not to be
-
// considered as available as a selected channel, but
-
// it stopped the select. The next time we will
-
// call select(), it will exit immediately for the same
-
// reason, and do so forever, consuming 100%
-
// CPU.
-
// We have to destroy the selector, and
-
// register all the socket on a new one.
-
-
//nio的一个bug.还是无故返回,重新注册一个selecotr
-
registerNewSelector();
-
}
-
wakeupCalled.getAndSet(false);
-
continue;
-
}
-
-
//处理op_accept
-
nSessions += handleNewSessions();
-
//拥塞控制
-
updateTrafficMask();
-
//处理读写事件
-
if (selected > 0) {
-
process();
-
}
-
-
// Write the pending requests
-
long currentTime = System.currentTimeMillis();
-
flush(currentTime);
-
-
//删除断开的连接
-
nSessions -= removeSessions();
-
//触发IDLE事件
-
notifyIdleSessions(currentTime);
-
//没有任何连接,退出线程
-
if (nSessions == 0) {
-
processorRef.set(null);
-
-
if (newSessions.isEmpty() && isSelectorEmpty()) {
-
// newSessions.add() precedes startupProcessor
-
assert (processorRef.get() != this);
-
break;
-
}
-
-
assert (processorRef.get() != this);
-
-
if (!processorRef.compareAndSet(null, this)) {
-
// startupProcessor won race, so must exit processor
-
assert (processorRef.get() != this);
-
break;
-
}
-
-
assert (processorRef.get() == this);
-
}
-
-
// Disconnect all sessions immediately if disposal has been
-
// requested so that we exit this loop eventually.
-
if (isDisposing()) {
-
for (Iterator<S> i = allSessions(); i.hasNext();) {
-
scheduleRemove(i.next());
-
}
-
-
wakeup();
-
}
-
} catch (ClosedSelectorException cse) {
-
// If the selector has been closed, we can exit the loop
-
break;
-
} catch (Throwable t) {
-
ExceptionMonitor.getInstance().exceptionCaught(t);
-
-
try {
-
Thread.sleep(1000);
-
} catch (InterruptedException e1) {
-
ExceptionMonitor.getInstance().exceptionCaught(e1);
-
}
-
}
-
}
-
-
try {
-
synchronized (disposalLock) {
-
if (disposing) {
-
doDispose();
-
}
-
}
-
} catch (Throwable t) {
-
ExceptionMonitor.getInstance().exceptionCaught(t);
-
} finally {
-
disposalFuture.setValue(true);
-
}
-
}
-
}
4. 注册OP_READ到selector
-
private int handleNewSessions() {
-
int addedSessions = 0;
-
-
for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
-
if (addNow(session)) {
-
// A new session has been created
-
addedSessions++;
-
}
-
}
-
-
return addedSessions;
-
}
-
-
private boolean addNow(S session) {
-
boolean registered = false;
-
-
try {
-
init(session);
-
registered = true;
-
-
// Build the filter chain of this session.
-
IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
-
chainBuilder.buildFilterChain(session.getFilterChain());
-
-
// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
-
// in AbstractIoFilterChain.fireSessionOpened().
-
// Propagate the SESSION_CREATED event up to the chain
-
IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
-
listeners.fireSessionCreated(session);
-
} catch (Throwable e) {
-
ExceptionMonitor.getInstance().exceptionCaught(e);
-
-
try {
-
destroy(session);
-
} catch (Exception e1) {
-
ExceptionMonitor.getInstance().exceptionCaught(e1);
-
} finally {
-
registered = false;
-
}
-
}
-
-
return registered;
-
}
-
-
protected void init(NioSession session) throws Exception {
-
SelectableChannel ch = (SelectableChannel) session.getChannel();
-
ch.configureBlocking(false);
-
session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
-
}
Select一直返回:
1. 注册了op_write事件,而处理完后未取消,可能一直触发op_write
2. 一方断开连接后,另一方会收到OP_READ,并且read返回-1,此时需要把channel显式close掉,否则select会一直返回OP_READ
3. 上面提到的epoll空轮询bug(Linux下):select无故立即返回,代码中通过registerNewSelector()重建selector来规避;也可升级JDK
4....
阅读(1235) | 评论(0) | 转发(0) |