Chinaunix首页 | 论坛 | 博客
  • 博客访问: 83122
  • 博文数量: 31
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 340
  • 用 户 组: 普通用户
  • 注册时间: 2013-04-02 20:25
文章分类

全部博文(31)

文章存档

2015年(2)

2014年(29)

我的朋友

分类: Java

2014-09-15 17:33:27

  Write相对Read复杂的多,主要有两条线。
1. session.write 引发fireWrite事件,经过编码等过滤器的处理,最后将需要写的session写入到flushingSessions中,wakeup相应的Processor线程
2. 实际写操作有Processor线程完成,写完后触发messageSent事件。

一. session.write 

1. 生成一个WriteRequest,触发fireWrite

点击(此处)折叠或打开

  1. public WriteFuture write(Object message, SocketAddress remoteAddress) {
  2.         .....
  3.         
  4.         // Now, we can write the message. First, create a future
  5.         WriteFuture writeFuture = new DefaultWriteFuture(this);
  6.         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);

  7.         // Then, get the chain and inject the WriteRequest into it
  8.         IoFilterChain filterChain = getFilterChain();
  9.         filterChain.fireFilterWrite(writeRequest);

  10.         .....
  11.         
  12.         // Return the WriteFuture.
  13.         return writeFuture;
  14.     }
假设过滤器链为:head、ProtocolCodecFilter、tail

点击(此处)折叠或打开

  1. //write是从tail一直prev到head的
  2. public void fireFilterWrite(WriteRequest writeRequest) {
  3.         Entry tail = this.tail;
  4.         callPreviousFilterWrite(tail, session, writeRequest);
  5.     }
  6. private void callPreviousFilterWrite(Entry entry, IoSession session, WriteRequest writeRequest) {
  7.         try {
  8.             IoFilter filter = entry.getFilter();
  9.             NextFilter nextFilter = entry.getNextFilter();
  10.             filter.filterWrite(nextFilter, session, writeRequest);
  11.         } catch (Throwable e) {
  12.             writeRequest.getFuture().setException(e);
  13.             fireExceptionCaught(e);
  14.         }
  15.  }
  16. //tail只是简单的转给下一个filter这里是ProtocolCodecFilter
  17.  public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
  18.      nextFilter.filterWrite(session, writeRequest);
  19.  }
ProtocolCodecFilter需要将业务对象编码成IoBuffer

点击(此处)折叠或打开

  1. public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
  2.      //取得业务对象
  3.      Object message = writeRequest.getMessage();
  4.      //如果已经是IoBuffer直接传给下一个filter,这里为head
  5.      if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
  6.           nextFilter.filterWrite(session, writeRequest);
  7.           return;
  8.      }

  9.      //获取编码器
  10.      ProtocolEncoder encoder = factory.getEncoder(session);
  11.      ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);

  12.      if (encoder == null) {
  13.           throw new ProtocolEncoderException("The encoder is null for the session " + session);
  14.      }

  15.      if (encoderOut == null) {
  16.           throw new ProtocolEncoderException("The encoderOut is null for the session " + session);
  17.      }

  18.      try {
  19.          //编码
  20.          encoder.encode(session, message, encoderOut);
  21.          //获取编码结果
  22.          Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();

  23.          //依次处理
  24.          while (!bufferQueue.isEmpty()) {
  25.          //编码后的对象
  26.          Object encodedMessage = bufferQueue.poll();
  27.          if (encodedMessage == null) {
  28.               break;
  29.          }
  30.          //传给下一个filter,这里为head
  31.          if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
  32.               SocketAddress destination = writeRequest.getDestination();
  33.               //WriteRequest打包成编码后的请求,这样后续filter拿到的数据则已经经过编码
  34.                WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
  35.                nextFilter.filterWrite(session, encodedWriteRequest);
  36.          }
  37.      }
  38.        //MessageWriteRequest返回的是空的IoBuffer,不会添加到实际写列表
  39.          //为什么要调用一次???
  40.           nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
  41.      } catch (Throwable t) {
  42.          ProtocolEncoderException pee;
  43.            // Generate the correct exception
  44.          if (t instanceof ProtocolEncoderException) {
  45.               pee = (ProtocolEncoderException) t;
  46.          else {
  47.                pee = new ProtocolEncoderException(t);
  48.          }

  49.          throw pee;
  50.          }
  51.      }
