Chinaunix首页 | 论坛 | 博客
  • 博客访问: 374282
  • 博文数量: 152
  • 博客积分: 6020
  • 博客等级: 准将
  • 技术积分: 850
  • 用 户 组: 普通用户
  • 注册时间: 2006-03-11 19:20
文章分类

全部博文(152)

文章存档

2017年(1)

2010年(1)

2007年(3)

2006年(147)

我的朋友

分类: Java

2006-03-26 14:45:10

- 文档中心 - Java 阅读:4   评论: 0    参与评论
标题   自己编写的NIO非阻塞聊天室     选择自 shuidisha 的 Blog
关键字   自己编写的NIO非阻塞聊天室
出处  

        最近学习了非阻塞IO(NIO),因为厌烦了在开发并行处理时候,阻塞IO所导致的肥服务端,因为对于每个客户连接都要产生一个线程对此进行处理,当然你可以不这样实现,但我的前提是开发并行处理,下面是我的源码,因为是在dos命令行测试的,所以要是编写为GUI的时候,还要很多要改的东西,这也是我下个征服的对象,当然我已经迫不及待了,下面是我花了三个晚上学习并编写的非阻塞聊天室:(供交流学习用)

客户端
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.io.BufferedReader;
import java.io.*;
import java.lang.Thread;
import java.nio.charset.*;
import java.nio.charset.CharsetDecoder;

public class ChatClient{
 
 private InetSocketAddress address = new InetSocketAddress("localhost",13);
 private SocketChannel client = null;
 private String user = null;
 private String pass = null;
 private BufferedReader in = null;
 private Thread t = null;
 
 public ChatClient(){
  try{
   client = SocketChannel.open();
   System.out.println("connecting...");
   
   client.connect(address);
   System.out.println("connected with "+address.getHostName());
   client.configureBlocking(false);
  }catch(IOException ex){
   ex.printStackTrace();
   System.exit(-1);
  }
  this.start();
 }
 
 public void start(){
  this.receiveMessage();
  this.sendMessage();  
 }
 
 public void sendMessage(){
  try{
   in = new BufferedReader(new InputStreamReader(System.in));
   System.out.println("Input the Info then check it out on the server");
   System.out.print("Your Name:");
   user = in.readLine();
   System.out.println("Password:");
   pass = in.readLine();
   
   ByteBuffer buffer = ByteBuffer.allocate(50);
   String message= new String("LOGIN:"+user+"&"+pass);
   buffer = ByteBuffer.wrap(message.getBytes());
   while(buffer.hasRemaining()&client.write(buffer)!=-1);
   System.out.println(message+" has been send");
   
   
   buffer.flip();
   Charset charset = Charset.forName("gb2312");
   CharsetDecoder decoder = charset.newDecoder();
   CharBuffer charBuffer = decoder.decode(buffer);
   //System.out.println("receive:"+charBuffer+" length:"+charBuffer.limit());
  }catch(IOException ex){
   ex.printStackTrace();
  } 
  
  this.waitFor(2000);
  
  System.out.println("WELCOME TO THE KING 'S CHAT ROOM!");
  System.out.println("Input the Info(exit is to leave out)");
  while(true){
   System.out.print(">");
  
   ByteBuffer buffer = ByteBuffer.allocate(100);
   in = new BufferedReader(new InputStreamReader(System.in));
   try{
    String read=in.readLine();
    if(read.equals("exit")){
     break;
    }   
    String message1="SENTO:"+read;
    buffer = ByteBuffer.wrap(message1.getBytes());
   // buffer.flip();
    System.out.println("before");
    while(buffer.hasRemaining()&client.write(buffer)!=-1);
   // buffer.flip();
    System.out.println(message1+" has been send");
    this.waitFor(500);
   }catch(IOException ex){
    ex.printStackTrace();
   }
  }
  
  System.out.println("Welcome to use this soft!---King");
  System.exit(-1);
  
 }
 
