Chinaunix首页 | 论坛 | 博客
  • 博客访问: 917304
  • 博文数量: 91
  • 博客积分: 803
  • 博客等级: 准尉
  • 技术积分: 1051
  • 用 户 组: 普通用户
  • 注册时间: 2012-05-24 13:42
文章分类

全部博文(91)

文章存档

2021年(1)

2020年(4)

2019年(4)

2018年(9)

2017年(11)

2016年(11)

2015年(6)

2014年(3)

2013年(28)

2012年(14)

分类: Java

2018-05-07 22:51:37

使用同步非阻塞模型NIO.
主线程负责监听端口、accept连接、read操作.
启动另外一个线程负责消息的write操作.

采用包长+包体组装协议.
LinkedBlockingQueue 存储待广播的消息内容.


服务端代码:

点击(此处)折叠或打开

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.SelectionKey;
  5. import java.nio.channels.Selector;
  6. import java.nio.channels.ServerSocketChannel;
  7. import java.nio.channels.SocketChannel;
  8. import java.util.Iterator;
  9. import java.util.LinkedList;
  10. import java.util.List;
  11. import java.util.Set;
  12. import java.util.concurrent.BlockingQueue;
  13. import java.util.concurrent.LinkedBlockingQueue;


  14. public class ServerSocketDemo2 {
  15.     
  16.     private static BlockingQueue<String> bq = new LinkedBlockingQueue<>();
  17.     
  18.     private static List<SocketChannel> listSc = new LinkedList<SocketChannel>();

  19.     public static void main(String[] args) throws Exception {
  20.         
  21.         //1.该线程负责 写socket
  22.         //2.主线程维连接和读socket
  23.         new Thread(new Runnable() {
  24.             @Override
  25.             public void run() {
  26.                 System.out.println("该线程负责写入socket");
  27.                 //该线程负责写操作
  28.                 String msg = null;
  29.                 while (true ) {
  30.                     try {
  31.                         //此处会阻塞
  32.                         msg = bq.take();
  33.                     } catch (InterruptedException e1) {
  34.                         e1.printStackTrace();
  35.                     }
  36.                     System.out.println(System.currentTimeMillis());
  37.                     System.out.println("接收到数据"+msg);
  38.                 
  39.                     Iterator<SocketChannel> it = listSc.iterator();
  40.                     while(it.hasNext()) {
  41.                      SocketChannel sc = it.next();
  42.                         if(!sc.isConnected()) {
  43.                             continue;
  44.                         }
  45.                         
  46.                         //先写入包长度
  47.                         ByteBuffer headwb = ByteBuffer.allocate(4);
  48.                         headwb.putInt(msg.getBytes().length);
  49.                         //重绕缓冲区 否则下面无法进行write操作
  50.                         headwb.flip();
  51.                         while(headwb.hasRemaining()) {
  52.                             try {
  53.                                 sc.write(headwb);
  54.                             } catch (IOException e) {
  55.                                 e.printStackTrace();
  56.                             }
  57.                         }
  58.                         //再写入包内容
  59.                         ByteBuffer bodywb = ByteBuffer.wrap(msg.getBytes());
  60.                         while(bodywb.hasRemaining()) {
  61.                             try {
  62.                                 sc.write(bodywb);
  63.                                 System.out.println("发送完成");
  64.                             } catch (IOException e) {
  65.                                 e.printStackTrace();
  66.                             }
  67.                         }
  68.                     }
  69.                 }
  70.             }
  71.         }).start();
  72.         //1. 创建一个选择器
  73.         Selector s = Selector.open();
  74.         //2.创建一个服务端channel
  75.         ServerSocketChannel ssc = ServerSocketChannel.open();
  76.         //3.置为非阻塞IO
  77.         ssc.configureBlocking(false);
  78.         ssc.bind(new InetSocketAddress("127.0.0.1", 44446));
  79.         
  80.         //4.将当前channel 的accept read write操作注册到 选择器中
  81.         ssc.register(s , SelectionKey.OP_ACCEPT);
  82.         //5循环接收事件
  83.         while(true) {
  84.             s.select();//该方法为阻塞方法,如果没有事件唤醒就不会执行下边的代码
  85.             //遍历已经唤醒的事件列表
  86.             Set<SelectionKey> list = s.selectedKeys();
  87.             Iterator<SelectionKey> it = list.iterator();
  88.             while (it.hasNext()) {
  89.                 SelectionKey sk = it.next();
  90.                 if( sk.isAcceptable() == true ) {
  91.                     //获取channel 强制转换
  92.                     ServerSocketChannel wsc = (ServerSocketChannel)sk.channel();
  93.                     //设置为非阻塞模式 所以accepc操作不一定有返回值 需循环操作只到有返回值为止
  94.                     SocketChannel sc = null;
  95.                     while ( sc == null ) {
  96.                         sc = wsc.accept();
  97.                     }
  98.                     //将该连接socket置为非阻塞状态
  99.                     sc.configureBlocking(false);
  100.                     //当前socket同时注册 读和写监控
  101.                     sc.register(s,SelectionKey.OP_READ );
  102.                     

  103.                     listSc.add(sc);
  104.                     
  105.                 } else if(sk.isReadable() == true) {
  106.                     SocketChannel sc = (SocketChannel)sk.channel();
  107.                     //从socket读取数据写入缓冲区 先读整型的包长
  108.                     ByteBuffer ib = ByteBuffer.allocate(4);
  109.                     while(ib.hasRemaining()) {
  110.                         sc.read(ib);
  111.                     }
  112.                     //重绕缓冲区 否则 下面的getInt不能获取准确的长度
  113.                     ib.flip();
  114.                     int len = ib.getInt();
  115.                     ByteBuffer contentBuffer = ByteBuffer.allocate(len);
  116.                     while(contentBuffer.hasRemaining()) {
  117.                         sc.read(contentBuffer);
  118.                     }
  119.                     
  120.                     //写入队列
  121.                     bq.put(new String(contentBuffer.array()));
  122.                     
  123.                 } else {
  124.                     throw new Exception("未知操作类型!");
  125.                 }
  126.                 it.remove();
  127.             }
  128.         }
  129.     }

  130. }

