Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1942193
  • 博文数量: 219
  • 博客积分: 8963
  • 博客等级: 中将
  • 技术积分: 2125
  • 用 户 组: 普通用户
  • 注册时间: 2005-10-19 12:48
个人简介

文章分类

全部博文(219)

文章存档

2021年(1)

2020年(3)

2015年(4)

2014年(5)

2012年(7)

2011年(37)

2010年(40)

2009年(22)

2008年(17)

2007年(48)

2006年(31)

2005年(4)

分类: Java

2011-06-24 14:32:26

以下是服务器端代码,服务器在接收到请求后,把请求交给线程池处理。线程池在处理完成后回复报文给客户端。
里面加入了一些性能统计功能,加入了处理报文数量的计数器,还有一个线程统计每秒收到的报文数。
  1. package com.chouy.udpdns;

  2. import java.io.IOException;
  3. import java.net.DatagramPacket;
  4. import java.net.DatagramSocket;
  5. import java.net.InetSocketAddress;
  6. import java.net.SocketAddress;
  7. import java.net.SocketException;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.atomic.AtomicInteger;

  11. public class UDPServer
  12. {
  13.     String host;
  14.     int port;
  15.     ExecutorService executor;
  16.     DatagramSocket datagramSocket;
  17.     AtomicInteger receiveCount;

  18.     public UDPServer(String host, int port)
  19.     {
  20.         this.host = host;
  21.         this.port = port;
  22.     }

  23.     public void setExecutor(ExecutorService executor)
  24.     {
  25.         this.executor = executor;
  26.     }
  27.     
  28.     public void start() throws SocketException
  29.     {
  30.         this.receiveCount = new AtomicInteger(0);
  31.         Thread t = new TimeThread(this.receiveCount, 1000);
  32.         t.setPriority(Thread.MAX_PRIORITY);
  33.         t.start();
  34.         SocketAddress socketAddress = new InetSocketAddress(host, port);
  35.         this.datagramSocket = new DatagramSocket(socketAddress);
  36.         this.datagramSocket.setReceiveBufferSize(1024 * 1000);
  37.         System.out.println("UDPServer start ... " + socketAddress);
  38.         while (true)
  39.         {
  40.             try
  41.             {
  42.                 DatagramPacket packet = DatagramSocketFactory.getDatagramPacket();
  43.                 this.datagramSocket.receive(packet);
  44.                 this.receiveCount.incrementAndGet();
  45.                 if (this.executor != null)
  46.                     this.executor.execute(new UDPRunnable(packet));
  47.                 else
  48.                 {
  49.                     DatagramPacket p = new DatagramPacket(packet.getData(), packet.getLength());
  50.                     // System.out.println("from ip:port " +this.packet.getAddress() + ":" + this.packet.getPort());

  51.                     p.setSocketAddress(packet.getSocketAddress());
  52.                     datagramSocket.send(p);
  53.                 }
  54.             }
  55.             catch (IOException e)
  56.             {
  57.                 e.printStackTrace();
  58.             }
  59.         }
  60.     }

  61.     class UDPRunnable implements Runnable
  62.     {
  63.         DatagramPacket packet;

  64.         public UDPRunnable(DatagramPacket packet)
  65.         {
  66.             this.packet = packet;
  67.         }

  68.         @Override
  69.         public void run()
  70.         {
  71.             try
  72.             {
  73.                 DatagramPacket p = new DatagramPacket(this.packet.getData(), this.packet.getLength());
  74.                 // System.out.println("from ip:port " +this.packet.getAddress() + ":" + this.packet.getPort());

  75.                 p.setSocketAddress(this.packet.getSocketAddress());
  76.                 datagramSocket.send(p);
  77.             }
  78.             catch (IOException e)
  79.             {
  80.                 e.printStackTrace();
  81.             }
  82.         }
  83.     }

  84.     class TimeThread extends Thread
  85.     {
  86.         long period;
  87.         AtomicInteger atom;
  88.         private int temp;

  89.         public TimeThread(AtomicInteger atomInt, int period)
  90.         {
  91.             this.atom = atomInt;
  92.             this.period = period;
  93.             this.temp = 0;
  94.         }

  95.         @Override
  96.         public void run()
  97.         {
  98.             while (true)
  99.             {
  100.                 int t = this.atom.get();
  101.                 if (t != this.temp)
  102.                 {
  103.                     System.out.printf("second recv: %6d, recv sum: %8d\n", (t - this.temp), t);
  104.                     this.temp = t;
  105.                 }
  106.                 try
  107.                 {
  108.                     Thread.sleep(this.period);
  109.                 }
  110.                 catch (InterruptedException e)
  111.                 {
  112.                     e.printStackTrace();
  113.                 }
  114.             }
  115.         }
  116.     }

  117.     public static void main(String[] args) throws IOException
  118.     {
  119.         UDPServer udpServer = new UDPServer("0.0.0.0", 1800);
  120.         udpServer.setExecutor(Executors.newFixedThreadPool(1));
  121.         udpServer.start();
  122.     }
  123. }