此时到了head过滤器

点击(此处)折叠或打开

  1. public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {

  2.            AbstractIoSession s = (AbstractIoSession) session;
  3.            //处理统计信息
  4.            if (writeRequest.getMessage() instanceof IoBuffer) {
  5.                IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
  6.                // I/O processor implementation will call buffer.reset()
  7.                // it after the write operation is finished, because
  8.                // the buffer will be specified with messageSent event.
  9.                buffer.mark();
  10.                int remaining = buffer.remaining();
  11.                if (remaining == 0) {
  12.                    //发送消息数,前面的空IoBuffer可能就为此吧
  13.                    s.increaseScheduledWriteMessages();
  14.                } else {
  15.                  //发送byte数
  16.                    s.increaseScheduledWriteBytes(remaining);
  17.                }
  18.            } else {
  19.                s.increaseScheduledWriteMessages();
  20.            }

  21.            WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();
  22.            //默认此路通
  23.            if (!s.isWriteSuspended()) {    
  24.              //其实没什么区别write里面也会调用flush
  25.                if (writeRequestQueue.size() == 0) {
  26.                    s.getProcessor().write(s, writeRequest);
  27.                } else {
  28.                    s.getWriteRequestQueue().offer(s, writeRequest);
  29.                    s.getProcessor().flush(s);
  30.                }
  31.            } else {
  32.                s.getWriteRequestQueue().offer(s, writeRequest);
  33.            }
  34.        }
上面就是把write请求offer到了writeRequestQueue队列,然后调用Processor的flush

点击(此处)折叠或打开

  1. public final void flush(S session) {
  2.         //session.setScheduledForFlush(true)用来保证同一个session只加入一次
  3.         if (session.setScheduledForFlush(true)) {
  4.             flushingSessions.add(session);
  5.             wakeup();
  6.         }
  7.     }
到此session.write处理完毕,总流程大致如下:
1. 将write请求包装成writeRequest,然后触发fireWrite
2. tail的fireWrite直接转给ProtocolCodecFilterfireWrite
3. 
ProtocolCodecFilter编码后,将writeRequest包装成已编码的writeRequest,转给head。但它会多发一个IoBuffer为空的writeRequest用以统计发送消息数
4. head直接调用Processor的write,或将request写入WriteRequestQueue然后调用Processor的flush(前面的write做的也是这个事)
5. 唤醒Processor线程

二. Processor实际写

  processor阻塞于select(),被手动wakeup(不是Op_write)后,会到flush(long)函数