 public void waitFor(long time){
  try{
   Thread.sleep(time);     
  }catch(Exception ex){
   ex.printStackTrace();
  }
 }
 
 public void receiveMessage(){
  t=new ReceiveThread(client);
  t.start();
 }
 
 public static void main(String[]args){
  ChatClient cc=new ChatClient(); 
  
 }
 
 class ReceiveThread extends Thread{
  SocketChannel client =null;
  ByteBuffer buffer=ByteBuffer.allocate(50);
  private boolean val=true;
  
  public ReceiveThread(SocketChannel client){
   this.client = client;
  }
  
  public void run(){
   while(val){
    try{
     while (client.read(buffer) > 0){
      buffer.flip();
      String result = decode(buffer);
      System.out.println(">(back)"+result);
      buffer.flip();  
     }
    }catch(IOException ex){
     ex.printStackTrace();
     return;
    }
   
   }
  }
 }
 
 public String decode(ByteBuffer buffer){
  Charset charset=null;
  CharsetDecoder decoder=null;
  CharBuffer charBuffer=null;
  try{
   charset= Charset.forName("gb2312");
    decoder= charset.newDecoder();
    charBuffer= decoder.decode(buffer);
   return charBuffer.toString();
  }catch(Exception ex){
   ex.printStackTrace();
   return "";
  }
  
 }
}注意:可以多个客户进行交流,程序要求输入验证信息,但由于时间原因后台我都以合法用户给予回馈

服务端:import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.net.ServerSocket;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.io.IOException;
import java.util.Iterator;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.nio.charset.*;
import java.nio.*;

public class ChatServer {
 
 private int port = 13;
 private Selector selector;
 private ServerSocketChannel ssc;
 private ServerSocket server;
 private InetSocketAddress address;
 private ArrayList connectKey=new ArrayList();
 
 public ChatServer(){
  //initServer  
  try{
   ssc=ServerSocketChannel.open();
   server=ssc.socket();
   address = new InetSocketAddress(port);
   server.bind(address);
   selector=Selector.open();
   ssc.configureBlocking(false);
   ssc.register(selector,SelectionKey.OP_ACCEPT);
   System.out.println("============================================================");
   System.out.println("=                                                                                                                                        =");
   System.out.println("=                                                                                                                                        =");
   System.out.println("=                                     水底沙聊天室-version1.0                                                    =");
   System.out.println("=                                                                                                                                        =");
   System.out.println("=                                                                                                QQ:247095340(交流) =");
   System.out.println("============================================================");   
   System.out.println("Listening the port 13...");  
  }catch(IOException ex){
   ex.printStackTrace();
   System.exit(-1);
  }
 }
 
