Chinaunix首页 | 论坛 | 博客
  • 博客访问: 518725
  • 博文数量: 99
  • 博客积分: 2030
  • 博客等级: 大尉
  • 技术积分: 783
  • 用 户 组: 普通用户
  • 注册时间: 2006-08-12 09:11
文章分类

全部博文(99)

文章存档

2023年(2)

2022年(1)

2020年(1)

2019年(1)

2018年(4)

2017年(16)

2016年(60)

2015年(1)

2013年(3)

2006年(10)

我的朋友

分类: Java

2016-03-30 10:08:22

RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。

RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC。会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化。

众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它们可以取代 Java 默认的序列化,从而提供更高效的性能。

为了支持高并发,传统的阻塞式 IO 显然不太合适,因此我们需要异步的 IO,即 NIO。Java 提供了 NIO 的解决方案,Java 7 也提供了更优秀的 NIO.2 支持,用 Java 实现 NIO 并不是遥不可及的事情,只是需要我们熟悉 NIO 的技术细节。

我们需要将服务部署在分布式环境下的不同节点上,通过服务注册的方式,让客户端来自动发现当前可用的服务,并调用这些服务。这需要一种服务注册表(Service Registry)的组件,让它来注册分布式环境下所有的服务地址(包括:主机名与端口号)。

应用、服务、服务注册表之间的关系见下图:

系统架构

每台 Server 上可发布多个 Service,这些 Service 共用一个 host 与 port,在分布式环境下会提供 Server 共同对外提供 Service。此外,为防止 Service Registry 出现单点故障,因此需要将其搭建为集群环境。

本文将为您揭晓开发轻量级分布式 RPC 框架的具体过程,该框架基于 TCP 协议,提供了 NIO 特性,提供高效的序列化方式,同时也具备服务注册与发现的能力。

根据以上技术需求,我们可使用如下技术选型:

  1. Spring:它是最强大的依赖注入框架,也是业界的权威标准。
  2. Netty:它使 NIO 编程更加容易,屏蔽了 Java 底层的 NIO 细节。
  3. Protostuff:它基于 Protobuf 序列化框架,面向 POJO,无需编写 .proto 文件。
  4. ZooKeeper:提供服务注册与发现功能,开发分布式系统的必备选择,同时它也具备天生的集群能力。

相关 Maven 依赖请见附录。

第一步:编写服务接口

 public interface HelloService {

    String hello(String name);
} 

将该接口放在独立的客户端 jar 包中,以供应用使用。

第二步:编写服务接口的实现类

 @RpcService(HelloService.class)
 // 指定远程接口 
public class HelloServiceImpl implements HelloService {
         @Override 
 public String hello(String name) { 
 return "Hello! " + name;
        }
} 

使用RpcService注解定义在服务接口的实现类上,需要对该实现类指定远程接口,因为实现类可能会实现多个接口,一定要告诉框架哪个才是远程接口。

RpcService代码如下:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component // 表明可被 Spring 扫描
public @interface RpcService {

    Class public class RpcServer implements ApplicationContextAware, InitializingBean { 
 private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class); 
 private String serverAddress; 
 private ServiceRegistry serviceRegistry;
 private Map handlerMap = new HashMap<>(); 
 // 存放接口名与服务对象之间的映射关系 
 public RpcServer(String serverAddress) { 
 this.serverAddress = serverAddress;
           }
 public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) { 
 this.serverAddress = serverAddress;
 this.serviceRegistry = serviceRegistry;
            } 
 @Override
 public void setApplicationContext(ApplicationContext ctx) throws BeansException {
                 Map serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); 
 // 获取所有带有 RpcService 注解的 Spring Bean 
 if (MapUtils.isNotEmpty(serviceBeanMap)) { 
 for (Object serviceBean : serviceBeanMap.values()) {
                            String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                            handlerMap.put(interfaceName, serviceBean);
                       }
                 }
       } 
 @Override
 public void afterPropertiesSet() throws Exception {
           EventLoopGroup bossGroup = new NioEventLoopGroup();
           EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
 @Override
 public void initChannel(SocketChannel channel) throws Exception {
 channel.pipeline().addLast(new RpcDecoder(RpcRequest.class))
 // 将 RPC 请求进行解码(为了处理请求) .addLast(new RpcEncoder(RpcResponse.class)) // 将 RPC 响应进行编码(为了返回响应) .addLast(new RpcHandler(handlerMap)); // 处理 RPC 请求 }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] array = serverAddress.split(":");
            String host = array[0]; int port = Integer.parseInt(array[1]);

            ChannelFuture future = bootstrap.bind(host, port).sync();
            LOGGER.debug("server started on port {}", port);
 if (serviceRegistry != null) {
                serviceRegistry.register(serverAddress); // 注册服务地址 }
                future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
} 

以上代码中,有两个重要的 POJO 需要描述一下,它们分别是RpcRequest与RpcResponse。

使用RpcRequest封装 RPC 请求,代码如下:

 public class RpcRequest {

    private String requestId;
    private String className;
    private String methodName;
    private Class public class RpcEncoder extends MessageToByteEncoder { 
 private Class genericClass; public RpcEncoder(Class genericClass) {
         this.genericClass = genericClass;
     }

    @Override 
 public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception 
 { if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in); out.writeInt(data.length); out.writeBytes(data);
        }
    }
} 

编写一个SerializationUtil工具类,使用Protostuff实现序列化:

 public class SerializationUtil {
   private static Map cachedSchema = new ConcurrentHashMap<>(); private static Objenesis objenesis = new ObjenesisStd(true); private SerializationUtil() {
         } @SuppressWarnings("unchecked") private static Schema getSchema(Class cls) {
        Schema schema = (Schema) cachedSchema.get(cls); if (schema == null) {
            schema = RuntimeSchema.createFrom(cls); if (schema != null) {
                cachedSchema.put(cls, schema);
            }
        } return schema;
       } @SuppressWarnings("unchecked") public static byte[] serialize(T obj) {
        Class cls = (Class) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try {
            Schema schema = getSchema(cls); return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
      } public static T deserialize(byte[] data, Class cls) { try {
            T message = (T) objenesis.newInstance(cls);
            Schema schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema); return message;
 } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e);
        }
    }
} 

