这篇文章简单记录了使用 jboss-netty 框架编写的自定义消息类型的接收与发送。
项目的源码目录结构如下
/src/
|----------common/
|-------- MessageType
|-------- Message
|-------- MessageDecoder
|-------- MessageEncoder
|---------- client/
|-------- Client
|-------- ClientHandler
|-------- ClientHandlerListener
|---------- server/
|-------- Server
|-------- ServerHandler
common.MessageType.java
MessageType 是一个枚举类,在其中分别使用 byte 定义了不同的消息类型。
其中一个 MessageType 枚举类实例只能代表一种类型的 Message, 因为其中的成员变量 byte b 为 final。
在构造函数中被赋值之后,便不可改变。
成员方法: fromByte ( byte b ) 用于为传入的 byte 数值分辨其类型,并返回该字节数值所代表的 MessageType 对象实例。
其中 values() 方法用来遍历 MessageType 枚举类中所定义的所有枚举数值{FILE_BEGIN, ... UNKNOWN}
-
package org.kylin.zhang.common;
-
-
/**
 * Created by root on 6/30/15.
 *
 * Type tag of a {@code Message}, carried on the wire as a single byte.
 * Every constant owns a distinct code, so {@code fromByte(t.getByte()) == t}
 * holds for all constants.
 */
public enum MessageType
{
    FILE_BEGIN((byte) 0x01),
    FILE_SENDING((byte) 0x02),
    FILE_END((byte) 0x03),
    SHUT_DOWN((byte) 0x00),

    /**
     * Fallback returned by {@link #fromByte(byte)} for unrecognised codes.
     * It uses its own sentinel code (0xFF) rather than sharing 0x00 with
     * {@link #SHUT_DOWN}, which made the two constants indistinguishable by code.
     */
    UNKNOWN((byte) 0xFF);

    /** Code of this type; assigned once in the constructor. */
    private final byte b;

    private MessageType(byte b)
    {
        this.b = b;
    }

    /**
     * Resolves a code to its message type.
     *
     * @param b the type code
     * @return the constant whose code equals {@code b}, or {@link #UNKNOWN}
     *         when no constant matches
     */
    public static MessageType fromByte(byte b)
    {
        for (MessageType code : values())
        {
            if (code.b == b)
            {
                return code;
            }
        }
        return UNKNOWN;
    }