 public void startServer() throws IOException{
  while(true){
   int i=selector.select();
   //System.out.print(i);
   Iterator keys = selector.selectedKeys().iterator();
   
   while(keys.hasNext()){
    SelectionKey key = (SelectionKey)keys.next();
    keys.remove();
    try{    
     if(key.isAcceptable()){
      ServerSocketChannel ssc=(ServerSocketChannel)key.channel();     
      SocketChannel channel = ssc.accept();//return null if there's no request
      System.out.println(channel+" has accepted");
      channel.configureBlocking(false);
      SelectionKey clientKey=channel.register(selector,SelectionKey.OP_READ);     
     }//else
     if(key.isWritable()){
      SocketChannel channel = (SocketChannel)key.channel();
      ByteBuffer buffer = (ByteBuffer)key.attachment();
      
      if(buffer!=null){   
       key.attach(null);//avoid the return twice
       
       //check info:the login or the message
       //buffer.flip();

       String checkBuffer = this.decode(buffer);
       System.out.println("write:"+checkBuffer);
       
       if(checkBuffer.equals("LOGIN:OK")){
        //return LOGIN:OK then add into the connectKey array!
        System.out.println("ok"+buffer);
        buffer.flip();
        //while(buffer.hasRemaining()&channel.write(buffer)!=-1);
        channel.write(buffer);
        key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);      
        connectKey.add(key);//add to the connectKey array
        System.out.println("here");
       }else if(checkBuffer.equals("LOGIN:ERROR")){
        //return LOGIN:ERROR the client should close the channel
        //warning:method:key.channel();
        //Returns the channel for which this key was created.
        // This method will continue to return the channel even after the key is cancelled.
        while(buffer.hasRemaining()&channel.write(buffer)!=-1);
        key.cancel();
       }else //if(checkBuffer.indexOf("SENTO:")!=-1){
       {
        
        //return the message to everyone
       // while(buffer.hasRemaining()&channel.write(buffer)!=-1);

        
        System.out.println("sento"+buffer);
        buffer.flip();
        channel.write(buffer);
        System.out.println("send over"); 
       }
      }
     }//else
     if(key.isReadable()){
      SocketChannel channel = (SocketChannel)key.channel();
      ByteBuffer buffer=ByteBuffer.allocate(50);
      System.out.println("read...");
      
      channel.read(buffer);
 
      buffer.flip();
      String checkBuffer = this.decode(buffer);
      System.out.println("read:"+checkBuffer);
     
      //while(buffer.hasRemaining()&&channel.read(buffer)!=-1);
      
      //check the buffer
      //buffer.flip();
      
      //String checkBuffer = this.decode(buffer);
     // System.out.println("read:"+checkBuffer);

      
     
      if(checkBuffer.startsWith("LOGIN:")){
       //get info of the user & pass then check for it,return feedback!
       //the format is LOGIN:user&pass
       int p1=checkBuffer.length();
       int p2=checkBuffer.indexOf("&");
       
       String user=checkBuffer.substring(6,p2);
       String pass=checkBuffer.substring(p2+1,p1);
       System.out.println(user+pass);
       
       //todo check from the database!!!
       //assume the user is legal
       ByteBuffer feedback = ByteBuffer.allocate(20);
       feedback=ByteBuffer.wrap("LOGIN:OK".getBytes());
       key.interestOps(SelectionKey.OP_WRITE);
       key.attach(feedback);      
      }else if(checkBuffer.startsWith("SENTO:")){
       String message = checkBuffer.substring(6);
       System.out.println("sentto:"+message);
       ByteBuffer buffer1 = ByteBuffer.allocate(50);
       buffer1=ByteBuffer.wrap(message.getBytes());
       Iterator it = connectKey.iterator();
       //key.interestOps(SelectionKey.OP_WRITE);
       while(it.hasNext()){
        ((SelectionKey)it.next()).attach(buffer1.duplicate());
       }
       System.out.println("here1");
       //for(int i=0;i        //connectKey[i].attach(buffer.duplicate());
       //}
      }
     }
    }catch(IOException ex){
     key.cancel();
     //System.exit(-1);
     try{
      key.channel().close();
     }catch(IOException cex){
     }
    }
   }
  }
 }
 
 public String decode(ByteBuffer buffer){
  Charset charset=null;
  CharsetDecoder decoder=null;
  CharBuffer charBuffer=null;
  try{
   charset= Charset.forName("gb2312");
    decoder= charset.newDecoder();
    charBuffer= decoder.decode(buffer);
   return charBuffer.toString();
  }catch(Exception ex){
   ex.printStackTrace();
   return "";
  }
  
 }
 
 public static void main(String []args){
  ChatServer cs = new ChatServer();
  try{
   cs.startServer();
  }catch(IOException ex){
   ex.printStackTrace();
   System.exit(-1);
  }
 }
}注意:假如客户强制登出服务端时候,服务器里面的登录用户列表还是保存他注册的SelectionKey地址,这是存在问题,其实解决很简单,在对通道进行写入时候,如果通道已经被关闭的话,可以用try/catch语句进行处理

上面就是总的程序,其实之前我都是用阻塞socket去完成这类工作的,由于在用swt基于实现时候遇到很多swt线程问题,后期我会以GUI界面共享给大家,当然自己也在不断学习中!King


相关文章
阅读(588) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~