在NIO中,通过serversocket的bind函数将channel与socket地址绑定,最后将该channel的op_accept注册到selector中,算是可以接受客户端的连接了。在MINA中,对
op_accept事件使用单独一个selector,并可以监听多个channle的
op_accept事件。对已经绑定过的SocketAddress,MINA并不检查,若已经绑定过,则会抛出已绑定的异常,但并不影响其他的SocketAddress。
MINA中开启监听分为两步,分别在不同的线程完成。
1. bind()函数所在线程只是简单的将SocketAddress地址列表扔进AcceptorOperationFuture(表示一个开启监听的请求)队列,然后启动Acceptor线程。
2. Acceptor线程会不断的从AcceptorOperationFuture队列中poll一个AcceptorOperationFuture,创建实际的channel,并注册op_accept事件。
一. bind
bind函数首先会判断一下handle是否为空,如果为空则抛出异常。接着通用bindInternal将请求打包成AcceptorOperationFuture,然后将监听成功的SocketAddress插入boundAddresses集合里面,最后如果是第一次启动监听则触发ServiceActivated事件。
-
protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
-
//1.打包成开启监听请求
-
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
-
//2.插入注册监听队列
-
registerQueue.add(request);
-
//3.启动Acceptor线程,实际启动监听的线程
-
startupAcceptor();
-
try {
-
//lock为值为1的信号量,锁住先等本次绑定请求处理完
-
lock.acquire();
-
//Acceptor线程可能初次启动,等10ms待启动完成
-
Thread.sleep(10);
-
//Acceptor线程一启动就阻塞于select函数(op_accept),需要客户端连接或
-
//手动唤醒。绑定监听需要手动唤醒让其处理绑定请求
-
wakeup();
-
} finally {
-
lock.release();
-
}
-
-
//future模式,等待accpetor线程对当前AcceptorOperationFuture请求处理完成
-
request.awaitUninterruptibly();
-
-
if (request.getException() != null) {
-
throw request.getException();
-
}
-
-
-
Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
-
//acceptor将成功的结果写到boundHandles里
-
for (H handle : boundHandles.values()) {
-
newLocalAddresses.add(localAddress(handle));
-
}
-
-
return newLocalAddresses;
-
}
-
-
private void startupAcceptor() throws InterruptedException {
-
//selectable在构造函数中被设为true
-
if (!selectable) {
-
registerQueue.clear();
-
cancelQueue.clear();
-
}
-
-
//取得Acceptor线程引用,acceptorRef为原子引用,利用CAS保证同步
-
Acceptor acceptor = acceptorRef.get();
-
-
if (acceptor == null) {
-
//锁住,保证只有一个Acceptor线程
-
lock.acquire();
-
acceptor = new Acceptor();
-
-
if (acceptorRef.compareAndSet(null, acceptor)) {
-
//这里表示Acceptor线程第一次启动,将lock的释放放入Acceptor线程里面
-
executeWorker(acceptor);
-
} else {
-
//在试图启动过程中,另外一个线程已经启动Acceptor。则Acceptor线程将不会执行release操作
-
//所以在这里释放锁
-
lock.release();
-
}
-
}
-
}
二. Acceptor线程
该线程的功能是绑定监听,处理op_accept请求,解除绑定。
整个流程:
-
public void run() {
-
assert (acceptorRef.get() == this);
-
-
int nHandles = 0;
-
-
//第一次启动,因为startupAcceptor函数的原因在这里释放锁
-
lock.release();
-
//selectable见上
-
while (selectable) {
-
try {
-
//阻塞于OP_ACCEPT,该事件在下面的registerHandles的注册
-
//这个函数会阻塞,所以绑定监听需要手动wakeup
-
int selected = select();
-
//处理监听请求,注册OP_ACCEPT,注意这里是+=表示累积的监听数
-
nHandles += registerHandles();
-
//没有任何socket在监听
-
if (nHandles == 0) {
-
//企图终止线程,将acceptorRef设为null
-
acceptorRef.set(null);
-
//若bind请求跟unbind请求为空,终止线程
-
if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
-
assert (acceptorRef.get() != this);
-
break;
-
}
-
//否则恢复线程,恢复过程中可能重新启动了一个Acceptor线程
-
if (!acceptorRef.compareAndSet(null, this)) {
-
assert (acceptorRef.get() != this);
-
break;
-
}
-
assert (acceptorRef.get() == this);
-
}
-
-
if (selected > 0) {
-
//处理OP_ACCEPT
-
processHandles(selectedHandles());
-
}
-
//处理unbind
-
nHandles -= unregisterHandles();
-
} catch (ClosedSelectorException cse) {
-
// If the selector has been closed, we can exit the loop
-
break;
-
} catch (Throwable e) {
-
ExceptionMonitor.getInstance().exceptionCaught(e);
-
-
try {
-
//为什么?
-
Thread.sleep(1000);
-
} catch (InterruptedException e1) {
-
ExceptionMonitor.getInstance().exceptionCaught(e1);
-
}
-
}
-
}
-
-
//处理应用关闭事件
-
if (selectable && isDisposing()) {
-
selectable = false;
-
try {
-
if (createdProcessor) {
-
processor.dispose();
-
}
-
} finally {
-
try {
-
synchronized (disposalLock) {
-
if (isDisposing()) {
-
destroy();
-
}
-
}
-
} catch (Exception e) {
-
ExceptionMonitor.getInstance().exceptionCaught(e);
-
} finally {
-
disposalFuture.setDone();
-
}
-
}
-
}
-
}
bind事件:
-
private int registerHandles() {
-
//无限循环,有两个退出口
-
for (;;) {
-
//非阻塞取得bind请求
-
AcceptorOperationFuture future = registerQueue.poll();
-
//没请求,返回0退出
-
if (future == null) {
-
return 0;
-
}
-
-
Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
-
List<SocketAddress> localAddresses = future.getLocalAddresses();
-
-
try {
-
for (SocketAddress a : localAddresses) {
-
//创建channel,绑定op_accept(open函数)
-
H handle = open(a);
-
newHandles.put(localAddress(handle), handle);
-
}
-
//boundHandles表示全局的已bind的socket地址
-
boundHandles.putAll(newHandles);
-
//future模式,设置请求处理完成
-
future.setDone();
-
//返回结果
-
return newHandles.size();
-
} catch (Exception e) {
-
//异常有open触发,保存下来
-
future.setException(e);
-
} finally {
-
//原子操作,调用的是channel的close.就算调用socket.close也不会直接关闭
-
if (future.getException() != null) {
-
for (H handle : newHandles.values()) {
-
try {
-
close(handle);
-
} catch (Exception e) {
-
ExceptionMonitor.getInstance().exceptionCaught(e);
-
}
-
}
-
-
// 唤醒什么??
-
wakeup();
-
}
-
}
-
}
-
}
unbind:
-
private int unregisterHandles() {
-
int cancelledHandles = 0;
-
for (;;) {
-
AcceptorOperationFuture future = cancelQueue.poll();
-
if (future == null) {
-
break;
-
}
-
-
// close the channels
-
for (SocketAddress a : future.getLocalAddresses()) {
-
H handle = boundHandles.remove(a);
-
-
if (handle == null) {
-
continue;
-
}
-
-
try {
-
close(handle);
-
wakeup(); // wake up again to trigger thread death
-
} catch (Throwable e) {
-
ExceptionMonitor.getInstance().exceptionCaught(e);
-
} finally {
-
cancelledHandles++;
-
}
-
}
-
-
future.setDone();
-
}
-
-
return cancelledHandles;
-
}
阅读(1273) | 评论(0) | 转发(0) |