Write相对Read复杂的多,主要有两条线。
1. session.write 引发fireWrite事件,经过编码等过滤器的处理,最后将需要写的session写入到flushingSessions中,wakeup相应的Processor线程
2. 实际写操作有
Processor线程完成,写完后触发messageSent事件。
一. session.write
1. 生成一个WriteRequest,触发fireWrite
-
public WriteFuture write(Object message, SocketAddress remoteAddress) {
-
.....
-
-
// Now, we can write the message. First, create a future
-
WriteFuture writeFuture = new DefaultWriteFuture(this);
-
WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
-
-
// Then, get the chain and inject the WriteRequest into it
-
IoFilterChain filterChain = getFilterChain();
-
filterChain.fireFilterWrite(writeRequest);
-
-
.....
-
-
// Return the WriteFuture.
-
return writeFuture;
-
}
假设过滤器链为:head、ProtocolCodecFilter、tail
-
//write是从tail一直prev到head的
-
public void fireFilterWrite(WriteRequest writeRequest) {
-
Entry tail = this.tail;
-
callPreviousFilterWrite(tail, session, writeRequest);
-
}
-
private void callPreviousFilterWrite(Entry entry, IoSession session, WriteRequest writeRequest) {
-
try {
-
IoFilter filter = entry.getFilter();
-
NextFilter nextFilter = entry.getNextFilter();
-
filter.filterWrite(nextFilter, session, writeRequest);
-
} catch (Throwable e) {
-
writeRequest.getFuture().setException(e);
-
fireExceptionCaught(e);
-
}
-
}
-
//tail只是简单的转给下一个filter这里是ProtocolCodecFilter
-
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
-
nextFilter.filterWrite(session, writeRequest);
-
}
ProtocolCodecFilter需要将业务对象编码成IoBuffer
-
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
-
//取得业务对象
-
Object message = writeRequest.getMessage();
-
//如果已经是IoBuffer直接传给下一个filter,这里为head
-
if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
-
nextFilter.filterWrite(session, writeRequest);
-
return;
-
}
-
-
//获取编码器
-
ProtocolEncoder encoder = factory.getEncoder(session);
-
ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
-
-
if (encoder == null) {
-
throw new ProtocolEncoderException("The encoder is null for the session " + session);
-
}
-
-
if (encoderOut == null) {
-
throw new ProtocolEncoderException("The encoderOut is null for the session " + session);
-
}
-
-
try {
-
//编码
-
encoder.encode(session, message, encoderOut);
-
//获取编码结果
-
Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
-
-
//依次处理
-
while (!bufferQueue.isEmpty()) {
-
//编码后的对象
-
Object encodedMessage = bufferQueue.poll();
-
if (encodedMessage == null) {
-
break;
-
}
-
//传给下一个filter,这里为head
-
if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
-
SocketAddress destination = writeRequest.getDestination();
-
//WriteRequest打包成编码后的请求,这样后续filter拿到的数据则已经经过编码
-
WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
-
nextFilter.filterWrite(session, encodedWriteRequest);
-
}
-
}
-
//MessageWriteRequest返回的是空的IoBuffer,不会添加到实际写列表
-
//为什么要调用一次???
-
nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
-
} catch (Throwable t) {
-
ProtocolEncoderException pee;
-
// Generate the correct exception
-
if (t instanceof ProtocolEncoderException) {
-
pee = (ProtocolEncoderException) t;
-
} else {
-
pee = new ProtocolEncoderException(t);
-
}
-
-
throw pee;
-
}
-
}
此时到了head过滤器
-
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
-
-
AbstractIoSession s = (AbstractIoSession) session;
-
//处理统计信息
-
if (writeRequest.getMessage() instanceof IoBuffer) {
-
IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
-
// I/O processor implementation will call buffer.reset()
-
// it after the write operation is finished, because
-
// the buffer will be specified with messageSent event.
-
buffer.mark();
-
int remaining = buffer.remaining();
-
if (remaining == 0) {
-
//发送消息数,前面的空IoBuffer可能就为此吧
-
s.increaseScheduledWriteMessages();
-
} else {
-
//发送byte数
-
s.increaseScheduledWriteBytes(remaining);
-
}
-
} else {
-
s.increaseScheduledWriteMessages();
-
}
-
-
WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();
-
//默认此路通
-
if (!s.isWriteSuspended()) {
-
//其实没什么区别write里面也会调用flush
-
if (writeRequestQueue.size() == 0) {
-
s.getProcessor().write(s, writeRequest);
-
} else {
-
s.getWriteRequestQueue().offer(s, writeRequest);
-
s.getProcessor().flush(s);
-
}
-
} else {
-
s.getWriteRequestQueue().offer(s, writeRequest);
-
}
-
}
上面就是把write请求offer到了writeRequestQueue队列,然后调用Processor的flush
-
public final void flush(S session) {
-
//session.setScheduledForFlush(true)用来保证同一个session只加入一次
-
if (session.setScheduledForFlush(true)) {
-
flushingSessions.add(session);
-
wakeup();
-
}
-
}
到此session.write处理完毕,总流程大致如下:
1. 将write请求包装成writeRequest,然后触发fireWrite
2. tail的fireWrite直接转给ProtocolCodecFilter的fireWrite
3. ProtocolCodecFilter编码后,将writeRequest包装成已编码的writeRequest,转给head。但它会多发一个IoBuffer为空的writeRequest用以统计发送消息数
4. head直接调用Processor的write,或将request写入WriteRequestQueue然后调用Processor的flush(前面的write做的也是这个事)
5. 唤醒Processor线程
二. Processor实际写
processor阻塞于select(),被手动wakeup(不是Op_write)后,会到flush(long)函数
-
private void flush(long currentTime) {
-
if (flushingSessions.isEmpty()) {
-
return;
-
}
-
-
do {
-
S session = flushingSessions.poll();
-
//为什么会null?
-
if (session == null) {
-
break;
-
}
-
-
//表示session又可以加入flushingSessions里了
-
session.unscheduledForFlush();
-
//取得session状态
-
SessionState state = getState(session);
-
switch (state) {
-
case OPENED:
-
try {
-
//处理掉
-
boolean flushedAll = flushNow(session, currentTime);
-
//处理掉了,不过又有新的请求了,但还没加入到flushingSessions
-
if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
-
&& !session.isScheduledForFlush()) {
-
//加入到flushingSessions
-
scheduleFlush(session);
-
}
-
} catch (Exception e) {
-
scheduleRemove(session);
-
IoFilterChain filterChain = session.getFilterChain();
-
filterChain.fireExceptionCaught(e);
-
}
-
-
break;
-
-
case CLOSING:
-
// Skip if the channel is already closed.
-
break;
-
-
case OPENING:
-
// Retry later if session is not yet fully initialized.
-
// (In case that Session.write() is called before addSession()
-
// is processed)
-
scheduleFlush(session);
-
return;
-
-
default:
-
throw new IllegalStateException(String.valueOf(state));
-
}
-
-
} while (!flushingSessions.isEmpty()); //一次性处理完
-
}
-
-
private boolean flushNow(S session, long currentTime) {
-
if (!session.isConnected()) {
-
scheduleRemove(session);
-
return false;
-
}
-
-
final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
-
final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-
-
// Set limitation for the number of written bytes for read-write
-
// fairness. I used maxReadBufferSize * 3 / 2, which yields best
-
// performance in my experience while not breaking fairness much.
-
//一次最大能发送的byte数
-
final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
-
+ (session.getConfig().getMaxReadBufferSize() >>> 1);
-
int writtenBytes = 0;
-
WriteRequest req = null;
-
try {
-
// Clear OP_WRITE
-
setInterestedInWrite(session, false);
-
do {
-
//取得当前正在处理的写请求,因为写不成功,需要知道上一次写的req
-
req = session.getCurrentWriteRequest();
-
if (req == null) {
-
req = writeRequestQueue.poll(session);
-
if (req == null) {
-
break;
-
}
-
session.setCurrentWriteRequest(req);
-
}
-
//实际写了多少
-
int localWrittenBytes = 0;
-
Object message = req.getMessage();
-
if (message instanceof IoBuffer) {
-
//调用writeBuffer进行写
-
localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
-
currentTime);
-
//写了一点,还没写完,可能写缓冲区满了
-
if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
-
// the buffer isn't empty, we re-interest it in writing
-
writtenBytes += localWrittenBytes;
-
//注册OP_WRITE,然后直接返回,等待下次继续
-
//注册了op_write会唤醒select,process函数会将session添加到flushingSessions
-
//这也是需要设置当前req的原因
-
setInterestedInWrite(session, true);
-
return false;
-
}
-
} else if (message instanceof FileRegion) {
-
localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
-
currentTime);
-
-
// Fix for Java bug on Linux
-
//
-
// If there's still data to be written in the FileRegion,
-
// return 0 indicating that we need
-
// to pause until writing may resume.
-
if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
-
writtenBytes += localWrittenBytes;
-
setInterestedInWrite(session, true);
-
return false;
-
}
-
} else {
-
throw new IllegalStateException("Don't know how to handle message of type '"
-
+ message.getClass().getName() + "'. Are you missing a protocol encoder?");
-
}
-
-
if (localWrittenBytes == 0) {
-
//一点都没写出去,写缓冲区已满,注册op_write下次继续
-
//注册了op_write会唤醒select,process函数会将session添加到flushingSessions
-
setInterestedInWrite(session, true);
-
return false;
-
}
-
-
writtenBytes += localWrittenBytes;
-
-
if (writtenBytes >= maxWrittenBytes) {
-
//发送数据量太大,下次处理
-
scheduleFlush(session);
-
return false;
-
}
-
} while (writtenBytes < maxWrittenBytes);
-
} catch (Exception e) {
-
if (req != null) {
-
req.getFuture().setException(e);
-
}
-
-
IoFilterChain filterChain = session.getFilterChain();
-
filterChain.fireExceptionCaught(e);
-
return false;
-
}
-
-
return true;
-
}
-
-
private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
-
throws Exception {
-
IoBuffer buf = (IoBuffer) req.getMessage();
-
int localWrittenBytes = 0;
-
//实际写处理
-
if (buf.hasRemaining()) {
-
int length;
-
-
if (hasFragmentation) {
-
length = Math.min(buf.remaining(), maxLength);
-
} else {
-
length = buf.remaining();
-
}
-
-
try {
-
localWrittenBytes = write(session, buf, length);
-
} catch (IOException ioe) {
-
// We have had an issue while trying to send data to the
-
// peer : let's close the session.
-
session.close(true);
-
}
-
-
}
-
//统计信息
-
session.increaseWrittenBytes(localWrittenBytes, currentTime);
-
//写完
-
if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
-
// Buffer has been sent, clear the current request.
-
int pos = buf.position();
-
buf.reset();
-
//触发MessageSent事件
-
fireMessageSent(session, req);
-
// And set it back to its position
-
buf.position(pos);
-
}
-
-
return localWrittenBytes;
-
}
三. MessageSend事件
一次写操作,该事件会被触发两次。上面提到ProtocolCodecFilter在filterWrite会转发两个filterWrite:
1. WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
2. nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
因为这两个都写入WriteRequestQueue,所以在Processor线程中也触发了两次。但
MessageWriteRequest实际是空IoBuffer,所以也不会有实际的写操作。
MessageSend需要原始的写对象,所以encodedWriteRequest 的MessageSend事件在head中会被中断。实际触发该事件的是MessageWriteRequest。至于为什么不在encodedWriteRequest 携带原始写对象不得而知。
-
private void fireMessageSent(S session, WriteRequest req) {
-
session.setCurrentWriteRequest(null);
-
IoFilterChain filterChain = session.getFilterChain();
-
filterChain.fireMessageSent(req);
-
}
-
public void fireMessageSent(WriteRequest request) {
-
session.increaseWrittenMessages(request, System.currentTimeMillis());
-
-
try {
-
request.getFuture().setWritten();
-
} catch (Throwable t) {
-
fireExceptionCaught(t);
-
}
-
-
Entry head = this.head;
-
//就是这里,encodedWriteRequest被中断
-
if (!request.isEncoded()) {
-
callNextMessageSent(head, session, request);
-
}
-
}
-
-
private void callNextMessageSent(Entry entry, IoSession session, WriteRequest writeRequest) {
-
try {
-
IoFilter filter = entry.getFilter();
-
NextFilter nextFilter = entry.getNextFilter();
-
filter.messageSent(nextFilter, session, writeRequest);
-
} catch (Throwable e) {
-
fireExceptionCaught(e);
-
}
-
}
-
//head的messageSent只是转发到ProtocolCodecFilter
-
//此为ProtocolCodecFilter的,转给tail
-
public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
-
if (writeRequest instanceof EncodedWriteRequest) {
-
return;
-
}
-
-
if (writeRequest instanceof MessageWriteRequest) {
-
MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
-
nextFilter.messageSent(session, wrappedRequest.getParentRequest());
-
} else {
-
nextFilter.messageSent(session, writeRequest);
-
}
-
}
-
//此为tail, 转给handle,结束
-
public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
-
session.getHandler().messageSent(session, writeRequest.getMessage());
-
}
阅读(1820) | 评论(0) | 转发(0) |