以上了使用 Objenesis 来实例化对象,它是比 Java 反射更加强大。

注意:如需要替换其它序列化框架,只需修改SerializationUtil即可。当然,更好的实现方式是提供配置项来决定使用哪种序列化方式。

使用RpcHandler中处理 RPC 请求,只需扩展 Netty 的SimpleChannelInboundHandler抽象类即可,代码如下:

public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcHandler.class);

    private final Map<String, Object> handlerMap;

    public RpcHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
 RpcResponse response = new RpcResponse();
        response.setRequestId(request.getRequestId()); 
 try
 Object result = handle(request);
             response.setResult(result);
         } catch (Throwable t) {
             response.setError(t);
        }
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

       private Object handle(RpcRequest request) throws Throwable {
 String className = request.getClassName(); 
 Object serviceBean = handlerMap.get(className); 
 Class serviceClass = serviceBean.getClass(); 
 String methodName = request.getMethodName(); 
 Class[] parameterTypes = request.getParameterTypes(); 
 Object[] parameters = request.getParameters();
            /*Method method = serviceClass.getMethod(methodName, parameterTypes);
 method.setAccessible(true); 
 return method.invoke(serviceBean, parameters);*/
 
 FastClass serviceFastClass = FastClass.create(serviceClass); 
 FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); 
 return serviceFastMethod.invoke(serviceBean, parameters);
      }

    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
 LOGGER.error("server caught exception", cause);
            ctx.close();
    }
} 

为了避免使用 Java 反射带来的性能问题,我们可以使用 CGLib 提供的反射 API,如上面用到的FastClass与FastMethod。

第七步:配置客户端

同样使用 Spring 配置文件来配置 RPC 客户端,spring.xml代码如下:

 <beans ...> 
<context:property-placeholder location="classpath:config.properties"/>  
 <bean id="serviceDiscovery" class="com.xxx.rpc.registry.ServiceDiscovery"> 
 <constructor-arg name="registryAddress" value="${registry.address}"/> 

 public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> { 
 private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
 private String host; 
 private int port; 
 private RpcResponse response; 
 private final Object obj = new Object(); 
 public RpcClient(String host, int port) { this.host = host; this.port = port;
       } 
 @Override 
 public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { 
 this.response = response; synchronized (obj) {
             obj.notifyAll(); // 收到响应,唤醒线程 }
      } 
 @Override 
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         LOGGER.error("client caught exception", cause);
         ctx.close();
     } 
 public RpcResponse send(RpcRequest request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(); try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline()
                            .addLast(new RpcEncoder(RpcRequest.class)) // 将 RPC 请求进行编码(为了发送请求) .addLast(new RpcDecoder(RpcResponse.class)) // 将 RPC 响应进行解码(为了处理响应) .addLast(RpcClient.this); // 使用 RpcClient 发送 RPC 请求 }
                })
                .option(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().writeAndFlush(request).sync(); synchronized (obj) {
                obj.wait(); // 未收到响应,使线程等待 } if (response != null) {
                future.channel().closeFuture().sync();
            } return response;
        } finally {
            group.shutdownGracefully();
        }
    }
} 

第十步:发送 RPC 请求

使用 JUnit 结合 Spring 编写一个单元测试,代码如下:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring.xml")
public class HelloServiceTest {

    @Autowired private RpcProxy rpcProxy;

    @Test public void helloTest() {
        HelloService helloService = rpcProxy.create(HelloService.class);
        String result = helloService.hello("World"); Assert.assertEquals("Hello! World", result);
    }
} 

运行以上单元测试,如果不出意外的话,您应该会看到绿条。

总结

本文通过 Spring + Netty + Protostuff + ZooKeeper 实现了一个轻量级 RPC 框架,使用 Spring 提供依赖注入与参数配置,使用 Netty 实现 NIO 方式的数据传输,使用 Protostuff 实现对象序列化,使用 ZooKeeper 实现服务注册与发现。使用该框架,可将服务部署到分布式环境中的任意节点上,客户端通过远程接口来调用服务端的具体实现,让服务端与客户端的开发完全分离,为实现大规模分布式应用提供了基础支持。


阅读(1792) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~