以下是服务器端代码:服务器在接收到请求后,把处理工作交给线程池;线程池处理完成后,把报文回复给客户端。
里面加入了一些性能统计功能,加入了处理报文数量的计数器,还有一个线程统计每秒收到的报文数。
- package com.chouy.udpdns;
- import java.io.IOException;
- import java.net.DatagramPacket;
- import java.net.DatagramSocket;
- import java.net.InetSocketAddress;
- import java.net.SocketAddress;
- import java.net.SocketException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.atomic.AtomicInteger;
- public class UDPServer
- {
- String host;
- int port;
- ExecutorService executor;
- DatagramSocket datagramSocket;
- AtomicInteger receiveCount;
- public UDPServer(String host, int port)
- {
- this.host = host;
- this.port = port;
- }
- public void setExecutor(ExecutorService executor)
- {
- this.executor = executor;
- }
-
- public void start() throws SocketException
- {
- this.receiveCount = new AtomicInteger(0);
- Thread t = new TimeThread(this.receiveCount, 1000);
- t.setPriority(Thread.MAX_PRIORITY);
- t.start();
- SocketAddress socketAddress = new InetSocketAddress(host, port);
- this.datagramSocket = new DatagramSocket(socketAddress);
- this.datagramSocket.setReceiveBufferSize(1024 * 1000);
- System.out.println("UDPServer start ... " + socketAddress);
- while (true)
- {
- try
- {
- DatagramPacket packet = DatagramSocketFactory.getDatagramPacket();
- this.datagramSocket.receive(packet);
- this.receiveCount.incrementAndGet();
- if (this.executor != null)
- this.executor.execute(new UDPRunnable(packet));
- else
- {
- DatagramPacket p = new DatagramPacket(packet.getData(), packet.getLength());
- // System.out.println("from ip:port " +this.packet.getAddress() + ":" + this.packet.getPort());
- p.setSocketAddress(packet.getSocketAddress());
- datagramSocket.send(p);
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
- class UDPRunnable implements Runnable
- {
- DatagramPacket packet;
- public UDPRunnable(DatagramPacket packet)
- {
- this.packet = packet;
- }
- @Override
- public void run()
- {
- try
- {
- DatagramPacket p = new DatagramPacket(this.packet.getData(), this.packet.getLength());
- // System.out.println("from ip:port " +this.packet.getAddress() + ":" + this.packet.getPort());
- p.setSocketAddress(this.packet.getSocketAddress());
- datagramSocket.send(p);
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
- class TimeThread extends Thread
- {
- long period;
- AtomicInteger atom;
- private int temp;
- public TimeThread(AtomicInteger atomInt, int period)
- {
- this.atom = atomInt;
- this.period = period;
- this.temp = 0;
- }
- @Override
- public void run()
- {
- while (true)
- {
- int t = this.atom.get();
- if (t != this.temp)
- {
- System.out.printf("second recv: %6d, recv sum: %8d\n", (t - this.temp), t);
- this.temp = t;
- }
- try
- {
- Thread.sleep(this.period);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- }
- }
- public static void main(String[] args) throws IOException
- {
- UDPServer udpServer = new UDPServer("0.0.0.0", 1800);
- udpServer.setExecutor(Executors.newFixedThreadPool(1));
- udpServer.start();
- }
- }
下面是客户端代码,也加入了计数器和线程池功能,但在测试中发现线程个数对性能提升不明显。
下面是一个工厂类,只是在服务器端生成接收报文用,实际没啥大用。
- package com.chouy.udpdns;
- import java.net.DatagramPacket;
/**
 * Creates the empty datagram packets that the server passes to
 * {@code DatagramSocket.receive}.
 */
public class DatagramSocketFactory
{
    /** Size, in bytes, of the buffer backing every packet created here. */
    final static int bufferSize = 2048;

    /**
     * Allocates a fresh {@code bufferSize}-byte array and wraps it in a packet
     * whose length covers the whole array.
     *
     * @return a new packet with its own buffer
     */
    public static DatagramPacket getDatagramPacket()
    {
        return new DatagramPacket(new byte[bufferSize], bufferSize);
    }
}
因为代码非常简单,所以没有注释。
阅读(4266) | 评论(0) | 转发(0) |