点击(此处)折叠或打开

  1. private void flush(long currentTime) {
  2.      if (flushingSessions.isEmpty()) {
  3.           return;
  4.      }

  5.      do {
  6.           S session = flushingSessions.poll();
  7.           //为什么会null?
  8.           if (session == null) {
  9.               break;
  10.           }

  11.           //表示session又可以加入flushingSessions里了
  12.           session.unscheduledForFlush();
  13.           //取得session状态
  14.           SessionState state = getState(session);
  15.           switch (state) {
  16.           case OPENED:
  17.               try {
  18.                   //处理掉
  19.                   boolean flushedAll = flushNow(session, currentTime);
  20.                   //处理掉了,不过又有新的请求了,但还没加入到flushingSessions
  21.                   if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
  22.                           && !session.isScheduledForFlush()) {
  23.                          //加入到flushingSessions
  24.                           scheduleFlush(session);
  25.                   }
  26.               } catch (Exception e) {
  27.                   scheduleRemove(session);
  28.                   IoFilterChain filterChain = session.getFilterChain();
  29.                   filterChain.fireExceptionCaught(e);
  30.           }

  31.                break;

  32.           case CLOSING:
  33.                // Skip if the channel is already closed.
  34.               break;

  35.           case OPENING:
  36.               // Retry later if session is not yet fully initialized.
  37.               // (In case that Session.write() is called before addSession()
  38.               // is processed)
  39.                scheduleFlush(session);
  40.                return;

  41.           default:
  42.               throw new IllegalStateException(String.valueOf(state));
  43.            }

  44.          } while (!flushingSessions.isEmpty()); //一次性处理完
  45.      }

  46.     private boolean flushNow(S session, long currentTime) {
  47.         if (!session.isConnected()) {
  48.             scheduleRemove(session);
  49.             return false;
  50.         }

  51.         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
  52.         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

  53.         // Set limitation for the number of written bytes for read-write
  54.         // fairness. I used maxReadBufferSize * 3 / 2, which yields best
  55.         // performance in my experience while not breaking fairness much.
  56.         //一次最大能发送的byte数
  57.         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
  58.                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
  59.         int writtenBytes = 0;
  60.         WriteRequest req = null;
  61.         try {
  62.             // Clear OP_WRITE
  63.             setInterestedInWrite(session, false);
  64.             do {
  65.                 //取得当前正在处理的写请求,因为写不成功,需要知道上一次写的req
  66.                 req = session.getCurrentWriteRequest();
  67.                 if (req == null) {
  68.                     req = writeRequestQueue.poll(session);
  69.                     if (req == null) {
  70.                         break;
  71.                     }
  72.                     session.setCurrentWriteRequest(req);
  73.                 }
  74.                 //实际写了多少
  75.                 int localWrittenBytes = 0;
  76.                 Object message = req.getMessage();
  77.                 if (message instanceof IoBuffer) {
  78.                     //调用writeBuffer进行写
  79.                     localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
  80.                             currentTime);
  81.                     //写了一点,还没写完,可能写缓冲区满了
  82.                     if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
  83.                         // the buffer isn't empty, we re-interest it in writing
  84.                         writtenBytes += localWrittenBytes;
  85.                         //注册OP_WRITE,然后直接返回,等待下次继续
  86.                         //注册了op_write会唤醒select,process函数会将session添加到flushingSessions
  87.                         //这也是需要设置当前req的原因
  88.                         setInterestedInWrite(session, true);
  89.                         return false;
  90.                     }
  91.                 } else if (message instanceof FileRegion) {
  92.                     localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
  93.                             currentTime);

  94.                     // Fix for Java bug on Linux
  95.                     //
  96.                     // If there's still data to be written in the FileRegion,
  97.                     // return 0 indicating that we need
  98.                     // to pause until writing may resume.
  99.                     if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
  100.                         writtenBytes += localWrittenBytes;
  101.                         setInterestedInWrite(session, true);
  102.                         return false;
  103.                     }
  104.                 } else {
  105.                     throw new IllegalStateException("Don't know how to handle message of type '"
  106.                             + message.getClass().getName() + "'. Are you missing a protocol encoder?");
  107.                 }

  108.                 if (localWrittenBytes == 0) {
  109.                     //一点都没写出去,写缓冲区已满,注册op_write下次继续
  110.                     //注册了op_write会唤醒select,process函数会将session添加到flushingSessions
  111.                     setInterestedInWrite(session, true);
  112.                     return false;
  113.                 }

  114.                 writtenBytes += localWrittenBytes;

  115.                 if (writtenBytes >= maxWrittenBytes) {
  116.                     //发送数据量太大,下次处理
  117.                     scheduleFlush(session);
  118.                     return false;
  119.                 }
  120.             } while (writtenBytes < maxWrittenBytes);
  121.         } catch (Exception e) {
  122.             if (req != null) {
  123.                 req.getFuture().setException(e);
  124.             }

  125.             IoFilterChain filterChain = session.getFilterChain();
  126.             filterChain.fireExceptionCaught(e);
  127.             return false;
  128.         }

  129.         return true;
  130.     }

  131.      private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
  132.      throws Exception {
  133.           IoBuffer buf = (IoBuffer) req.getMessage();
  134.          int localWrittenBytes = 0;
  135.          //实际写处理
  136.          if (buf.hasRemaining()) {
  137.              int length;

  138.              if (hasFragmentation) {
  139.                  length = Math.min(buf.remaining(), maxLength);
  140.              } else {
  141.                  length = buf.remaining();
  142.              }

  143.              try {
  144.                     localWrittenBytes = write(session, buf, length);
  145.              } catch (IOException ioe) {
  146.                  // We have had an issue while trying to send data to the
  147.                  // peer : let's close the session.
  148.                    session.close(true);
  149.              }

  150.          }
  151.          //统计信息
  152.           session.increaseWrittenBytes(localWrittenBytes, currentTime);
  153.          //写完
  154.          if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
  155.          // Buffer has been sent, clear the current request.
  156.          int pos = buf.position();
  157.          buf.reset();
  158.          //触发MessageSent事件
  159.           fireMessageSent(session, req);
  160.           // And set it back to its position
  161.           buf.position(pos);
  162.      }

  163.      return localWrittenBytes;
  164. }

