Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1940053
  • 博文数量: 1000
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7921
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-20 09:23
个人简介

storage R&D guy.

文章分类

全部博文(1000)

文章存档

2019年(5)

2017年(47)

2016年(38)

2015年(539)

2014年(193)

2013年(178)

分类: 服务器与存储

2015-06-25 15:17:31

现在版本的hadoop各种server、client RPC端通信协议的实现是基于google的protocol buffers的,如果对这个不熟悉,读code的时候会比较痛苦一些,所以花了些时间学习了一下,然后仿照写了个比较简单的例子,麻雀虽小,五脏俱全,看懂了我这个或许对你读hadoop的code有帮助! :) 

我现在实现一个简单的server-client方式的calculator,client将计算请求序列化成protocol buffers形式然后发给server端,server端反序列化后将完成计算然后将结果序列化后返回给client端。 

先看一下最后整体的package结构(模仿hadoop的包命名,便于比较) 

package org.tao.pbtest.api: 
org.tao.pbtest.api.Calculator 
org.tao.pbtest.api.CalculatorPB 
org.tao.pbtest.api.CalculatorPBServiceImpl 

package org.tao.pbtest.server.business 
org.tao.pbtest.server.business.CalculatorService 

package org.tao.pbtest.ipc 
org.tao.pbtest.ipc.Server 

package org.tao.pbtest.proto 
org.tao.pbtest.proto.Calculator 
org.tao.pbtest.proto.CalculatorMsg 

