使用同步非阻塞模型NIO.
主线程负责监听端口、accept连接、read操作.
启动另外一个线程负责消息的write操作.
采用包长+包体组装协议.
LinkedBlockingQueue
存储待广播的消息内容.
服务端代码:
客户端代码:
-
import java.net.InetSocketAddress;
-
import java.nio.ByteBuffer;
-
import java.nio.channels.SelectionKey;
-
import java.nio.channels.Selector;
-
import java.nio.channels.SocketChannel;
-
import java.util.Iterator;
-
import java.util.Set;
-
-
public class ClientSocketDemo {
-
-
public static void main(String[] args) throws Exception {
-
//1. 创建一个选择器
-
Selector s = Selector.open();
-
//2 创建channel
-
SocketChannel sc = SocketChannel.open();
-
sc.configureBlocking(false);
-
sc.connect(new InetSocketAddress("127.0.0.1",44446));
-
sc.register(s, SelectionKey.OP_CONNECT);
-
while( true ) {
-
s.select();
-
//遍历已经唤醒的事件列表
-
Set<SelectionKey> list = s.selectedKeys();
-
Iterator<SelectionKey> it = list.iterator();
-
while (it.hasNext()) {
-
SelectionKey sk = it.next();
-
if(sk.isReadable() == true) {
-
//获取channel
-
SocketChannel readsc = (SocketChannel)sk.channel();
-
//从socket读取数据写入缓冲区 先读整型的包长
-
ByteBuffer ib = ByteBuffer.allocate(4);
-
while(ib.hasRemaining()) {
-
readsc.read(ib);
-
}
-
//重绕缓冲区 否则 下面的getInt不能获取准确的长度
-
ib.flip();
-
int len = ib.getInt();
-
ByteBuffer contentBuffer = ByteBuffer.allocate(len);
-
while(contentBuffer.hasRemaining()) {
-
readsc.read(contentBuffer);
-
}
-
System.out.println(new String(contentBuffer.array()));
-
} else if(sk.isWritable()) {
-
//获取channel
-
SocketChannel writesc = (SocketChannel)sk.channel();
-
-
//返回字符串
-
String rstr = "hellow world";
-
//先写入包长度
-
ByteBuffer headwb = ByteBuffer.allocate(4);
-
headwb.putInt(rstr.getBytes().length);
-
//重绕缓冲区 否则下面无法进行write操作
-
headwb.flip();
-
while(headwb.hasRemaining()) {
-
writesc.write(headwb);
-
}
-
//再写入包内容
-
ByteBuffer bodywb = ByteBuffer.wrap(rstr.getBytes());
-
while(bodywb.hasRemaining()) {
-
writesc.write(bodywb);
-
}
-
//注册当前通道上的读事件 并取消掉写事件
-
writesc.register(s,sk.interestOps() & ~ SelectionKey.OP_WRITE );
-
} else if(sk.isConnectable()) {
-
//获取channel
-
SocketChannel sc1 = (SocketChannel)sk.channel();
-
while(!sc1.isConnected()) {
-
sc1.finishConnect();
-
}
-
sc1.register(s, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
-
} else {
-
throw new Exception("未知操作类型!");
-
}
-
it.remove();
-
}
-
}
-
}
-
-
}
阅读(1243) | 评论(0) | 转发(0) |