Category: Java

2015-07-01 22:02:15

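This post briefly walks through the case where the Server sends one large file to the Client in several pieces.
The message types are, for now:
{
  1. Server_Ready_Send_File (READY_SEND_FILE in the code)
  2. Server_Sending_File (FILE_SENDING in the code)
  3. Server_End_Send_File (FILE_END in the code)
  4. Client_Ready_Recv_File (READY_RECV_FILE in the code)
}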

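1. The Server starts.

2. The Client requests a connection.

3. The Server-Client connection is established.

4. In its connection-established handler method, the Server creates a message {type: Server_Ready_Send_File, data: <long: file size (bytes), byte[]: file name>}

5. On receiving that message, the Client creates the file under the configured path and opens a file stream for writing. Once this preparation is done, it sends the Server a message {type: Client_Ready_Recv_File, data: <0>}

6. On receiving the Client's message, the Server opens the file and loops over its contents, wrapping the file data in messages {type: Server_Sending_File, data: <file data converted to bytes>}
    The loop reads the file line by line. When it reaches the end marker 'the end of file!', it switches the message type to Server_End_Send_File, puts that line into the message and sends it. The connection to the client is closed afterwards.

7. The Client writes each Server_Sending_File message into the open file. When it receives a Server_End_Send_File message, it writes that message into the file as well,
   then calls flush() and close() in turn. After closing the file output stream, it disconnects from the Server.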
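
The steps above can be sketched without any networking. The following is only an illustration under stated assumptions: `TransferFlowSketch`, `Msg`, `serverSide` and `clientSide` are hypothetical names, the payloads are plain strings, and the real exchange is collapsed into two method calls (in the real flow the server waits for the client's READY_RECV_FILE reply before it starts sending).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory walk-through of the handshake; not the project's classes. */
final class TransferFlowSketch {

    enum Type { READY_SEND_FILE, READY_RECV_FILE, FILE_SENDING, FILE_END }

    /** A message reduced to its type and a text payload. */
    static final class Msg {
        final Type type;
        final String data;
        Msg(Type type, String data) { this.type = type; this.data = data; }
    }

    static final String END_MARKER = "the end of file!";

    /** Steps 4 and 6: the messages the server emits, in order, for the given lines. */
    static List<Msg> serverSide(String fileName, List<String> lines) {
        List<Msg> out = new ArrayList<>();
        long size = 0;
        for (String l : lines) size += l.length() + 1;   // rough size: one byte per char plus newline
        out.add(new Msg(Type.READY_SEND_FILE, size + ":" + fileName));
        for (String l : lines) {
            boolean last = l.equals(END_MARKER);
            out.add(new Msg(last ? Type.FILE_END : Type.FILE_SENDING, l));
            if (last) break;                             // nothing is sent after the marker
        }
        return out;
    }

    /** Steps 5 and 7: the client collects lines until FILE_END arrives. */
    static List<String> clientSide(List<Msg> incoming) {
        List<String> written = new ArrayList<>();
        boolean open = false;
        for (Msg m : incoming) {
            switch (m.type) {
                case READY_SEND_FILE:
                    open = true;                         // create the file, reply READY_RECV_FILE
                    break;
                case FILE_SENDING:
                    if (!open) throw new IllegalStateException("data before header");
                    written.add(m.data);
                    break;
                case FILE_END:
                    if (!open) throw new IllegalStateException("end before header");
                    written.add(m.data);
                    open = false;                        // flush(), close(), disconnect
                    break;
                default:
                    throw new IllegalStateException("unexpected " + m.type);
            }
        }
        return written;
    }
}
```

Feeding the result of `serverSide` into `clientSide` should give back the original lines up to and including the marker, which is the property the real transfer relies on.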



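The code below has been tested and successfully transfers a 17699324 B file in chunks, although it still has many rough, unrefined spots.
{For example, the file name being transferred is not handled uniformly, and the number of file bytes sent per message has to stay below the limit of the length field: a short is 2 bytes = 16 bits, so it can represent at most 2^15 - 1 B, of which 3 bytes go to the header.}
Staying just under that limit maximizes the amount of file data carried per message. Still, the project code can serve as a template for now.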
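
To make the length limit concrete, here is a small arithmetic sketch. It assumes, as the `3 + data.length` expressions in the code suggest, that the `short` length counts a 3-byte header plus the payload; `ChunkMath` and its methods are hypothetical names.

```java
/** Hypothetical helper: how much file data fits behind a 2-byte signed length field. */
final class ChunkMath {

    static final int HEADER_BYTES = 3;                              // assumed: 1 type byte + 2 length bytes
    static final int MAX_PAYLOAD = Short.MAX_VALUE - HEADER_BYTES;  // 32767 - 3 = 32764

    /** Number of messages needed to carry fileLen bytes at maxPayload bytes each. */
    static long chunkCount(long fileLen, int maxPayload) {
        if (maxPayload <= 0) throw new IllegalArgumentException("maxPayload must be positive");
        return (fileLen + maxPayload - 1) / maxPayload;             // ceiling division
    }

    /** Size of the final, possibly shorter, chunk (0 for an empty file). */
    static int lastChunkSize(long fileLen, int maxPayload) {
        if (fileLen == 0) return 0;
        long rem = fileLen % maxPayload;
        return (int) (rem == 0 ? maxPayload : rem);
    }

    public static void main(String[] args) {
        long fileLen = 17699324L;                                   // file size from the test run
        // prints "541 chunks, last one 6764 bytes"
        System.out.println(chunkCount(fileLen, MAX_PAYLOAD) + " chunks, last one "
                + lastChunkSize(fileLen, MAX_PAYLOAD) + " bytes");
    }
}
```

With full-size payloads the test file would need 541 messages; sending one message per text line, as the code below does, usually needs far more.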

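The project structure is the same as in the previous draft post, except that here we do not use ClientHandlerListener; the code for sending messages to each other is written directly into each side's Handler.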

/src/
  |---------- common/
                   |-------- MessageType
                   |-------- Message
                   |-------- MessageDecoder
                   |-------- MessageEncoder
  |---------- client/
                   |-------- Client
                   |-------- ClientHandler
                   |-------- ClientHandlerListener
  |---------- server/
                   |-------- Server
                   |-------- ServerHandler



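Before running the code, create two files named test1.txt and test2.txt under /tmp/.
Size them according to the amount of data you want to test; it is a good idea to make one very large and the other of ordinary size. The large file tests the program's stability while it runs and how it copes with sending a large amount of data.
Note that the server's read loop stops at a line reading 'the end of file!', so each file should end with that line.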
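
One possible way to produce such files is sketched below. `TestFileMaker`, the line text and the line counts are assumptions chosen for illustration; the only thing taken from the post is that the last line has to be the end marker.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical generator for the /tmp test files. */
final class TestFileMaker {

    static final String END_MARKER = "the end of file!";

    /** Writes lineCount filler lines followed by the end-marker line. */
    static void write(Path target, int lineCount) throws IOException {
        try (BufferedWriter out = Files.newBufferedWriter(target, StandardCharsets.US_ASCII)) {
            for (int i = 0; i < lineCount; i++) {
                out.write("line " + i + " of test data");
                out.newLine();
            }
            out.write(END_MARKER);
            out.newLine();
        }
    }

    public static void main(String[] args) throws IOException {
        write(Paths.get("/tmp/test1.txt"), 500_000);   // the large file
        write(Paths.get("/tmp/test2.txt"), 100);       // the ordinary one
    }
}
```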



common/MessageType.java

package org.kylin.zhang.common;

/**
 * Created by root on 6/30/15.
 */
public enum MessageType
{
    READY_SEND_FILE((byte)0x01),
    READY_RECV_FILE((byte)0x04),
    FILE_SENDING((byte)0x02),
    FILE_END((byte)0x03),
    SHUT_DOWN((byte)0x00),
    // 0xFF keeps UNKNOWN distinct from SHUT_DOWN (0x00) on the wire
    UNKNOWN((byte)0xFF);

    private final byte b;

    private MessageType(byte b)
    {
        this.b = b;
    }

    // map a wire byte back to its type; unrecognized bytes yield UNKNOWN
    public static MessageType fromByte(byte b)
    {
        for (MessageType code : values())
        {
            if (code.b == b)
                return code;
        }

        return UNKNOWN;
    }

    public byte getByte()
    {
        return this.b;
    }
}

common/Message.java

package org.kylin.zhang.common;

import java.io.Serializable;

/**
 * Created by root on 6/30/15.
 */
public class Message implements Serializable
{
    private MessageType type;
    private short length;   // callers pass 3 + data.length
    private byte[] data;

    public Message()
    {}

    public Message(MessageType type, short len, byte[] data)
    {
        this.type = type;
        this.length = len;
        this.data = data;
    }

    public short getLength()
    {
        return this.length;
    }

    public void setLength(short len)
    {
        this.length = len;
    }

    public MessageType getType()
    {
        return this.type;
    }

    public void setType(MessageType type)
    {
        this.type = type;
    }

    public byte[] getData()
    {
        return this.data;
    }

    public void setData(byte[] data)
    {
        this.data = data;
    }

    @Override
    public String toString()
    {
        // data is null when the no-argument constructor was used
        String content = (this.data == null) ? "" : new String(this.data);

        return "\nmessage type :" + this.type + "\n"
                + "message length :" + this.length + "\n"
                + "message content :" + content + "\n";
    }
}

common/MessageDecoder.java: unchanged from the previous post
common/MessageEncoder.java: unchanged from the previous post
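
Since the encoder and decoder are not reproduced in this post, the following is only a guess at the frame layout they imply. It assumes one type byte, then a 2-byte big-endian length that counts the whole frame (matching the `3 + data.length` the handlers pass), then the payload. `FrameSketch` and its methods are hypothetical and use plain `java.nio.ByteBuffer` instead of Netty buffers.

```java
import java.nio.ByteBuffer;

/** Hypothetical frame layout: [type:1][length:2][payload], length = 3 + payload.length. */
final class FrameSketch {

    static final int HEADER_BYTES = 3;

    /** Builds one frame; rejects payloads that would overflow the short length field. */
    static byte[] encode(byte type, byte[] payload) {
        int total = HEADER_BYTES + payload.length;
        if (total > Short.MAX_VALUE) {
            throw new IllegalArgumentException("payload too large for a short length: " + payload.length);
        }
        ByteBuffer buf = ByteBuffer.allocate(total);   // ByteBuffer is big-endian by default
        buf.put(type);
        buf.putShort((short) total);
        buf.put(payload);
        return buf.array();
    }

    /** Returns the payload, or null when the bytes do not yet hold a complete frame. */
    static byte[] decodePayload(byte[] received) {
        if (received.length < HEADER_BYTES) return null;
        ByteBuffer buf = ByteBuffer.wrap(received);
        buf.get();                                     // skip the type byte
        int total = buf.getShort();
        if (total < HEADER_BYTES) throw new IllegalArgumentException("corrupt length " + total);
        if (received.length < total) return null;      // wait for more bytes
        byte[] payload = new byte[total - HEADER_BYTES];
        buf.get(payload);
        return payload;
    }

    /** The type byte is always the first byte of a frame. */
    static byte typeOf(byte[] frame) {
        return frame[0];
    }
}
```

Returning null for an incomplete frame mirrors what a stream decoder has to do on TCP, where one read may deliver only part of a message or several messages at once.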



client/Client.java

package org.kylin.zhang.client;

import org.kylin.zhang.common.*;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

import java.io.*;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * Created by root on 7/1/15.
 */
public class Client implements ClientHandlerListener
{
    private final String host;
    private final int port;

    private ClientHandler handler;
    private ChannelFactory clientFactory;
    private ChannelGroup channelGroup;

    // not used in this test: ClientHandler keeps its own copies of the file state
    private String file_path;
    private String file_name;
    private long file_len;
    private BufferedReader input;

    public Client(String host, int port)
    {
        this.host = host;
        this.port = port;

        // default directory for received files;
        // make sure it exists before running the program
        this.file_path = "/tmp/data/";
    }

    public void messageReceived(Message message)
    {
        // not used in this test
    }

    public boolean start()
    {
        this.clientFactory = new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(), Executors.newCachedThreadPool());

        this.channelGroup = new DefaultChannelGroup(this + "-channelGroup");

        this.handler = new ClientHandler(this, this.channelGroup);

        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {

                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("encoder", MessageEncoder.getInstance());
                pipeline.addLast("decoder", new MessageDecoder());
                pipeline.addLast("handler", handler);

                return pipeline;
            }
        };

        ClientBootstrap bootstrap = new ClientBootstrap(this.clientFactory);

        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        boolean connected = bootstrap.connect(new InetSocketAddress(host, port))
                .awaitUninterruptibly().isSuccess();

        if (!connected)
        {
            this.stop();
        }

        return connected;
    }

    public void stop()
    {
        if (this.channelGroup != null)
            this.channelGroup.close();
        if (this.clientFactory != null)
            this.clientFactory.releaseExternalResources();
    }

    private void flood()
    {
        if ((this.channelGroup == null) || (this.clientFactory == null)) {
            System.out.println("do not have any resources, return ");
            return;
        }

        // runs only once
        {
            String data = "KyLin_Zhang";

            Message msg = new Message(MessageType.FILE_SENDING, (short)(data.getBytes().length+3), data.getBytes());
            // System.out.println("client flood :" + msg);

            this.handler.sendMessage(msg);
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        final Client client = new Client("kylin", 9999);

        if (!client.start())
        {
            System.out.println("client failed to start");
            System.exit(-1);
            return;
        }

        System.out.println("Client started ....");

        // System.out.println("call flood");
        // client.flood();

        Runtime.getRuntime().addShutdownHook(
                new Thread()
                {
                    @Override
                    public void run()
                    {
                        client.stop();
                    }
                });
    }
}

client/ClientHandler.java

package org.kylin.zhang.client;

import org.jboss.netty.channel.*;
import org.jboss.netty.channel.group.ChannelGroup;

import org.kylin.zhang.common.*;

import java.io.*;

/**
 * Created by root on 7/1/15.
 */
public class ClientHandler extends SimpleChannelUpstreamHandler
{
    private final ClientHandlerListener listener;
    private final ChannelGroup channelGroup;
    private Channel channel;

    private String file_path;
    private String file_name;
    private long file_len;
    private BufferedWriter output;

    public ClientHandler(ClientHandlerListener listener, ChannelGroup channelGroup)
    {
        this.listener = listener;
        this.channelGroup = channelGroup;

        // directory the received file is written to; it must exist and be writable
        this.file_path = "/home/";
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
        throws Exception
    {
        if (e.getMessage() instanceof Message)
        {
            // this.listener.messageReceived((Message)e.getMessage());

            Message message = (Message)e.getMessage();

            // System.out.println("message received from server " + message);
            // this.channelGroup.disconnect();

            // Branch on the message type:
            // READY_SEND_FILE -> extract the file name and file length, create the file
            //                    under the target path, open a writer on it and answer
            //                    the server with READY_RECV_FILE
            // FILE_SENDING    -> extract the file content and write it as one line
            //                    into the opened file
            // FILE_END        -> write the last block of data, then flush and close the file

            MessageType recv_msg_type = message.getType();

            if (recv_msg_type == MessageType.READY_SEND_FILE)
            {
                // payload format: file_length:file_name
                String data = new String(message.getData());
                int sep = data.indexOf(':');

                this.file_len = Long.decode(data.substring(0, sep));
                this.file_name = data.substring(sep + 1);

                System.out.println("we got file_name : " + this.file_name);
                System.out.println("we got file_len : " + this.file_len);

                // create the file, open it, then build the response message for the server
                File file = new File(this.file_path + this.file_name);

                if (file.exists())
                    file.delete();

                try
                {
                    file.createNewFile();
                    this.output = new BufferedWriter(new FileWriter(file));
                }
                catch (Exception ex)
                {
                    // without a writer there is nothing to receive into, so do not answer
                    ex.printStackTrace();
                    return;
                }

                // answer with a READY_RECV_FILE message
                data = "client get ready";
                Message response_msg = new Message(MessageType.READY_RECV_FILE, (short)(3+data.getBytes().length),
                        data.getBytes());
                this.channel.write(response_msg);
            }

            if (recv_msg_type == MessageType.FILE_SENDING)
            {
                // write the line first, then the separator, so the file does not start with an empty line
                String data = new String(message.getData());
                this.output.write(data);
                this.output.newLine();
                this.output.flush();
            }

            if (recv_msg_type == MessageType.FILE_END)
            {
                String data = new String(message.getData());

                this.output.write(data);
                this.output.newLine();
                this.output.flush();

                this.output.close();

                // send a SHUT_DOWN message; after the server receives it,
                // it disconnects the channel
                data = "client gonning to shut down";
                Message msg = new Message(MessageType.SHUT_DOWN, (short)(3+data.getBytes().length),
                        data.getBytes());

                System.out.println("client going to shut down ......");

                // close once the write has completed instead of sleeping in the I/O thread
                e.getChannel().write(msg).addListener(ChannelFutureListener.CLOSE);
            }
        }
        else
        {
            super.messageReceived(ctx, e);
        }
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
        throws Exception
    {
        System.out.println("client channelConnected: ");
        this.channel = e.getChannel();

        this.channelGroup.add(e.getChannel());
    }

    public void sendMessage(Message msg)
    {
        if (this.channel != null)
        {
            System.out.println("sendMessage : " + msg);
            this.channel.write(msg);
        }
    }
}
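
The READY_SEND_FILE payload is a `file_length:file_name` string. A minimal sketch of parsing it in isolation follows; `HeaderSketch` is a hypothetical name, and splitting at the first colon is a design choice made here so that a file name which itself contains ':' still parses.

```java
/** Hypothetical parser for the "file_length:file_name" header payload. */
final class HeaderSketch {

    final long fileLen;
    final String fileName;

    HeaderSketch(long fileLen, String fileName) {
        this.fileLen = fileLen;
        this.fileName = fileName;
    }

    /** Splits at the FIRST colon: everything before it is the length, the rest is the name. */
    static HeaderSketch parse(String header) {
        int sep = header.indexOf(':');
        if (sep <= 0 || sep == header.length() - 1) {
            throw new IllegalArgumentException("expected <length>:<name>, got: " + header);
        }
        long len = Long.parseLong(header.substring(0, sep));   // NumberFormatException if not a number
        if (len < 0) {
            throw new IllegalArgumentException("negative length: " + len);
        }
        return new HeaderSketch(len, header.substring(sep + 1));
    }
}
```

For example, `HeaderSketch.parse("17699324:test1")` yields length 17699324 and name `test1`, the values printed in the client output at the end of this post.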

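client/ClientHandlerListener.java: not used

It is not used in this post; copying it over from the previous post is enough. Client still declares that it implements the interface, so the file has to be present.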

server/Server.java

package org.kylin.zhang.server;

import org.kylin.zhang.common.*;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * Created by root on 7/1/15.
 */
public class Server {

    private final String host;
    private final int port;
    private DefaultChannelGroup channelGroup;
    private ServerChannelFactory serverFactory;

    public Server(String host, int port)
    {
        this.host = host;
        this.port = port;
    }

    public boolean start()
    {
        this.serverFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());

        this.channelGroup = new DefaultChannelGroup(this + "-channelGroup");

        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory()
        {
            public ChannelPipeline getPipeline() throws Exception
            {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("encoder", MessageEncoder.getInstance());
                pipeline.addLast("decoder", new MessageDecoder());
                pipeline.addLast("handler", new ServerHandler(channelGroup));

                return pipeline;
            }
        };

        ServerBootstrap bootstrap = new ServerBootstrap(this.serverFactory);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        Channel channel = bootstrap.bind(new InetSocketAddress(this.host, this.port));

        if (!channel.isBound())
        {
            this.stop();
            return false;
        }

        this.channelGroup.add(channel);
        return true;
    }

    public void stop()
    {
        if (this.channelGroup != null)
            this.channelGroup.close();
        if (this.serverFactory != null)
            this.serverFactory.releaseExternalResources();
    }

    public static void main(String[] args)
    {
        final Server server = new Server("kylin", 9999);

        if (!server.start())
        {
            System.out.println("server failed to run ");
            System.exit(-1);

            return; // not really needed
        }

        System.out.println("server started ..... ");

        Runtime.getRuntime().addShutdownHook(new Thread()
        {
            @Override
            public void run()
            {
                server.stop();
            }
        });
    }
}


server/ServerHandler.java


package org.kylin.zhang.server;

import org.kylin.zhang.common.*;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by root on 7/1/15.
 */
public class ServerHandler extends SimpleChannelUpstreamHandler
{
    // last line of every test file; reading it ends the transfer
    private static final String END_MARKER = "the end of file!";

    private final ChannelGroup channelGroup;
    private Map<String,String> fileHash;

    public ServerHandler(ChannelGroup channelGroup)
    {
        this.channelGroup = channelGroup;

        this.fileHash = new HashMap<String,String>();

        // initialize the name -> path table
        fileHash.put("test1", "/tmp/test1.txt");
        fileHash.put("test2", "/tmp/test2.txt");
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
    {
        System.out.println("-------------------------- **server gets a new connection**--------------------------");
        this.channelGroup.add(e.getChannel());

        byte[] data = null;

        File file = new File(fileHash.get("test1"));
        String len_fname = String.valueOf(file.length());
        len_fname += ":test1";

        // file_length:file_name
        data = len_fname.getBytes();

        Message msg = new Message(MessageType.READY_SEND_FILE, (short)(3+data.length), data);

        e.getChannel().write(msg);

        // e.getChannel().disconnect();
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception
    {
        if (e.getMessage() instanceof Message) {

            Message received_msg = (Message) e.getMessage();

            MessageType recv_msg_type = received_msg.getType();

            System.out.println("server received message: " + received_msg);

            if (recv_msg_type == MessageType.READY_RECV_FILE)
            {
                // Open the file and send it to the client line by line in a loop
                // (each line is meant to carry <= 250 bytes). On reading the last line,
                // 'the end of file!', switch the message type from FILE_SENDING to FILE_END.
                // The channel is closed once the client answers with SHUT_DOWN.

                File f = new File(this.fileHash.get("test1"));
                BufferedReader input = new BufferedReader(new FileReader(f));

                try
                {
                    String line;

                    // readLine() returns null at end of file, so check that before comparing;
                    // a line must fit the short length field (at most 32764 bytes of data)
                    while ((line = input.readLine()) != null && !line.equals(END_MARKER))
                    {
                        Message send_msg = new Message(MessageType.FILE_SENDING, (short)(3+line.getBytes().length),
                                line.getBytes());

                        e.getChannel().write(send_msg);
                    }

                    // the marker line ends the transfer; it is also sent when the file
                    // ran out without one, so the client can still close its file
                    byte[] last = END_MARKER.getBytes();
                    Message send_msg = new Message(MessageType.FILE_END, (short)(3+last.length), last);
                    e.getChannel().write(send_msg);
                }
                finally
                {
                    input.close();
                }
            }
            else if (recv_msg_type == MessageType.SHUT_DOWN)
            {
                e.getChannel().disconnect();
            }
            else
            {
                System.out.println("UNKNOWN Message ");
            }
        }
        else
        {
            super.messageReceived(ctx, e);
        }
    }
}
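
Reading line by line and stopping at a marker only suits text files. As an alternative sketch (not what the handler in this post does), the file could be read in fixed-size byte blocks, with the final block sent as FILE_END. `BlockReaderSketch` is a hypothetical name and uses only `java.io`; it collects the blocks in a list for clarity, whereas a real handler would write each block to the channel as soon as it is read.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical block reader: splits a stream into blocks of at most blockSize bytes. */
final class BlockReaderSketch {

    static List<byte[]> readBlocks(InputStream in, int blockSize) throws IOException {
        if (blockSize <= 0) throw new IllegalArgumentException("blockSize must be positive");
        List<byte[]> blocks = new ArrayList<>();
        byte[] buf = new byte[blockSize];
        int filled = 0;
        int n;
        // read() may return fewer bytes than requested, so keep filling until the block is full
        while ((n = in.read(buf, filled, blockSize - filled)) != -1) {
            filled += n;
            if (filled == blockSize) {
                blocks.add(buf.clone());
                filled = 0;
            }
        }
        if (filled > 0) {
            blocks.add(Arrays.copyOf(buf, filled));   // final, shorter block
        }
        return blocks;
    }
}
```

With this approach the receiver would also need to write raw bytes (for example through a `FileOutputStream`) rather than going through `String` and `BufferedWriter`.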


Output:

Start the server first
(the program connects using my own machine's host alias; change the hostname to suit your host):

server started .....
-------------------------- **server gets a new connection**--------------------------
server received message:
message type :READY_RECV_FILE
message length :19
message content :client get ready

server received message:
message type :SHUT_DOWN
message length :30
message content :client gonning to shut down



Then run the client:

Client started ....
client channelConnected:
we got file_name : test1
we got file_len : 17699324
client going to shut down ......

Process finished with exit code 0

end

夏目玲子 2015-07-30 09:44:06

夏目玲子: This approach does not work with the latest version of netty; using FixedLengthFrameDecoder together with ByteToMessageDecoder and MessageToByteEncoder is recommended instead.
I have been writing this recently; recommended reference: http://www.infoq.com/cn/articles/netty-codec-framework-analyse/

Replacing the switch with the state pattern would make things work better.


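夏目玲子 2015-07-22 10:37:01

夏目玲子: I am still not very familiar with how Channel is used and how it works; once I know it better, I should be able to write more flexible handler functions [laugh-cry]

This approach does not work with the latest version of netty; using FixedLengthFrameDecoder together with ByteToMessageDecoder and MessageToByteEncoder is recommended instead.
I have been writing this recently; recommended reference: http://www.infoq.com/cn/articles/netty-codec-framework-analyse/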


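夏目玲子 2015-07-01 22:03:51

I am still not very familiar with how Channel is used and how it works; once I know it better, I should be able to write more flexible handler functions [laugh-cry]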