package org.tao.pbtest.proto.test 
org.tao.pbtest.proto.test.TestCalculator 


  • step 1:


  • 首先看一下Calculator这个接口: 
    Java代码  收藏代码
    1. package org.tao.pbtest.api;  
    2.   
    3. public interface Calculator {  
    4.    public int add(int a, int b);  
    5.    public int minus(int a, int b);  
    6. }  


    这个计算器就进行简单的两种运算,两个整数的加减。 

  • step 2:

  • 然后定义两个proto文件:CalculatorMsg.proto和Calculator.proto。 

    第一个是运算的参数消息、返回结果消息,输入时两个整数,返回结果是一个整数。具体protocol buffers的语法此处不做解释了,可以参看google的文档。 

    Java代码  收藏代码
    1. option java_package = "org.tao.pbtest.proto";  
    2. option java_outer_classname = "CalculatorMsg";  
    3. option  java_generic_services = true;  
    4. option java_generate_equals_and_hash = true;  
    5.   
    6. message RequestProto {  
    7.    required string methodName = 1;  
    8.    required int32 num1 = 2;  
    9.    required int32 num2 = 3;  
    10. }  
    11.   
    12. message ResponseProto {  
    13.    required int32 result = 1;  
    14. }  


    第二个proto文件定义service: 
    Java代码  收藏代码
    1. option java_package = "org.tao.pbtest.proto";  
    2. option java_outer_classname = "Calculator";  
    3. option java_generic_service = true;  
    4. option java_generate_equals_and_hash = true;  
    5.   
    6. import "CalculatorMsg.proto"  
    7.   
    8. service CalculatorService {  
    9.    rpc add(RequestProto) returns (ResponseProto);  
    10.    rpc minus(RequestProto) returns (ResponseProto);  
    11. }  


    然后用protoc将此两个文件编译,生成两个java文件: 

    org.tao.pbtest.proto.Calculator 
    org.tao.pbtest.proto.CalculatorMsg 

  • step 3:

  • 然后定义一个CalculatorPB接口extends刚才生成的org.tao.pbtest.proto.Calculator.CalculatorService.BlockingInterface, 这是一个过渡作用的接口。 

    Java代码  收藏代码
    1. package org.tao.pbtest.server.api;  
    2.   
    3. import org.tao.pbtest.proto.Calculator.CalculatorService.BlockingService;  
    4.   
    5. public interface CalculatorPB extends BlockingInterface {  
    6. }  


  • step 4:

  • 还需要一个发送、接受信息的ipc server/client端。这里偷懒只实现一个最最简单的server端,什么并发啊,异常处理啊,nio啊统统不考虑,因为这不是重点。 

    Java代码  收藏代码
    1. package org.tao.pbtest.ipc;  
    2.   
    3. import java.io.DataInputStream;  
    4. import java.io.DataOutputStream;  
    5. import java.io.IOException;  
    6. import java.net.*;  
    7. import com.google.protobuf.*;  
    8. import com.google.protobuf.Descriptors.MethodDescriptor;  
    9. import org.tao.pbtest.proto.CalculatorMsg.RequestProto;  
    10. import org.tao.pbtest.proto.CalculatorMsg.ResponseProto;  
    11.   
    12. public class Server extends Thread {  
    13.    private Class<?> protocol;  
    14.    private BlockingService impl;  
    15.    private int port;  
    16.    private ServerSocket ss;  
    17.   
    18.    public Server(Class<?> protocol, BlockingService protocolImpl, int port){  
    19.       this.protocol = protocol;  
    20.       this.impl = protocolImpl;   
    21.       this.port = port;  
    22.    }  
    23.   
    24.    public void run(){  
    25.       Socket clientSocket = null;  
    26.       DataOutputStream dos = null;  
    27.       DataInputStream dis = null;  
    28.       try {  
    29.            ss = new ServerSocket(port);  
    30.        }catch(IOException e){  
    31.        }      
    32.        int testCount = 10//进行10次计算后就退出  
    33.   
    34.        while(testCount-- > 0){  
    35.           try {  
    36.                clientSocket = ss.accept();  
    37.                dos = new DataOutputStream(clientSocket.getOutputStream());  
    38.                dis = new DataInputStream(clientSocket.getInputStream());  
    39.                int dataLen = dis.readInt();  
    40.                byte[] dataBuffer = new byte[dataLen];  
    41.                int readCount = dis.read(dataBuffer);  
    42.                byte[] result = processOneRpc(dataBuffer);  
    43.   
    44.                dos.writeInt(result.length);  
    45.                dos.write(result);  
    46.                dos.flush();  
    47.            }catch(Exception e){  
    48.            }  
    49.        }  
    50.        try {   
    51.            dos.close();  
    52.            dis.close();  
    53.            ss.close();  
    54.        }catch(Exception e){  
    55.        };  
    56.   
    57.    }  
    58.   
    59.    public byte[] processOneRpc (byte[] data) throws Exception {  
    60.       RequestProto request = RequestProto.parseFrom(data);  
    61.       String methodName = request.getMethodName();  
    62.       MethodDescriptor methodDescriptor = impl.getDescriptorForType().findMethodByName(methodName);  
    63.       Message response = impl.callBlockingMethod(methodDescriptor, null, request);  
    64.       return response.toByteArray();  
    65.    }  
    66. }  
       

  • step 5:
  • CalculatorServer.java,实现计算器服务的类,此类依赖ipc Server接受请求并处理计算请求,注意到其自身实现了Calculator接口,本质上的计算是由其来完成的。也就是,Server接受客户端请求要执行方法M,Server对象里有实现了CalculatorPB接口的对象A,那么请求就交给A处理(A其实是CalculatorPBServiceImpl类的对象,此类后面介绍),此时A对应的M方法的参数是pb的形式,另外A对象里其实包含对CalculatorService的一个引用,所以在A的M方法里,先对参数反序列化,然后将参数交给CalculatorService处理。 

    Java代码  收藏代码
    1. package org.tao.pbtest.server.business;  
    2.   
    3. import java.lang.reflect.Constructor;  
    4. import java.lang.reflect.InvocationTargetException;  
    5. import java.lang.reflect.Method;  
    6.   
    7. import org.tao.pbtest.ipc.Server;  
    8. import org.tao.pbtest.server.api.Calculator;  
    9.   
    10. import com.google.protobuf.BlockingService;  
    11.   
    12. public class CalculatorService implements Calculator {      
    13.       
    14.     private Server server = null;  
    15.     private final Class protocol = Calculator.class;  
    16.     private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
    17.     private final String protoPackage = "org.tao.pbtest.proto";  
    18.     private final String host = "localhost";  
    19.     private final int port = 8038;  
    20.       
    21.     public CalculatorService (){  
    22.           
    23.     }  
    24.       
    25.     @Override  
    26.     public int add(int a, int b) {  
    27.         // TODO Auto-generated method stub  
    28.         return a+b;  
    29.     }  
    30.   
    31.     public int minus(int a, int b){  
    32.         return a-b;  
    33.     }  
    34.       
    35.       
    36.     public void init(){  
    37.         createServer();          
    38.     }  
    39.       
    40.       
    41.     /* 
    42.      * return org.tao.pbtest.server.api.CalculatorPBServiceImpl 
    43.      */  
    44.     public Class<?> getPbServiceImplClass(){  
    45.         String packageName = protocol.getPackage().getName();  
    46.         String className = protocol.getSimpleName();  
    47.         String pbServiceImplName =  packageName + "." + className +  "PBServiceImpl";          
    48.         Class<?> clazz = null;  
    49.         try{  
    50.             clazz = Class.forName(pbServiceImplName, true, classLoader);  
    51.         }catch(ClassNotFoundException e){  
    52.             System.err.println(e.toString());  
    53.         }  
    54.         return clazz;  
    55.     }  
    56.       
    57.     /* 
    58.      * return org.tao.pbtest.proto.Calculator$CalculatorService 
    59.      */  
    60.     public Class<?> getProtoClass(){  
    61.         String className = protocol.getSimpleName();  
    62.         String protoClazzName =  protoPackage + "." + className + "$" + className + "Service";  
    63.         Class<?> clazz = null;  
    64.         try{  
    65.             clazz = Class.forName(protoClazzName, true, classLoader);  
    66.         }catch(ClassNotFoundException e){  
    67.             System.err.println(e.toString());  
    68.         }  
    69.         return clazz;  
    70.     }  
    71.       
    72.     public void createServer(){  
    73.         Class<?> pbServiceImpl = getPbServiceImplClass();  
    74.         Constructor<?> constructor = null;  
    75.         try{  
    76.             constructor = pbServiceImpl.getConstructor(protocol);  
    77.             constructor.setAccessible(true);  
    78.         }catch(NoSuchMethodException e){  
    79.             System.err.print(e.toString());  
    80.         }  
    81.           
    82.         Object service = null;  // instance of CalculatorPBServiceImpl  
    83.         try {  
    84.             service = constructor.newInstance(this);  
    85.         }catch(InstantiationException e){  
    86.         } catch (IllegalArgumentException e) {  
    87.         } catch (IllegalAccessException e) {  
    88.         } catch (InvocationTargetException e) {  
    89.         }  
    90.           
    91.         /* 
    92.          * interface: org.tao.pbtest.server.CalculatorPB 
    93.          */  
    94.         Class<?> pbProtocol = service.getClass().getInterfaces()[0];  
    95.                   
    96.         /* 
    97.          * class: org.tao.pbtest.proto.Calculator$CalculatorService 
    98.          */  
    99.         Class<?> protoClazz = getProtoClass();  
    100.           
    101.         Method method = null;  
    102.         try {  
    103.   
    104.             // pbProtocol.getInterfaces()[] 即是接口 org.tao.pbtest.proto.Calculator$CalculatorService$BlockingInterface  
    105.   
    106.             method = protoClazz.getMethod("newReflectiveBlockingService", pbProtocol.getInterfaces()[0]);  
    107.             method.setAccessible(true);  
    108.         }catch(NoSuchMethodException e){  
    109.             System.err.print(e.toString());  
    110.         }  
    111.           
    112.         try{  
    113.             createServer(pbProtocol, (BlockingService)method.invoke(null, service));  
    114.         }catch(InvocationTargetException e){  
    115.         } catch (IllegalArgumentException e) {  
    116.         } catch (IllegalAccessException e) {  
    117.         }  
    118.           
    119.     }  
    120.       
    121.     public void createServer(Class pbProtocol, BlockingService service){  
    122.         server = new Server(pbProtocol, service, port);  
    123.         server.start();  
    124.     }  
    125.       
    126.     public static void main(String[] args){  
    127.         CalculatorService cs = new CalculatorService();  
    128.         cs.init();  
    129.     }  
    130. }  


  • step 6:
  • 刚才提到的PB格式跟最终实现的桥梁类:CalculatorPBServiceImpl 

    Java代码  收藏代码
    1. package org.tao.pbtest.server.api;  
    2.   
    3. import org.tao.pbtest.proto.CalculatorMsg.RequestProto;  
    4. import org.tao.pbtest.proto.CalculatorMsg.ResponseProto;  
    5.   
    6. import com.google.protobuf.RpcController;  
    7. import com.google.protobuf.ServiceException;  
    8.   
    9. public class CalculatorPBServiceImpl implements CalculatorPB {  
    10.   
    11.     public Calculator real;  
    12.       
    13.     public CalculatorPBServiceImpl(Calculator impl){  
    14.         this.real = impl;  
    15.     }  
    16.       
    17.     @Override  
    18.     public ResponseProto add(RpcController controller, RequestProto request) throws ServiceException {  
    19.         // TODO Auto-generated method stub  
    20.         ResponseProto proto = ResponseProto.getDefaultInstance();  
    21.         ResponseProto.Builder build = ResponseProto.newBuilder();  
    22.         int add1 = request.getNum1();  
    23.         int add2 = request.getNum2();  
    24.         int sum = real.add(add1, add2);  
    25.         ResponseProto result = null;  
    26.         build.setResult(sum);  
    27.         result = build.build();  
    28.         return result;  
    29.     }  
    30.   
    31.     @Override  
    32.     public ResponseProto minus(RpcController controller, RequestProto request) throws ServiceException {  
    33.         // TODO Auto-generated method stub  
    34.         ResponseProto proto = ResponseProto.getDefaultInstance();  
    35.         ResponseProto.Builder build = ResponseProto.newBuilder();  
    36.         int add1 = request.getNum1();  
    37.         int add2 = request.getNum2();  
    38.         int sum = real.minus(add1, add2);  
    39.         ResponseProto result = null;  
    40.         build.setResult(sum);  
    41.         result = build.build();  
    42.         return result;  
    43.     }  
    44.   
    45. }  



  • step 7:

  • 最后,偷懒没写客户端的东西,只是写了一个简单的测试例子: 
    Java代码  收藏代码
    1. package org.tao.pbtest.proto.test;  
    2.   
    3. import java.io.DataInputStream;  
    4. import java.io.DataOutputStream;  
    5. import java.io.IOException;  
    6. import java.net.Socket;  
    7. import java.util.Random;  
    8.   
    9. import org.tao.pbtest.proto.CalculatorMsg.RequestProto;  
    10. import org.tao.pbtest.proto.CalculatorMsg.ResponseProto;  
    11. import org.tao.pbtest.server.api.Calculator;  
    12.   
    13. public class TestCalculator implements Calculator {  
    14.   
    15.     public int doTest(String op, int a, int b){  
    16.         // TODO Auto-generated method stub  
    17.         Socket s = null;  
    18.         DataOutputStream out = null;  
    19.         DataInputStream in = null;  
    20.         int ret = 0;  
    21.         try {  
    22.             s= new Socket("localhost"8038);  
    23.             out = new DataOutputStream(s.getOutputStream());  
    24.             in = new DataInputStream(s.getInputStream());  
    25.               
    26.             RequestProto.Builder builder = RequestProto.newBuilder();  
    27.             builder.setMethodName(op);  
    28.             builder.setNum1(a);  
    29.             builder.setNum2(b);  
    30.             RequestProto request = builder.build();  
    31.               
    32.             byte [] bytes = request.toByteArray();  
    33.             out.writeInt(bytes.length);  
    34.             out.write(bytes);  
    35.             out.flush();  
    36.               
    37.             int dataLen = in.readInt();  
    38.             byte[] data = new byte[dataLen];  
    39.             int count = in.read(data);  
    40.             if(count != dataLen){  
    41.                 System.err.println("something bad happened!");  
    42.             }  
    43.               
    44.             ResponseProto result = ResponseProto.parseFrom(data);  
    45.             System.out.println(a + " " + op + " " +  b + "=" + result.getResult());              
    46.             ret =  result.getResult();  
    47.               
    48.         }catch(Exception e){  
    49.             e.printStackTrace();  
    50.             System.err.println(e.toString());  
    51.         }finally {  
    52.             try{  
    53.             in.close();  
    54.             out.close();  
    55.             s.close();  
    56.             }catch(IOException e){  
    57.                 e.printStackTrace();  
    58.             }  
    59.         }  
    60.         return ret;  
    61.     }  
    62.     @Override  
    63.     public int add(int a, int b) {  
    64.         // TODO Auto-generated method stub  
    65.         return doTest("add", a, b);  
    66.     }  
    67.   
    68.     @Override  
    69.     public int minus(int a, int b) {  
    70.         // TODO Auto-generated method stub  
    71.         return doTest("minus", a, b);  
    72.     }  
    73.   
    74.     /** 
    75.      * @param args 
    76.      */  
    77.     public static void main(String[] args) {  
    78.         // TODO Auto-generated method stub  
    79.         TestCalculator tc = new TestCalculator();  
    80.         int testCount = 5;  
    81.         Random rand = new Random();  
    82.         while(testCount-- > 0){  
    83.             int a = rand.nextInt(100);  
    84.             int b = rand.nextInt(100);  
    85.             tc.add(a,b);  
    86.             tc.minus(a, b);  
    87.         }          
    88.           
    89.     }  
    90.   
    91. }  


    输出: 

    76 add 14=90 
    76 minus 14=62 
    20 add 84=104 
    20 minus 84=-64 
    4 add 16=20 
    4 minus 16=-12 
    56 add 4=60 
    56 minus 4=52 
    46 add 50=96 
    46 minus 50=-4
    阅读(1013) | 评论(0) | 转发(0) |
    0

    上一篇:glog apply

    下一篇:protobuf

    给主人留下些什么吧!~~