Category: Java

2015-07-01 13:42:35

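This post is a short record of how to send and receive a custom message type with the jboss-netty framework.
The source tree of the project is laid out as follows: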
/src/
  |---- common/
  |       |---- MessageType
  |       |---- Message
  |       |---- MessageDecoder
  |       |---- MessageEncoder
  |---- client/
  |       |---- Client
  |       |---- ClientHandler
  |       |---- ClientHandlerListener
  |---- server/
          |---- Server
          |---- ServerHandler
                  

common.MessageType.java
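MessageType is an enum that assigns a byte value to each kind of message.

Each MessageType constant stands for exactly one kind of Message: its member variable byte b is final, so it cannot change once the constructor has assigned it.

The static method fromByte(byte b) maps a byte value back to the MessageType constant it represents, and returns UNKNOWN when no constant matches.
It uses values() to iterate over every constant defined in the enum {FILE_BEGIN, ... UNKNOWN}.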


package org.kylin.zhang.common;

/**
 * Created by root on 6/30/15.
 */
public enum MessageType
{
    FILE_BEGIN((byte)0x01),
    FILE_SENDING((byte)0x02),
    FILE_END((byte)0x03),
    SHUT_DOWN((byte)0x00),
    // UNKNOWN shares 0x00 with SHUT_DOWN; fromByte(0x00) returns SHUT_DOWN
    // because it is declared first
    UNKNOWN((byte)0x00);

    private final byte b;

    private MessageType(byte b)
    {
        this.b = b;
    }

    // returns the constant whose byte value equals b, or UNKNOWN if none matches
    public static MessageType fromByte(byte b)
    {
        for (MessageType code : values())
        {
            if (code.b == b)
                return code;
        }
        return UNKNOWN;
    }

    public byte getByte()
    {
        return this.b;
    }
}
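To make the lookup behaviour of fromByte concrete, here is a small stand-alone sketch. The class name FromByteDemo and the nested enum WireType are made up for this illustration; WireType simply repeats the constants of MessageType so that the example compiles on its own.

```java
// Hypothetical stand-alone demo; WireType mirrors the constants of MessageType.
class FromByteDemo {

    enum WireType {
        FILE_BEGIN((byte) 0x01),
        FILE_SENDING((byte) 0x02),
        FILE_END((byte) 0x03),
        SHUT_DOWN((byte) 0x00),
        UNKNOWN((byte) 0x00);

        private final byte b;

        WireType(byte b) {
            this.b = b;
        }

        // Linear search over values(), in declaration order.
        static WireType fromByte(byte b) {
            for (WireType t : values()) {
                if (t.b == b) {
                    return t;
                }
            }
            return UNKNOWN;
        }
    }

    public static void main(String[] args) {
        System.out.println(WireType.fromByte((byte) 0x02)); // FILE_SENDING
        System.out.println(WireType.fromByte((byte) 0x00)); // SHUT_DOWN (declared before UNKNOWN)
        System.out.println(WireType.fromByte((byte) 0x7f)); // UNKNOWN (no constant matches)
    }
}
```

Because the search follows declaration order, the byte 0x00 always resolves to SHUT_DOWN, and UNKNOWN is only ever returned as the fallback.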



common.Message.java
The custom message class. By convention, a Message on the wire consists of {message length (2 bytes), message type (1 byte), message data (total message length - 3 bytes)}.


package org.kylin.zhang.common;

import java.io.Serializable;

/**
 * Created by root on 6/30/15.
 */
public class Message implements Serializable
{
    private MessageType type;
    private short length;   // total length on the wire: 3 header bytes + data.length
    private byte[] data;

    public Message()
    {}

    public Message(MessageType type, short len, byte[] data)
    {
        this.type = type;
        this.length = len;
        this.data = data;
    }

    public short getLength()
    {
        return this.length;
    }

    public void setLength(short len)
    {
        this.length = len;
    }

    public MessageType getType()
    {
        return this.type;
    }

    public void setType(MessageType type)
    {
        this.type = type;
    }

    public byte[] getData()
    {
        return this.data;
    }

    public void setData(byte[] data)
    {
        this.data = data;
    }

    @Override
    public String toString()
    {
        // guard against a message whose data has not been set yet
        return "\nmessage type :" + this.type + "\n"
                + "message length :" + this.length + "\n"
                + "message content :" + (this.data == null ? "null" : new String(this.data)) + "\n";
    }

    public static void main(String[] args)
    {
        byte[] data = "Hello Aimer".getBytes();

        Message msg = new Message();
        msg.setType(MessageType.FILE_BEGIN);
        // the length field also counts the 3 header bytes
        msg.setLength((short)(data.length + 3));
        msg.setData(data);

        System.out.println(msg);
    }
}
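The frame layout agreed on for Message can be tried out without Netty. The following is only a minimal sketch using java.nio.ByteBuffer from the JDK; the class name FrameLayoutSketch and its two helper methods are invented for this illustration and are not part of the project. ByteBuffer writes a short in big-endian order by default.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that lays out one frame as
// [total length (2 bytes)][type (1 byte)][data (total length - 3 bytes)].
class FrameLayoutSketch {
    static final int HEADER_SIZE = 3; // 2-byte length + 1-byte type

    static byte[] encode(byte type, byte[] payload) {
        int total = HEADER_SIZE + payload.length;
        if (total > Short.MAX_VALUE) {
            throw new IllegalArgumentException("frame too large for a signed 16-bit length: " + total);
        }
        ByteBuffer buf = ByteBuffer.allocate(total); // big-endian by default
        buf.putShort((short) total);
        buf.put(type);
        buf.put(payload);
        return buf.array();
    }

    // Reads the payload back out of one complete frame.
    static byte[] decodePayload(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        short total = buf.getShort();
        if (total < HEADER_SIZE || total != frame.length) {
            throw new IllegalArgumentException("bad length field: " + total);
        }
        buf.get(); // skip the type byte
        byte[] payload = new byte[total - HEADER_SIZE];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode((byte) 0x01, "Hello Aimer".getBytes(StandardCharsets.UTF_8));
        System.out.println("frame length : " + frame.length); // 14 = 3 header bytes + 11 data bytes
        System.out.println("payload      : "
                + new String(decodePayload(frame), StandardCharsets.UTF_8)); // Hello Aimer
    }
}
```

Since the length is stored in a signed short, a single frame under this convention can carry at most 32767 - 3 bytes of data.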

common.MessageDecoder.java
The message decoder parses a ChannelBuffer into a Message instance.


package org.kylin.zhang.common;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;

/**
 * Created by root on 6/30/15.
 *
 * The decoder converts a ChannelBuffer into
 * an instance of Message.
 */
public class MessageDecoder extends ReplayingDecoder<MessageDecoder.DecodingState>
{
    private Message message;

    public MessageDecoder()
    {
        this.reset();
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel ch, ChannelBuffer buffer, DecodingState state)
            throws Exception
    {
        // note the intentional switch fall-through: each state continues into the next one
        switch (state)
        {
            case MESSAGE_LENGTH:
                short len = buffer.readShort();
                // the length covers the 3 header bytes, so anything smaller is invalid
                if (len < 3)
                    throw new Exception("invalid message length : " + len);

                // pre-allocate data content buffer
                byte[] data = new byte[len - 3];
                this.message.setData(data);
                this.message.setLength(len);

                checkpoint(DecodingState.MESSAGE_TYPE);

            case MESSAGE_TYPE:
                this.message.setType(MessageType.fromByte(buffer.readByte()));
                checkpoint(DecodingState.MESSAGE_DATA);

            case MESSAGE_DATA:
                buffer.readBytes(this.message.getData(), 0,
                        this.message.getLength() - 3);

                try
                {
                    // System.out.println("MessageDecoder : " + message);
                    return this.message;
                }
                finally
                {
                    // start over with a fresh Message for the next frame
                    this.reset();
                }

            default:
                throw new Exception("Unknown decoding state : " + state);
        }
    }

    private void reset()
    {
        checkpoint(DecodingState.MESSAGE_LENGTH);
        this.message = new Message();
    }

    public enum DecodingState
    {
        MESSAGE_LENGTH,
        MESSAGE_TYPE,
        MESSAGE_DATA,
    }
}
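A decoder like this has to cope with a frame that arrives in several pieces, which is the bookkeeping that ReplayingDecoder and checkpoint() take care of. The sketch below is not how Netty implements it. It is a simplified, JDK-only illustration of the same idea, with a hypothetical class ChunkedDecoderSketch that buffers incoming bytes until a whole frame is available.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical JDK-only decoder: keeps undecoded bytes between calls to feed().
class ChunkedDecoderSketch {
    private static final int HEADER_SIZE = 3; // 2-byte length + 1-byte type
    private byte[] pending = new byte[0];     // bytes received but not yet decoded

    // Accepts the next chunk and returns the payload of every frame it completes.
    List<byte[]> feed(byte[] chunk) {
        byte[] joined = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, joined, 0, pending.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<byte[]> payloads = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(joined);
        while (buf.remaining() >= HEADER_SIZE) {
            buf.mark();
            int total = buf.getShort();
            if (total < HEADER_SIZE) {
                throw new IllegalStateException("invalid message length: " + total);
            }
            if (buf.remaining() < total - 2) { // type byte + data not fully here yet
                buf.reset();                   // rewind to the start of this frame
                break;
            }
            buf.get();                         // type byte, ignored in this sketch
            byte[] payload = new byte[total - HEADER_SIZE];
            buf.get(payload);
            payloads.add(payload);
        }
        pending = new byte[buf.remaining()];
        buf.get(pending);
        return payloads;
    }

    // Builds one frame for the demo: [length][type][UTF-8 text].
    static byte[] frame(byte type, String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + data.length);
        buf.putShort((short) (HEADER_SIZE + data.length)).put(type).put(data);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] frame = frame((byte) 0x02, "KyLin_Zhang");
        ChunkedDecoderSketch decoder = new ChunkedDecoderSketch();
        int from = 0;
        for (int to : new int[] {1, 6, frame.length}) { // deliver the frame in three pieces
            List<byte[]> done = decoder.feed(Arrays.copyOfRange(frame, from, to));
            System.out.println("fed bytes " + from + ".." + to + " -> " + done.size() + " complete frame(s)");
            from = to;
        }
    }
}
```

Only the last call reports a complete frame; the first two merely grow the pending buffer.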

common.MessageEncoder.java
The message encoder converts a Message instance into a ChannelBuffer, the general-purpose byte container that Netty uses to carry data.
It follows the singleton pattern: MessageEncoder declares a private static nested class whose static final field creates and holds the only MessageEncoder instance,
and the public method getInstance() of MessageEncoder returns that singleton to the caller.


package org.kylin.zhang.common;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

/**
 * Created by root on 6/30/15.
 *
 * This class converts a Message instance
 * into a ChannelBuffer.
 *
 * It is a stateless singleton,
 * so that it can be shared by multiple pipelines.
 */
@ChannelHandler.Sharable
public class MessageEncoder extends OneToOneEncoder
{
    private MessageEncoder()
    {}

    private static final class InstanceHolder
    {
        private static final MessageEncoder INSTANCE = new MessageEncoder();
    }

    public static MessageEncoder getInstance()
    {
        return InstanceHolder.INSTANCE;
    }

    public static ChannelBuffer encodeMessage(Message message) throws IllegalArgumentException
    {
        System.out.println("encodeMessage :" + message);

        // first check the message type: null and UNKNOWN are illegal for a message
        if (message.getType() == null || message.getType() == MessageType.UNKNOWN)
        {
            throw new IllegalArgumentException("Message type can not be null or UNKNOWN");
        }
        // the length field counts the 3 header bytes, so it can never be below 3
        if (message.getLength() < 3)
        {
            throw new IllegalArgumentException("Message length must be at least 3");
        }
        // the data must match the length field exactly, otherwise the decoder
        // on the other side waits for bytes that never arrive
        if (message.getData() == null || message.getData().length != message.getLength() - 3)
        {
            throw new IllegalArgumentException("Message data length must equal message length - 3");
        }

        // length (2b) , type (1b) , data (nb) = (3+n)b
        int size = 3 + message.getData().length;

        ChannelBuffer buffer = ChannelBuffers.buffer(size);

        buffer.writeShort(message.getLength());
        buffer.writeByte(message.getType().getByte());
        buffer.writeBytes(message.getData());

        return buffer;
    }

    // ---- OneToOneEncoder
    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
            throws Exception
    {
        if (msg instanceof Message)
            return encodeMessage((Message) msg);
        else
            return msg;
    }
}
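The nested holder class used for the singleton can be looked at in isolation. The following sketch uses a made-up class HolderIdiomSketch with a counter that exists only for the demonstration. It relies on the fact that the JVM initializes a class lazily and in a thread-safe way, so the instance is created on the first call to getInstance() and never again.

```java
// Hypothetical stand-alone demo of the holder idiom; not part of the project.
class HolderIdiomSketch {
    static int created = 0; // counts constructor calls, for the demonstration only

    private HolderIdiomSketch() {
        created++;
    }

    // Holder is initialized by the JVM on first access to Holder.INSTANCE.
    private static final class Holder {
        private static final HolderIdiomSketch INSTANCE = new HolderIdiomSketch();
    }

    static HolderIdiomSketch getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println("before first call : " + created); // 0, nothing created yet
        HolderIdiomSketch a = getInstance();
        HolderIdiomSketch b = getInstance();
        System.out.println("after two calls   : " + created); // 1
        System.out.println("same instance     : " + (a == b)); // true
    }
}
```

Sharing one encoder across pipelines is only safe because the encoder keeps no per-connection state, which is what the @ChannelHandler.Sharable annotation on MessageEncoder declares.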


client.Client.java


package org.kylin.zhang.client;

import org.kylin.zhang.common.*;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * Created by root on 7/1/15.
 */
public class Client implements ClientHandlerListener
{
    private final String host;
    private final int port;

    private ClientHandler handler;
    private ChannelFactory clientFactory;
    private ChannelGroup channelGroup;

    public Client(String host, int port)
    {
        this.host = host;
        this.port = port;
    }

    public void messageReceived(Message message)
    {
        // print out message
        System.out.println("message received from server " + message);
        System.out.println("we send message again");

        // runs only once: the server drops the connection when it receives this message
        {
            String data = "KyLin_Zhang";

            // the length field is the data length plus the 3 header bytes
            Message msg = new Message(MessageType.FILE_SENDING, (short)(data.getBytes().length + 3), data.getBytes());
            System.out.println("client messageReceived :" + msg);

            this.handler.sendMessage(msg);
        }
    }

    public boolean start()
    {
        this.clientFactory = new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(), Executors.newCachedThreadPool());

        this.channelGroup = new DefaultChannelGroup(this + "-channelGroup");

        this.handler = new ClientHandler(this, this.channelGroup);

        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("encoder", MessageEncoder.getInstance());
                pipeline.addLast("decoder", new MessageDecoder());
                pipeline.addLast("handler", handler);
                return pipeline;
            }
        };

        ClientBootstrap bootstrap = new ClientBootstrap(this.clientFactory);

        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("tcpNoDelay", true);   // was misspelled "tcpNoDealy"
        bootstrap.setOption("keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        boolean connected = bootstrap.connect(new InetSocketAddress(host, port))
                .awaitUninterruptibly().isSuccess();

        if (!connected)
        {
            this.stop();
        }

        return connected;
    }

    public void stop()
    {
        if (this.channelGroup != null)
            this.channelGroup.close();
        if (this.clientFactory != null)
            this.clientFactory.releaseExternalResources();
    }

    public static void main(String[] args) throws InterruptedException
    {
        final Client client = new Client("kylin", 9999);

        if (!client.start())
        {
            System.out.println("client failed to start");
            System.exit(-1);
            return;
        }

        System.out.println("Client started ....");

        // release the channels and thread pools when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(
                new Thread()
                {
                    @Override
                    public void run()
                    {
                        client.stop();
                    }
                });
    }
}

client.ClientHandler.java


package org.kylin.zhang.client;

import org.jboss.netty.channel.*;
import org.jboss.netty.channel.group.ChannelGroup;

import org.kylin.zhang.common.*;

/**
 * Created by root on 7/1/15.
 */
public class ClientHandler extends SimpleChannelUpstreamHandler
{
    private final ClientHandlerListener listener;
    private final ChannelGroup channelGroup;
    private Channel channel;

    public ClientHandler(ClientHandlerListener listener, ChannelGroup channelGroup)
    {
        this.listener = listener;
        this.channelGroup = channelGroup;
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception
    {
        if (e.getMessage() instanceof Message)
        {
            // hand the decoded message over to the listener
            this.listener.messageReceived((Message) e.getMessage());
        }
        else
        {
            super.messageReceived(ctx, e);
        }
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception
    {
        System.out.println("client channelConnected: ");
        this.channel = e.getChannel();

        System.out.println("is connected the channel ? " + this.channel.isConnected());

        this.channelGroup.add(e.getChannel());

        /**
         * here I write a message to the server
         * to tell it that a connection is coming
         */
        String data = "connected request";

        Message msg = new Message(MessageType.FILE_SENDING, (short)(data.getBytes().length + 3), data.getBytes());

        System.out.println("sendConnectMessage : " + msg);
        this.channel.write(msg);
    }

    public void sendMessage(Message msg)
    {
        if (this.channel != null)
        {
            System.out.println("sendMessage : " + msg);
            this.channel.write(msg);
        }
    }
}

client.ClientHandlerListener.java


package org.kylin.zhang.client;

import org.kylin.zhang.common.Message;

/**
 * Created by root on 7/1/15.
 */
public interface ClientHandlerListener {
    void messageReceived(Message msg);
}


server.Server.java


package org.kylin.zhang.server;

import org.kylin.zhang.common.*;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * Created by root on 7/1/15.
 */
public class Server {

    private final String host;
    private final int port;
    private DefaultChannelGroup channelGroup;
    private ServerChannelFactory serverFactory;

    public Server(String host, int port)
    {
        this.host = host;
        this.port = port;
    }

    public boolean start()
    {
        this.serverFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());

        this.channelGroup = new DefaultChannelGroup(this + "-channelGroup");

        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory()
        {
            public ChannelPipeline getPipeline() throws Exception
            {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("encoder", MessageEncoder.getInstance());
                pipeline.addLast("decoder", new MessageDecoder());
                pipeline.addLast("handler", new ServerHandler(channelGroup));
                return pipeline;
            }
        };

        ServerBootstrap bootstrap = new ServerBootstrap(this.serverFactory);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        Channel channel = bootstrap.bind(new InetSocketAddress(this.host, this.port));

        if (!channel.isBound())
        {
            this.stop();
            return false;
        }

        this.channelGroup.add(channel);
        return true;
    }

    public void stop()
    {
        if (this.channelGroup != null)
            this.channelGroup.close();
        if (this.serverFactory != null)
            this.serverFactory.releaseExternalResources();
    }

    public static void main(String[] args)
    {
        final Server server = new Server("kylin", 9999);

        if (!server.start())
        {
            System.out.println("server failed to run ");
            System.exit(-1);

            return; // not really needed
        }

        System.out.println("server started ..... ");

        Runtime.getRuntime().addShutdownHook(new Thread()
        {
            @Override
            public void run()
            {
                server.stop();
            }
        });
    }
}

server.ServerHandler.java


package org.kylin.zhang.server;

import org.kylin.zhang.common.*;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;

/**
 * Created by root on 7/1/15.
 */
public class ServerHandler extends SimpleChannelUpstreamHandler
{
    private final ChannelGroup channelGroup;

    public ServerHandler(ChannelGroup channelGroup)
    {
        this.channelGroup = channelGroup;
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
    {
        System.out.println("-------------------------- **server gets a new connection**--------------------------");
        this.channelGroup.add(e.getChannel());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception
    {
        System.out.println("-------------------- **server receive a piece of message** -------------------------- ");
        if (e.getMessage() instanceof Message) {
            Message receivedMsg = (Message) e.getMessage();
            System.out.println("server received message :" + receivedMsg);

            if (new String(receivedMsg.getData()).equals("KyLin_Zhang")) {
                System.out.println("received end message " + receivedMsg);
                // close only the connection that sent the end message;
                // channelGroup.disconnect() would disconnect every channel in the group
                e.getChannel().close();
            } else {
                String data = "Server Received Message ";

                Message msg = new Message(MessageType.FILE_END, (short) (data.getBytes().length + 3), data.getBytes());

                e.getChannel().write(msg);
            }
        }
        else
        {
            super.messageReceived(ctx, e);
        }
    }
}

end


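夏目玲子 2015-07-01 16:28:44

To transfer files in a more standardized way, it would be best to consult the relevant RFC documents.
Once I have finished reading the RFCs I will come back and rework the logic of this pile of code [laughing-crying, I don't think I'll be back any time soon]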