客户端代码:

点击(此处)折叠或打开

  1. import java.net.InetSocketAddress;
  2. import java.nio.ByteBuffer;
  3. import java.nio.channels.SelectionKey;
  4. import java.nio.channels.Selector;
  5. import java.nio.channels.SocketChannel;
  6. import java.util.Iterator;
  7. import java.util.Set;

  8. public class ClientSocketDemo {

  9.     public static void main(String[] args) throws Exception {
  10.         //1. 创建一个选择器
  11.         Selector s = Selector.open();
  12.         //2 创建channel
  13.         SocketChannel sc = SocketChannel.open();
  14.         sc.configureBlocking(false);
  15.         sc.connect(new InetSocketAddress("127.0.0.1",44446));
  16.         sc.register(s, SelectionKey.OP_CONNECT);
  17.         while( true ) {
  18.             s.select();
  19.             //遍历已经唤醒的事件列表
  20.             Set<SelectionKey> list = s.selectedKeys();
  21.             Iterator<SelectionKey> it = list.iterator();
  22.             while (it.hasNext()) {
  23.                 SelectionKey sk = it.next();
  24.                 if(sk.isReadable() == true) {
  25.                     //获取channel
  26.                     SocketChannel readsc = (SocketChannel)sk.channel();
  27.                     //从socket读取数据写入缓冲区 先读整型的包长
  28.                     ByteBuffer ib = ByteBuffer.allocate(4);
  29.                     while(ib.hasRemaining()) {
  30.                         readsc.read(ib);
  31.                     }
  32.                     //重绕缓冲区 否则 下面的getInt不能获取准确的长度
  33.                     ib.flip();
  34.                     int len = ib.getInt();
  35.                     ByteBuffer contentBuffer = ByteBuffer.allocate(len);
  36.                     while(contentBuffer.hasRemaining()) {
  37.                         readsc.read(contentBuffer);
  38.                     }
  39.                     System.out.println(new String(contentBuffer.array()));
  40.                 } else if(sk.isWritable()) {
  41.                     //获取channel
  42.                     SocketChannel writesc = (SocketChannel)sk.channel();
  43.                     
  44.                     //返回字符串
  45.                     String rstr = "hellow world";
  46.                     //先写入包长度
  47.                     ByteBuffer headwb = ByteBuffer.allocate(4);
  48.                     headwb.putInt(rstr.getBytes().length);
  49.                     //重绕缓冲区 否则下面无法进行write操作
  50.                     headwb.flip();
  51.                     while(headwb.hasRemaining()) {
  52.                         writesc.write(headwb);
  53.                     }
  54.                     //再写入包内容
  55.                     ByteBuffer bodywb = ByteBuffer.wrap(rstr.getBytes());
  56.                     while(bodywb.hasRemaining()) {
  57.                         writesc.write(bodywb);
  58.                     }
  59.                     //注册当前通道上的读事件 并取消掉写事件
  60.                     writesc.register(s,sk.interestOps() & ~ SelectionKey.OP_WRITE );
  61.                 } else if(sk.isConnectable()) {
  62.                     //获取channel
  63.                     SocketChannel sc1 = (SocketChannel)sk.channel();
  64.                     while(!sc1.isConnected()) {
  65.                         sc1.finishConnect();
  66.                     }
  67.                     sc1.register(s, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
  68.                 } else {
  69.                     throw new Exception("未知操作类型!");
  70.                 }
  71.                 it.remove();
  72.             }
  73.         }
  74.     }

  75. }


阅读(1272) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~