三. MessageSend事件

   一次写操作,该事件会被触发两次。上面提到ProtocolCodecFilter在filterWrite会转发两个filterWrite:
1. WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
2. nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));

   因为这两个都写入WriteRequestQueue,所以在Processor线程中也触发了两次。但MessageWriteRequest实际是空IoBuffer,所以也不会有实际的写操作。
  MessageSend需要原始的写对象,所以encodedWriteRequest 的MessageSend事件在head中会被中断。实际触发该事件的是MessageWriteRequest。至于为什么不在encodedWriteRequest 携带原始对象不得而知。

点击(此处)折叠或打开

  1. private void fireMessageSent(S session, WriteRequest req) {
  2.         session.setCurrentWriteRequest(null);
  3.         IoFilterChain filterChain = session.getFilterChain();
  4.         filterChain.fireMessageSent(req);
  5.     }
  6. public void fireMessageSent(WriteRequest request) {
  7.         session.increaseWrittenMessages(request, System.currentTimeMillis());

  8.         try {
  9.             request.getFuture().setWritten();
  10.         } catch (Throwable t) {
  11.             fireExceptionCaught(t);
  12.         }

  13.         Entry head = this.head;
  14.         //就是这里,encodedWriteRequest被中断
  15.         if (!request.isEncoded()) {
  16.             callNextMessageSent(head, session, request);
  17.         }
  18.     }

  19. private void callNextMessageSent(Entry entry, IoSession session, WriteRequest writeRequest) {
  20.         try {
  21.             IoFilter filter = entry.getFilter();
  22.             NextFilter nextFilter = entry.getNextFilter();
  23.             filter.messageSent(nextFilter, session, writeRequest);
  24.         } catch (Throwable e) {
  25.             fireExceptionCaught(e);
  26.         }
  27.     }
  28. //head的messageSent只是转发到ProtocolCodecFilter
  29. //此为ProtocolCodecFilter的,转给tail
  30. public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
  31.         if (writeRequest instanceof EncodedWriteRequest) {
  32.             return;
  33.         }

  34.         if (writeRequest instanceof MessageWriteRequest) {
  35.             MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
  36.             nextFilter.messageSent(session, wrappedRequest.getParentRequest());
  37.         } else {
  38.             nextFilter.messageSent(session, writeRequest);
  39.         }
  40.     }
  41. //此为tail, 转给handle,结束
  42. public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
  43.             session.getHandler().messageSent(session, writeRequest.getMessage());
  44.         }






阅读(1811) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~