最近学习了非阻塞IO(NIO),因为厌烦了在开发并行处理时候,阻塞IO所导致的肥服务端,因为对于每个客户连接都要产生一个线程对此进行处理,当然你
可以不这样实现,但我的前提是开发并行处理,下面是我的源码,因为是在dos命令行测试的,所以要是编写为GUI的时候,还要很多要改的东西,这也是我下
个征服的对象,当然我已经迫不及待了,下面是我花了三个晚上学习并编写的非阻塞聊天室:(供交流学习用)
客户端:
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.io.BufferedReader;
import java.io.*;
import java.lang.Thread;
import java.nio.charset.*;
import java.nio.charset.CharsetDecoder;
public class ChatClient{
private InetSocketAddress address = new InetSocketAddress("localhost",13);
private SocketChannel client = null;
private String user = null;
private String pass = null;
private BufferedReader in = null;
private Thread t = null;
public ChatClient(){
try{
client = SocketChannel.open();
System.out.println("connecting...");
client.connect(address);
System.out.println("connected with "+address.getHostName());
client.configureBlocking(false);
}catch(IOException ex){
ex.printStackTrace();
System.exit(-1);
}
this.start();
}
public void start(){
this.receiveMessage();
this.sendMessage();
}
public void sendMessage(){
try{
in = new BufferedReader(new InputStreamReader(System.in));
System.out.println("Input the Info then check it out on the server");
System.out.print("Your Name:");
user = in.readLine();
System.out.println("Password:");
pass = in.readLine();
ByteBuffer buffer = ByteBuffer.allocate(50);
String message= new String("LOGIN:"+user+"&"+pass);
buffer = ByteBuffer.wrap(message.getBytes());
while(buffer.hasRemaining()&client.write(buffer)!=-1);
System.out.println(message+" has been send");
buffer.flip();
Charset charset = Charset.forName("gb2312");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(buffer);
//System.out.println("receive:"+charBuffer+" length:"+charBuffer.limit());
}catch(IOException ex){
ex.printStackTrace();
}
this.waitFor(2000);
System.out.println("WELCOME TO THE KING 'S CHAT ROOM!");
System.out.println("Input the Info(exit is to leave out)");
while(true){
System.out.print(">");
ByteBuffer buffer = ByteBuffer.allocate(100);
in = new BufferedReader(new InputStreamReader(System.in));
try{
String read=in.readLine();
if(read.equals("exit")){
break;
}
String message1="SENTO:"+read;
buffer = ByteBuffer.wrap(message1.getBytes());
// buffer.flip();
System.out.println("before");
while(buffer.hasRemaining()&client.write(buffer)!=-1);
// buffer.flip();
System.out.println(message1+" has been send");
this.waitFor(500);
}catch(IOException ex){
ex.printStackTrace();
}
}
System.out.println("Welcome to use this soft!---King");
System.exit(-1);
}
public void waitFor(long time){
try{
Thread.sleep(time);
}catch(Exception ex){
ex.printStackTrace();
}
}
public void receiveMessage(){
t=new ReceiveThread(client);
t.start();
}
public static void main(String[]args){
ChatClient cc=new ChatClient();
}
class ReceiveThread extends Thread{
SocketChannel client =null;
ByteBuffer buffer=ByteBuffer.allocate(50);
private boolean val=true;
public ReceiveThread(SocketChannel client){
this.client = client;
}
public void run(){
while(val){
try{
while (client.read(buffer) > 0){
buffer.flip();
String result = decode(buffer);
System.out.println(">(back)"+result);
buffer.flip();
}
}catch(IOException ex){
ex.printStackTrace();
return;
}
}
}
}
public String decode(ByteBuffer buffer){
Charset charset=null;
CharsetDecoder decoder=null;
CharBuffer charBuffer=null;
try{
charset= Charset.forName("gb2312");
decoder= charset.newDecoder();
charBuffer= decoder.decode(buffer);
return charBuffer.toString();
}catch(Exception ex){
ex.printStackTrace();
return "";
}
}
}注意:可以多个客户进行交流,程序要求输入验证信息,但由于时间原因后台我都以合法用户给予回馈
服务端:import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.net.ServerSocket;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.io.IOException;
import java.util.Iterator;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.nio.charset.*;
import java.nio.*;
public class ChatServer {
private int port = 13;
private Selector selector;
private ServerSocketChannel ssc;
private ServerSocket server;
private InetSocketAddress address;
private ArrayList connectKey=new ArrayList();
public ChatServer(){
//initServer
try{
ssc=ServerSocketChannel.open();
server=ssc.socket();
address = new InetSocketAddress(port);
server.bind(address);
selector=Selector.open();
ssc.configureBlocking(false);
ssc.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("============================================================");
System.out.println("= =");
System.out.println("= =");
System.out.println("= 水
底沙聊天室-version1.0 =");
System.out.println("= =");
System.out.println("= QQ:247095340(交
流) =");
System.out.println("============================================================");
System.out.println("Listening the port 13...");
}catch(IOException ex){
ex.printStackTrace();
System.exit(-1);
}
}
public void startServer() throws IOException{
while(true){
int i=selector.select();
//System.out.print(i);
Iterator keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = (SelectionKey)keys.next();
keys.remove();
try{
if(key.isAcceptable()){
ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
SocketChannel channel = ssc.accept();//return null if there's no request
System.out.println(channel+" has accepted");
channel.configureBlocking(false);
SelectionKey clientKey=channel.register(selector,SelectionKey.OP_READ);
}//else
if(key.isWritable()){
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = (ByteBuffer)key.attachment();
if(buffer!=null){
key.attach(null);//avoid the return twice
//check info:the login or the message
//buffer.flip();
String checkBuffer = this.decode(buffer);
System.out.println("write:"+checkBuffer);
if(checkBuffer.equals("LOGIN:OK")){
//return LOGIN:OK then add into the connectKey array!
System.out.println("ok"+buffer);
buffer.flip();
//while(buffer.hasRemaining()&channel.write(buffer)!=-1);
channel.write(buffer);
key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
connectKey.add(key);//add to the connectKey array
System.out.println("here");
}else if(checkBuffer.equals("LOGIN:ERROR")){
//return LOGIN:ERROR the client should close the channel
//warning:method:key.channel();
//Returns the channel for which this key was created.
// This method will continue to return the channel even after the key is cancelled.
while(buffer.hasRemaining()&channel.write(buffer)!=-1);
key.cancel();
}else //if(checkBuffer.indexOf("SENTO:")!=-1){
{
//return the message to everyone
// while(buffer.hasRemaining()&channel.write(buffer)!=-1);
System.out.println("sento"+buffer);
buffer.flip();
channel.write(buffer);
System.out.println("send over");
}
}
}//else
if(key.isReadable()){
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer=ByteBuffer.allocate(50);
System.out.println("read...");
channel.read(buffer);
buffer.flip();
String checkBuffer = this.decode(buffer);
System.out.println("read:"+checkBuffer);
//while(buffer.hasRemaining()&&channel.read(buffer)!=-1);
//check the buffer
//buffer.flip();
//String checkBuffer = this.decode(buffer);
// System.out.println("read:"+checkBuffer);
if(checkBuffer.startsWith("LOGIN:")){
//get info of the user & pass then check for it,return feedback!
//the format is LOGIN:user&pass
int p1=checkBuffer.length();
int p2=checkBuffer.indexOf("&");
String user=checkBuffer.substring(6,p2);
String pass=checkBuffer.substring(p2+1,p1);
System.out.println(user+pass);
//todo check from the database!!!
//assume the user is legal
ByteBuffer feedback = ByteBuffer.allocate(20);
feedback=ByteBuffer.wrap("LOGIN:OK".getBytes());
key.interestOps(SelectionKey.OP_WRITE);
key.attach(feedback);
}else if(checkBuffer.startsWith("SENTO:")){
String message = checkBuffer.substring(6);
System.out.println("sentto:"+message);
ByteBuffer buffer1 = ByteBuffer.allocate(50);
buffer1=ByteBuffer.wrap(message.getBytes());
Iterator it = connectKey.iterator();
//key.interestOps(SelectionKey.OP_WRITE);
while(it.hasNext()){
((SelectionKey)it.next()).attach(buffer1.duplicate());
}
System.out.println("here1");
//for(int i=0;i //connectKey[i].attach(buffer.duplicate());
//}
}
}
}catch(IOException ex){
key.cancel();
//System.exit(-1);
try{
key.channel().close();
}catch(IOException cex){
}
}
}
}
}
public String decode(ByteBuffer buffer){
Charset charset=null;
CharsetDecoder decoder=null;
CharBuffer charBuffer=null;
try{
charset= Charset.forName("gb2312");
decoder= charset.newDecoder();
charBuffer= decoder.decode(buffer);
return charBuffer.toString();
}catch(Exception ex){
ex.printStackTrace();
return "";
}
}
public static void main(String []args){
ChatServer cs = new ChatServer();
try{
cs.startServer();
}catch(IOException ex){
ex.printStackTrace();
System.exit(-1);
}
}
}注意:假如客户强制登出服务端时候,服务器里面的登录用户列表还是保存他注册的SelectionKey地址,这是存在问题,其实解决很简单,在对通道进行写入时候,如果通道已经被关闭的话,可以用try/catch语句进行处理
上面就是总的程序,其实之前我都是用阻塞socket去完成这类工作的,由于在用swt基于实现时候遇到很多swt线程问题,后期我会以GUI界面共享给大家,当然自己也在不断学习中!King
阅读(651) | 评论(0) | 转发(0) |