    /**
     * @return the code of this type
     */
    public byte getByte()
    {
        return this.b;
    }
}
common.Message.java
自定义的消息类。在这里我们约定 Message 由三部分组成:{消息长度(2 bytes), 消息类型(1 byte), 消息数据(消息总长度 - 3 bytes)},其中"消息长度"字段的数值是包含前 3 个头部字节在内的消息总长度。
-
package org.kylin.zhang.common;
-
-
import java.io.Serializable ;
-
-
/**
-
* Created by root on 6/30/15.
-
*/
-
-
public class Message implements Serializable
-
{
-
private MessageType type ;
-
private short length ;
-
private byte [] data ;
-
-
public Message ()
-
{}
-
-
public Message (MessageType type , short len , byte [] data )
-
{
-
this.type = type ;
-
this.length = len ;
-
this.data = data ;
-
}
-
-
public short getLength ()
-
{
-
return this.length ;
-
}
-
public void setLength ( short len )
-
{
-
this.length = len ;
-
}
-
-
-
public MessageType getType ()
-
{
-
return this.type ;
-
}
-
-
public void setType ( MessageType type )
-
{
-
this.type = type ;
-
}
-
-
public byte [] getData ()
-
{
-
return this.data ;
-
}
-
-
public void setData ( byte [] data )
-
{
-
this.data = data ;
-
}
-
-
-
@Override
-
public String toString ()
-
{
-
return "\nmessage type :"+this.type +"\n"
-
+"message length :"+this.length +"\n"
-
+"message content :"+ new String (this.data) +"\n" ;
-
}
-
-
-
public static void main ( String [] args )
-
{
-
byte [] data = "Hello Aimer".getBytes() ;
-
-
Message msg =new Message () ;
-
-
msg.setType(MessageType.FILE_BEGIN);
-
-
msg.setLength((short)data.length );
-
-
msg.setData(data );
-
-
System.out.println (msg) ;
-
-
-
}
-
-
}
common.MessageDecoder.java
消息的解码器,用于将 ChannelBuffer 对象解析成 Message 对象实例。
-
package org.kylin.zhang.common;
-
-
import org.jboss.netty.buffer.ChannelBuffer ;
-
import org.jboss.netty.channel.Channel ;
-
import org.jboss.netty.channel.ChannelHandlerContext ;
-
import org.jboss.netty.handler.codec.replay.ReplayingDecoder ;
-
-
/**
-
* Created by root on 6/30/15.
-
*
-
* decoder is used to convert ChannelBuffer instance into
-
* an Instance of Message
-
*
-
*/
-
public class MessageDecoder extends ReplayingDecoder<MessageDecoder.DecodingState>
-
{
-
private Message message ;
-
-
public MessageDecoder ()
-
{
-
this.reset() ;
-
}
-
-
@Override
-
protected Object decode ( ChannelHandlerContext ctx, Channel ch , ChannelBuffer buffer, DecodingState state )
-
throws Exception
-
{
-
// notice the switch fall-through
-
-
switch (state)
-
{
-
case MESSAGE_LENGTH:
-
-
short len = buffer.readShort() ;
-
if ( len <= 0 )
-
throw new Exception ("invalid message length ");
-
-
// pre-allocate data content buffer
-
byte [] data = new byte [len-3] ;
-
this.message.setData(data);
-
this.message.setLength (len) ;
-
-
checkpoint(DecodingState.MESSAGE_TYPE) ;
-
-
case MESSAGE_TYPE:
-
-
this.message.setType(MessageType.fromByte(buffer.readByte())) ;
-
checkpoint(DecodingState.MESSAGE_DATA) ;
-
-
case MESSAGE_DATA:
-
buffer.readBytes(this.message.getData() , 0 ,
-
this.message.getLength()-3);
-
-
try
-
{
-
// System.out.println ("MessageDecoder : "+message) ;
-
return this.message ;
-
}
-
finally
-
{
-
this.reset() ;
-
}
-
-
default :
-
throw new Exception ("Unknown decoding state : "+state ) ;
-
}
-
}
-
-
private void reset ()
-
{
-
checkpoint(DecodingState.MESSAGE_LENGTH);
-
this.message = new Message () ;
-
}
-
-
-
public enum DecodingState
-
{
-
MESSAGE_LENGTH,
-
MESSAGE_TYPE,
-
MESSAGE_DATA ,
-
}
-
}
common.MessageEncoder.java
消息的编码器,用于将消息对象实例(Instance of Message) 转换成 ChannelBuffer 对象,其中 ChannelBuffer 是 Netty 框架通用的数据流传输载体。
采用的是单例的设计模式:在 MessageEncoder 中创建了一个私有静态内部类 InstanceHolder,并在该内部类的静态常量 INSTANCE 中创建并保存了唯一的 MessageEncoder 实例对象,
同时 MessageEncoder 类中的公共方法 getInstance() 直接返回 InstanceHolder.INSTANCE,将这个单例对象返回给调用者。
-
package org.kylin.zhang.common;
-
-
import org.jboss.netty.buffer.ChannelBuffer ;
-
import org.jboss.netty.buffer.ChannelBuffers ;
-
import org.jboss.netty.channel.Channel ;
-
import org.jboss.netty.channel.ChannelHandler ;
-
import org.jboss.netty.channel.ChannelHandlerContext ;
-
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder ;
-
-
/**
-
* Created by root on 6/30/15.
-
*
-
* this class is used to converts an Message instance
-
* into a ChannelBuffer
-
*
-
* we use the singleton pattern
-
* and set it as stateless in order it can be used by multi-pipeline
-
*
-
*/
-
-
@ChannelHandler.Sharable
-
-
public class MessageEncoder extends OneToOneEncoder
-
{
-
private MessageEncoder ()
-
{}
-
private static final class InstanceHolder
-
{
-
private static final MessageEncoder INSTANCE = new MessageEncoder() ;
-
}
-
-
public static MessageEncoder getInstance ()
-
{
-
return InstanceHolder.INSTANCE ;
-
}
-
-
public static ChannelBuffer encodeMessage ( Message message ) throws IllegalArgumentException
-
{
-
System.out.println ("encodeMessage :" + message ) ;
-
-
// first check the Message Type , Unknow type and null type are illegaled for a message
-
if (message.getType() == null || ( message.getType() == MessageType.UNKNOWN))
-
{
-
throw new IllegalArgumentException("Message type can not be null or UNKNOWN ") ;
-
}
-
if ( message.getLength() == 0 )
-
{
-
throw new IllegalArgumentException("Message length can not be 0, at least 3 ") ;
-
}
-
if ( message.getData().length < 0 || message.getData().length > (message.getLength() -3))
-
{
-
System.out.println ("message.getData().legngth : "+ message.getData().length) ;
-
System.out.println("message.getLength () -3: "+ (message.getLength() -3) );
-
throw new IllegalArgumentException("Message data length can not less than 0 or larger than message length ") ;
-
}
-
-
// type (1b) , length (2b) , data(nb) = (3+n)b
-
int size = 3 + message.getData().length ;
-
-
ChannelBuffer buffer = ChannelBuffers.buffer(size) ;
-
-
buffer.writeShort(message.getLength()) ;
-
buffer.writeByte (message.getType().getByte()) ;
-
buffer.writeBytes(message.getData());
-
-
return buffer ;
-
}
-
-
// ---- OneToOneEncoder
-
@Override
-
protected Object encode ( ChannelHandlerContext ctx, Channel channel , Object msg )
-
throws Exception
-
{
-
if ( msg instanceof Message )
-
return encodeMessage((Message)msg) ;
-
else
-
return msg ;
-
}
-
}
client.Client.java
-
package org.kylin.zhang.client;
-
-
import org.kylin.zhang.common.* ;
-
import org.jboss.netty.bootstrap.ClientBootstrap ;
-
import org.jboss.netty.channel.ChannelFactory ;
-
import org.jboss.netty.channel.ChannelPipeline ;
-
import org.jboss.netty.channel.ChannelPipelineFactory ;
-
import org.jboss.netty.channel.Channels ;
-
import org.jboss.netty.channel.group.ChannelGroup ;
-
import org.jboss.netty.channel.group.DefaultChannelGroup ;
-
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory ;
-
-
import java.net.InetSocketAddress ;
-
import java.util.concurrent.Executors ;
-
import java.util.concurrent.atomic.AtomicInteger ;
-
-
/**
-
* Created by root on 7/1/15.
-
*/
-
public class Client implements ClientHandlerListener
-
{
-
-
private final String host ;
-
private final int port ;
-
-
-
-
private ClientHandler handler ;
-
private ChannelFactory clientFactory ;
-
private ChannelGroup channelGroup ;
-
-
-
-
public Client ( String host , int port )
-
{
-
this.host = host ;
-
this.port = port ;
-
-
}
-
-
-
-
public void messageReceived (Message message)
-
{
-
// print out message
-
-
System.out.println("message received from server "+ message ) ;
-
-
System.out.println ("we send message again") ;
-
-
/// runnin only once
-
{
-
String data = "KyLin_Zhang" ;
-
-
Message msg = new Message(MessageType.FILE_SENDING ,(short)(data.getBytes().length+3), data.getBytes()) ;
-
System.out.println("client messageReceived :" + msg ) ;
-
-
this.handler.sendMessage(msg);
-
-
}
-
}
-
-
public boolean start ()
-
{
-
this.clientFactory = new NioClientSocketChannelFactory(
-
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()) ;
-
-
-
this.channelGroup = new DefaultChannelGroup(this + "-channelGroup") ;
-
-
this.handler = new ClientHandler (this, this.channelGroup ) ;
-
-
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
-
public ChannelPipeline getPipeline() throws Exception {
-
-
ChannelPipeline pipeline = Channels.pipeline() ;
-
pipeline.addLast("encoder" , MessageEncoder.getInstance()) ;
-
pipeline.addLast("decoder" , new MessageDecoder ()) ;
-
pipeline.addLast("handler" , handler) ;
-
-
-
return pipeline ;
-
}
-
} ;
-
-
ClientBootstrap bootstrap = new ClientBootstrap (this.clientFactory) ;
-
-
bootstrap.setOption("reuseAddress" , true ) ;
-
bootstrap.setOption ("tcpNoDealy" , true ) ;
-
bootstrap.setOption ("keepAlive" , true ) ;
-
bootstrap.setPipelineFactory(pipelineFactory ) ;
-
-
-
boolean connected = bootstrap.connect( new InetSocketAddress(host, port ))
-
.awaitUninterruptibly().isSuccess();
-
-
if ( !connected)
-
{
-
this.stop() ;
-
}
-
-
return connected ;
-
}
-
-
public void stop ()
-
{
-
if ( this.channelGroup != null )
-
this.channelGroup.close() ;
-
if ( this.clientFactory != null )
-
this.clientFactory.releaseExternalResources();
-
}
-
-
-
public static void main ( String [] args ) throws InterruptedException
-
{
-
final Client client = new Client ("kylin" , 9999) ;
-
-
if ( !client.start())
-
{
-
System.out.println ("client failed to start") ;
-
System.exit(-1) ;
-
return ;
-
}
-
-
System.out.println("Client started ....") ;
-
-
Runtime.getRuntime().addShutdownHook(
-
new Thread()
-
{
-
@Override
-
public void run ()
-
{
-
client.stop () ;
-
}
-
}) ;
-
}
-
}
client.ClientHandler.java
-
package org.kylin.zhang.client;
-
-
import org.jboss.netty.channel.*;
-
import org.jboss.netty.channel.group.ChannelGroup ;
-
-
import org.kylin.zhang.common.* ;
-
-
/**
-
* Created by root on 7/1/15.
-
*/
-
public class ClientHandler extends SimpleChannelUpstreamHandler
-
{
-
-
private final ClientHandlerListener listener ;
-
private final ChannelGroup channelGroup ;
-
private Channel channel ;
-
-
-
public ClientHandler(ClientHandlerListener listener , ChannelGroup channelGroup)
-
{
-
this.listener = listener ;
-
this.channelGroup = channelGroup ;
-
}
-
-
@Override
-
public void messageReceived ( ChannelHandlerContext ctx, MessageEvent e)
-
throws Exception
-
{
-
if ( e.getMessage () instanceof Message )
-
{
-
// output this
-
this.listener.messageReceived((Message)e.getMessage()) ;
-
}
-
else
-
{
-
super.messageReceived(ctx, e ) ;
-
}
-
}
-
-
@Override
-
public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e)
-
throws Exception
-
{
-
System.out.println("client channelConnected: ") ;
-
this.channel = e.getChannel();
-
-
System.out.println ("is connected the channel ? "+ this.channel.isConnected()) ;
-
-
this.channelGroup.add(e.getChannel()) ;
-
-
-
/**
-
* here i will write a message sending to the server
-
* tell it a connection is comming
-
* */
-
-
String data = "connected request" ;
-
-
Message msg = new Message(MessageType.FILE_SENDING ,(short)(data.getBytes().length+3), data.getBytes()) ;
-
System.out.println("client flood :" + msg ) ;
-
-
-
System.out.println("sendConnectMessage : "+ msg ) ;
-
this.channel.write( msg ) ;
-
-
}
-
-
-
-
public void sendMessage ( Message msg )
-
{
-
if ( this.channel != null )
-
{
-
System.out.println("sendMessage : "+ msg ) ;
-
this.channel.write(msg) ;
-
-
}
-
}
-
}
client.ClientHandlerListener.java
-
package org.kylin.zhang.client;
-
-
import org.kylin.zhang.common.Message;
-
-
/**
-
* Created by root on 7/1/15.
-
*/
-
public interface ClientHandlerListener {
-
void messageReceived( Message msg ) ;
-
}
server.Server.java
-
package org.kylin.zhang.server;
-
-
import org.kylin.zhang.common.* ;
-
-
import org.jboss.netty.bootstrap.ServerBootstrap;
-
import org.jboss.netty.channel.Channel ;
-
import org.jboss.netty.channel.ChannelPipeline ;
-
import org.jboss.netty.channel.ChannelPipelineFactory ;
-
import org.jboss.netty.channel.Channels ;
-
import org.jboss.netty.channel.ServerChannelFactory ;
-
import org.jboss.netty.channel.group.DefaultChannelGroup;
-
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory ;
-
-
import java.net.InetSocketAddress ;
-
import java.util.concurrent.Executors ;
-
-
/**
-
* Created by root on 7/1/15.
-
*/
-
public class Server {
-
-
private final String host ;
-
private final int port ;
-
private DefaultChannelGroup channelGroup ;
-
private ServerChannelFactory serverFactory ;
-
-
public Server ( String host , int port )
-
{
-
this.host = host ;
-
this.port = port ;
-
}
-
-
public boolean start ()
-
{
-
this.serverFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
-
Executors.newCachedThreadPool()) ;
-
-
this.channelGroup = new DefaultChannelGroup (this + "-channelGroup") ;
-
-
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory ()
-
{
-
public ChannelPipeline getPipeline() throws Exception
-
{
-
ChannelPipeline pipeline = Channels.pipeline() ;
-
pipeline.addLast ("encoder" , MessageEncoder.getInstance()) ;
-
pipeline.addLast("decoder" , new MessageDecoder() ) ;
-
pipeline.addLast("handler" , new ServerHandler(channelGroup)) ;
-
-
return pipeline ;
-
}
-
} ;
-
-
ServerBootstrap bootstrap = new ServerBootstrap(this.serverFactory) ;
-
bootstrap.setOption("reuseAddress" , true) ;
-
bootstrap.setOption ("child.tcpNoDelay" , true ) ;
-
bootstrap.setOption ("child.keepAlive" , true ) ;
-
bootstrap.setPipelineFactory(pipelineFactory) ;
-
-
-
Channel channel = bootstrap.bind( new InetSocketAddress(this.host, this.port)) ;
-
-
if ( !channel.isBound())
-
{
-
this.stop() ;
-
return false ;
-
}
-
-
this.channelGroup.add(channel) ;
-
return true ;
-
}
-
-
public void stop ()
-
{
-
if (this.channelGroup != null )
-
this.channelGroup.close () ;
-
if ( this.serverFactory != null )
-
this.serverFactory.releaseExternalResources();
-
}
-
-
-
public static void main ( String [] args )
-
{
-
final Server server = new Server("kylin", 9999 ) ;
-
-
if ( !server.start ())
-
{
-
System.out.println("server failed to run ") ;
-
System.exit(-1);
-
-
return ; // not really needed
-
}
-
-
System.out.println("server started ..... ") ;
-
-
Runtime.getRuntime().addShutdownHook( new Thread ()
-
{
-
@Override
-
public void run ()
-
{
-
server.stop() ;
-
}
-
});
-
}
-
}
server.ServerHandler.java
-
package org.kylin.zhang.server;
-
-
import org.kylin.zhang.common.* ;
-
import org.jboss.netty.channel.ChannelHandlerContext ;
-
import org.jboss.netty.channel.ChannelStateEvent ;
-
import org.jboss.netty.channel.MessageEvent ;
-
import org.jboss.netty.channel.SimpleChannelUpstreamHandler ;
-
import org.jboss.netty.channel.group.ChannelGroup ;
-
-
/**
-
* Created by root on 7/1/15.
-
*/
-
public class ServerHandler extends SimpleChannelUpstreamHandler
-
{
-
private final ChannelGroup channelGroup ;
-
-
public ServerHandler ( ChannelGroup channelGroup )
-
{
-
this.channelGroup = channelGroup ;
-
}
-
-
@Override
-
public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e )
-
{
-
System.out.println("-------------------------- **server gets a new connection**--------------------------") ;
-
this.channelGroup.add(e.getChannel()) ;
-
}
-
-
@Override
-
public void messageReceived ( ChannelHandlerContext ctx, MessageEvent e )
-
throws Exception
-
{
-
System.out.println("-------------------- **server receive a piece of message** -------------------------- ") ;
-
if ( e.getMessage() instanceof Message ) {
-
Message received_msg = (Message) e.getMessage();
-
System.out.println("server received message :" + received_msg);
-
-
if ( new String (received_msg.getData()).equals("KyLin_Zhang")) {
-
//how to shutdown connection
-
System.out.println ("received end message "+ received_msg) ;
-
this.channelGroup.disconnect() ;
-
} else {
-
-
String data = "Server Received Message ";
-
-
Message msg = new Message(MessageType.FILE_END, (short) (data.getBytes().length + 3), data.getBytes());
-
-
e.getChannel().write(msg);
-
-
}
-
}
-
else
-
{
-
super.messageReceived(ctx, e);
-
}
-
}
-
}
end
阅读(3880) | 评论(1) | 转发(0) |