参考:
普通的 Sock 拥塞
在 ServerSocket的accept()方法;和InputStream的read()方式
因此我们需要两类型的线程分别进行处理。而且每一个阻塞方法所绑定的线程的生命周期和网络连接的生命周期是一致的 -- 并发造成大量维护线程,导致浪费
NIO 的解决办法 :
1. 数据缓冲处理(ByteBuffer) :解决 read/write 大量使用线程和每个线程中使用的缓存区内存问题
ByteBuffer 可使用 <共享缓冲区 - view buffer >
还可以使用 Direct ByteBuffer - native 本地非java内存 ,提高性能
为方便大家理解,下面用自己的语言描述下 :
position(操作游标) < limit(某操作挡板) < capacity(整个缓存大小) :
a。在写入场景中 (以写入数据第一行灰色);
b。执行filp(),就变成读取场景 ,游标指向写入开始,挡板放置在写入最后结束位置
a。读取到最后,执行clear() ,缓存区状态回到初始
2. 异步通道 (Channel) : [我的理解] Channel 是使用底层系统方法传送 Client Sock消息到 Server Buffer 中 ,非传统方式 使用 线程去传送消息.
Channel 维护ByteBuffer - Client Socket 的关系
3. 有条件的选择(Readiness Selection):
a. 有Channel 底层系统支持消息传输, 非拥塞主要的逻辑实现部分。
b. [我的理解]非拥塞的机制 :是由监听Channel的事件完成。消息交给底层系统接口维护(非java线程维护的)当消息传输结束或一段落,就触发Server上的一个响应事件,这样一个客户端消息传递请求就可以快速返回。
SelectionKey标识Selector和SelectableChannel,一旦一个Channel注册到Selector中,就会返回一个SelectionKey对象
SelectionKey保存了两类状态:对应的 Channel注册了哪些操作;对应的Channel的那些操作已经准备好了,可以进行相应的数据操作了)结合来实现这个功能的。
在参考中还提到这种非拥塞方式的一些弊端(当然 一些有特点的技术是有使用场景的):
1. 持久连接的超时问题(Timeout)
感觉这是个棘手问题,虽然交给底层维护消息的缓冲,但当一个消息的缓冲超时事件应该很有必要监听把 ?? 这个监听事件没有??
2. 如何使用Selector,由于每一个Selector的处理能力是有限的
3. 在非阻塞情况下,read和write都不在是阻塞的, 相对应的 IO不好的问题如果消息传输有丢失怎么办 ?
4. 如何共享内存Buffer,这给 程序猿 增加一定难度
5. 网络非拥塞造成的消息顺序问题
6. 内存泄露
NIO 两种并发线程 的介绍,目前还没具体使用 ,不给个人理解。
这上网上抄了个demo - 先mark下,到时加自己的注解
Client 端输出 $> welcome to VistaQQ
- if __name__ == '__main__':
- import socket
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(('localhost', 6018))
- import time
- #time.sleep(2)
- sock.send('1 test !~ ')
- print sock.recv(1024)
- sock.close()
服务端输出(客户端运行了 3次 ) : $> 服务器启动
$> Client >> 1 test !~
$> key.isWritable
$> block = java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]
$> Client >> 1 test !~
$> key.isWritable
$> block = java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]
$>
$> Client >> 1 test !~
$> key.isWritable
$> block = java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]
- //package com.vista.Server;
- import java.io.BufferedWriter;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.OutputStreamWriter;
- import java.io.PrintWriter;
- import java.net.InetSocketAddress;
- import java.net.ServerSocket;
- import java.nio.ByteBuffer;
- import java.nio.CharBuffer;
- import java.nio.channels.FileChannel;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.nio.charset.Charset;
- import java.nio.charset.CharsetDecoder;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.Set;
- public class SelectorServer {
- private static int DEFAULT_SERVERPORT = 6018;//默认端口
- private static int DEFAULT_BUFFERSIZE = 1024;//默认缓冲区大小为1024字节
- private static String DEFAULT_CHARSET = "GB2312";//默认码集
- private static String DEFAULT_FILENAME = "bigfile.dat";
- private ServerSocketChannel sschannel;
- private Selector selector;//选择器
- private ByteBuffer buffer;//字节缓冲区
- private int port;
- private Charset charset;//字符集
- private CharsetDecoder decoder;//解码器
-
-
- public SelectorServer(int port) throws IOException {
- this.port = port;
- this.sschannel = null;
- this.selector = Selector.open();//打开选择器
- this.buffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE);
- this.charset = Charset.forName(DEFAULT_CHARSET);
- this.decoder = this.charset.newDecoder();
- }
-
- private class HandleClient {
- private String strGreeting = "welcome to VistaQQ";
- public HandleClient() throws IOException {
- }
- public String readBlock() {
- //读块数据
- return this.strGreeting;
- }
- public void close() {
- }
- }
- protected void handleKey(SelectionKey key) throws IOException {
- //处理事件
- if (key.isAcceptable()) {
- // 接收请求
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- //取出对应的服务器通道
- SocketChannel schannel = server.accept();
- schannel.configureBlocking(false);
- //客户socket通道注册读操作
- schannel.register(selector, SelectionKey.OP_READ);
- } else if (key.isReadable()) {
- // 读信息
- SocketChannel schannel = (SocketChannel) key.channel();
- int count = schannel.read(this.buffer);
- if (count > 0) {
- this.buffer.flip();
- CharBuffer charBuffer = decoder.decode(this.buffer);
- System.out.println("Client >> " + charBuffer.toString());
- // 注册写入 和添加 处理者
- schannel.register(selector,
- SelectionKey.OP_WRITE|SelectionKey.OP_READ,
- new HandleClient());
- } else {
- //客户已经断开
- schannel.close();
- }
- this.buffer.clear();//清空缓冲区
- } else if (key.isWritable()) {
- System.out.println("key.isWritable");
- // 写事件
- SocketChannel schannel = (SocketChannel) key.channel();
- HandleClient handle = (HandleClient) key.attachment();//取出处理者
- // 包装一个 缓冲区
- ByteBuffer block = ByteBuffer.wrap(handle.readBlock().getBytes());
- System.out.println(" block = "+block);
- schannel.write(block);
-
- // 是表示Socket可写,网络不出现阻塞情况下,一直是可以写的,
- // 所认一直为true. 一般不注册OP_WRITE事件,或特别小心注册写入事件.
- // 为测试 直接 取出 写入 事件
- key.interestOps(SelectionKey.OP_READ);
- //channel.close();
- }
- }
- public void listen() throws IOException {
- //服务器开始监听端口,提供服务
- ServerSocket socket;
- sschannel = ServerSocketChannel.open(); // 打开通道
- socket = sschannel.socket(); //得到与通到相关的socket对象
- socket.bind(new InetSocketAddress(port)); //将scoket榜定在制定的端口上
- //配置通到使用非阻塞模式,在非阻塞模式下,可以编写多道程序同时避免使用复杂的多线程
- sschannel.configureBlocking(false);
- sschannel.register(selector, SelectionKey.OP_ACCEPT);
- try {
- while(true) {
- // 与通常的程序不同,这里使用channel.accpet()接受客户端连接请求,而不是在socket对象上调用accept(),这里在调用accept()方法时如果通道配置为非阻塞模式,那么accept()方法立即返回null,并不阻塞
- this.selector.select();
- //if (selector.select(1000) == 0) {
- // System.out.print(".");
- // continue;
- //}
- Iterator iter = this.selector.selectedKeys().iterator();
- while(iter.hasNext()) {
- SelectionKey key = (SelectionKey)iter.next();
- iter.remove();
- this.handleKey(key);
- }
- }
- }
- catch(IOException ex) {
- ex.printStackTrace();
- }
- }
- public static void main(String[] args) throws IOException {
- System.out.println("服务器启动");
- SelectorServer server = new SelectorServer(SelectorServer.DEFAULT_SERVERPORT);
- server.listen(); //服务器开始监听端口,提供服务
- }
- }
阅读(1605) | 评论(0) | 转发(0) |