Chinaunix首页 | 论坛 | 博客
  • 博客访问: 170318
  • 博文数量: 77
  • 博客积分: 1400
  • 博客等级: 上尉
  • 技术积分: 990
  • 用 户 组: 普通用户
  • 注册时间: 2009-06-21 18:13
文章分类

全部博文(77)

文章存档

2011年(1)

2009年(76)

我的朋友

分类: Java

2009-06-28 15:47:46

要点:在服务端架构一个Socket线程池,每一个客户端连接到Server端,Server从线程池中调用一个空闲的线程处理一个client socket.
1.Server.java
 

package com.test.socket;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import com.test.threadpoolsocket.SocketThreadPool;

public class Server {
    public static void main(String args[]) {
        try {
            boolean flag = true;
            Socket clientSocket = null;
            String inputLine;
            int c;

            ServerSocket sSocket = new ServerSocket(8018);
            System.out.println("Server listen on:" + sSocket.getLocalPort());
            SocketThreadPool socketPool = new SocketThreadPool(5,10);

            while (flag) {
                clientSocket = sSocket.accept();
                socketPool.addNewSocket(clientSocket);
                
/*
                DataInputStream is = new DataInputStream(
                        new BufferedInputStream(clientSocket.getInputStream()));
                OutputStream os = clientSocket.getOutputStream();

                while ((inputLine = is.readLine()) != null) {
                    // 当客户端输入stop的时候服务器程序运行终止!
                    if (inputLine.equals("stop")) {
                        flag = false;
                        break;
                    } else {
                        System.out.println(inputLine);

                        while ((c = System.in.read()) != -1) {
                            os.write((byte) c);
                            if (c == -1) {
                                os.flush(); // 将信息发送到客户端
                                break;
                            }
                        }
                    }
                   */

                }
                
//is.close();

                
//os.close();

                
//clientSocket.close();


            
//}

            
//sSocket.close();

            
        } catch (Exception e) {
            System.out.println("Exception :" + e.getMessage());
            e.printStackTrace();
        }
    }
}

 
2.SocketThreadPool.java
 

package com.test.threadpoolsocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
public class SocketThreadPool {
 private int minPoolSize;
 private int maxPoolSize;
 private int totalThreadsNumber = 0;
 private int freeThreadsNumber = 0;
 private List<SocketThread> socketThreadsPoolList;
// save socket threads

 private List<Socket> socketList;
// save socket for buffer

 public SocketThreadPool(int minPoolSize, int maxPoolSize) {
  this.minPoolSize = minPoolSize;
  this.maxPoolSize = maxPoolSize;
  socketThreadsPoolList = new ArrayList<SocketThread>();
  socketList = new LinkedList<Socket>();
  for (int i = 0; i < minPoolSize; i++) {
   SocketThread socketThread = new SocketThread(false, i);
   totalThreadsNumber += 1;
   System.out.println("Initialing SocketThread#" + i);
   socketThreadsPoolList.add(socketThread);
   socketThread.start();
  }
 }
 public synchronized void addNewSocket(Socket socket) {
  socketList.add(socket);
  handlerSocketWork();
 }
 public synchronized SocketThread getIdleSocketThread() {
  Iterator it = socketThreadsPoolList.iterator();
  boolean hasIdleThread = false;
  for (int i = 0; i < socketThreadsPoolList.size(); i++) {
   SocketThread tempSocketThread = socketThreadsPoolList.remove(0);
   System.out.println("thread#" + tempSocketThread.number + ":"
     + tempSocketThread.isRunning);
   if (!tempSocketThread.isRunning()) {
    hasIdleThread = true;
    return tempSocketThread;
   }
  }
  if (!hasIdleThread) {
   if (totalThreadsNumber < maxPoolSize) {
    totalThreadsNumber += 1;
    SocketThread newSocketThread = new SocketThread(false,
      totalThreadsNumber);
    socketThreadsPoolList.add(newSocketThread);
    newSocketThread.start();
    return newSocketThread;
   }
  }
  return null;
 }
 public synchronized void handlerSocketWork() {
  Socket handlerSocket = null;
  SocketThread workSocketThread = null;
  while (true) {
   if (socketList.size() <= 0)
    return;
   synchronized (socketList) {
    handlerSocket = socketList.remove(0);
   }
   workSocketThread = getIdleSocketThread();
   try {
    if (workSocketThread == null) {
     Thread.sleep(1000);
     continue;
    }
    workSocketThread.setSocket(handlerSocket);
    workSocketThread.setThreadName(null);
    workSocketThread.setRunning(true);
    workSocketThread.notify(workSocketThread.isRunning);
    while (true) {
     if (!workSocketThread.isRunning) {
      socketThreadsPoolList.add(workSocketThread);
// put it

                 
// back to

                 
// lis

      break;
     }
    }
   } catch (InterruptedException e) {
    System.out.println("Get Thread Exception :" + e.getMessage());
    e.printStackTrace();
   }
  }
 }
}
class SocketThread extends Thread {
 boolean isRunning = false;
 String threadName;
 int number;
 Socket socket;
 SocketThread(boolean isRunning, int number) {
  this.number = number;
  this.isRunning = isRunning;
 }
 public synchronized void notify(boolean isRunning) {
  if (isRunning) {
   this.notify();
  }
 }
 public synchronized void run() {
  while (true) {
   try {
    if (!isRunning) {
     System.out
       .println("the SocketThread#" + number + ".wait()");
     wait();
    }
    System.out.println("Thread#" + number + " process socket:"
      + socket.toString());
    
// after one work complete,then set "isRunning=false"

    this.setRunning(false);
    sleep(1000);
//

   } catch (InterruptedException e) {
    System.out.println("Thread run Exception :" + e.getMessage());
    e.printStackTrace();
   }
  }
 }
 public Socket getSocket() {
  return socket;
 }
 public void setSocket(Socket socket) {
  this.socket = socket;
 }
 public boolean isRunning() {
  return isRunning;
 }
 public void setRunning(boolean isRunning) {
  this.isRunning = isRunning;
 }
 public String getThreadName() {
  return threadName;
 }
 public void setThreadName(String threadName) {
  this.threadName = threadName;
 }
}


 
 
3.Client.java
 
 

package com.test.socket;
import java.net.*;
import java.io.*;
public class Client {
 public static void main(String args[]) {
  int c;
  boolean flag = true;
  try {
   // 创建通讯并且和主机Rock连接

   Socket cSocket = new Socket("192.168.10.1", 8018);
   // 打开这个Socket的输入/输出流

   OutputStream os = cSocket.getOutputStream();
   DataInputStream is = new DataInputStream(cSocket.getInputStream());
   String responseline;
   while (flag) {
    // 从标准输入输出接受字符并且写如系统

    while ((c = System.in.read()) != -1) {
     os.write((byte) c);
     if (c == -1) {
      os.flush();
      // 将程序阻塞,直到回答信息被收到后将他们在标准输出上显示出来

      responseline = is.readLine();
      System.out.println("Message is:" + responseline);
     }
    }
   }
   os.close();
   is.close();
   cSocket.close();
  } catch (Exception e) {
   System.out.println("Exception :" + e.getMessage());
  }
 }
}

阅读(1283) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~