下面是客户端代码,也加入了计数器和线程池功能,但在测试中发现线程个数对性能提升不明显。

  1. package com.chouy.udpdns;

  2. import java.io.IOException;
  3. import java.net.DatagramPacket;
  4. import java.net.DatagramSocket;
  5. import java.net.InetSocketAddress;
  6. import java.util.concurrent.ExecutorService;
  7. import java.util.concurrent.Executors;
  8. import java.util.concurrent.atomic.AtomicInteger;

  /**
   * UDP load client: submits a number of send-then-receive tasks to an
   * executor, all sharing one connected socket, and exits the JVM once every
   * task has finished.
   */
  public class UDPClient
  {
      /** Charset used for the payload, independent of the platform default. */
      private static final java.nio.charset.Charset UTF_8 =
              java.nio.charset.Charset.forName("UTF-8");

      /** Number of tasks that have not finished yet. */
      public static AtomicInteger atomicInteger;

      public String host;
      public int port;
      public ExecutorService executorService;
      public DatagramSocket datagramSocket;

      /**
       * @param host server host to send to
       * @param port server UDP port to send to
       */
      public UDPClient(String host, int port)
      {
          this.host = host;
          this.port = port;
      }

      /**
       * @param executorService executor that runs the send/receive tasks
       */
      public void setExecutor(ExecutorService executorService)
      {
          this.executorService = executorService;
      }

      /**
       * Submits {@code nums} send/receive tasks, waits for them to finish,
       * then terminates the JVM with exit status 0.
       *
       * @param nums number of datagrams to send
       */
      public void start(int nums)
      {
          // Create the counter before anything that can fail, so the shutdown
          // hook and the wait loop below never see a null counter.
          atomicInteger = new AtomicInteger(nums);
          Runtime.getRuntime().addShutdownHook(new Thread()
          {
              @Override
              public void run()
              {
                  System.out.println("exit, count: " + atomicInteger.get());
              }
          });

          int submitted = 0;
          try
          {
              datagramSocket = new DatagramSocket();
              datagramSocket.connect(new InetSocketAddress(host, port));
              byte[] bs = "你好".getBytes(UTF_8);
              DatagramPacket packet = new DatagramPacket(bs, bs.length);
              datagramSocket.setSoTimeout(30000);
              for (int i = 0; i < nums; i++)
              {
                  this.executorService.execute(new UDPClientRunnable(packet));
                  submitted++;
              }
          }
          catch (Exception e)
          {
              e.printStackTrace();
              // Tasks that were never submitted will never decrement the
              // counter; remove them so the wait below can finish.
              atomicInteger.addAndGet(submitted - nums);
          }

          // Poll with a short sleep instead of spinning on yield(). The
          // counter is checked first so a missing executor is never touched
          // once nothing is outstanding.
          while (atomicInteger.get() >= 1 && !this.executorService.isTerminated())
          {
              try
              {
                  Thread.sleep(10);
              }
              catch (InterruptedException e)
              {
                  Thread.currentThread().interrupt();
                  break;
              }
          }
          if (datagramSocket != null)
          {
              datagramSocket.close();
          }
          System.out.println(" main thread ended ... ");
          System.exit(0);
      }

      /**
       * Sends the shared packet, waits for one reply on the shared socket,
       * prints it with the remaining count, and decrements the counter whether
       * or not the exchange succeeded.
       */
      class UDPClientRunnable implements Runnable
      {
          DatagramPacket packet;

          public UDPClientRunnable(DatagramPacket p)
          {
              this.packet = p;
          }

          @Override
          public void run()
          {
              try
              {
                  datagramSocket.send(packet);
                  DatagramPacket p2 = new DatagramPacket(new byte[100], 100);
                  datagramSocket.receive(p2);
                  System.out.println(new String(p2.getData(), 0, p2.getLength(), UTF_8) + " "
                          + atomicInteger.decrementAndGet());
              }
              catch (IOException e)
              {
                  System.out.println(e.getMessage());
                  atomicInteger.decrementAndGet();
              }
          }
      }

      public static void main(String[] args)
      {
          UDPClient client = new UDPClient("192.168.21.117", 1800);
          client.setExecutor(Executors.newFixedThreadPool(100));
          client.start(100000);
      }
  }

下面是一个工厂类,只是在服务器端生成接收报文用,实际没啥大用。

  1. package com.chouy.udpdns;

  2. import java.net.DatagramPacket;

  /**
   * Creates the empty datagram packets that the server receives into.
   */
  public class DatagramSocketFactory
  {
      /** Capacity, in bytes, of every packet buffer handed out. */
      final static int bufferSize = 2048;

      /**
       * @return a new packet backed by its own freshly allocated buffer of
       *         {@code bufferSize} bytes
       */
      public static DatagramPacket getDatagramPacket()
      {
          return new DatagramPacket(new byte[bufferSize], bufferSize);
      }
  }

因为代码非常简单,所以没有注释。

阅读(4266) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~