Chinaunix首页 | 论坛 | 博客
  • 博客访问: 27288
  • 博文数量: 31
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 320
  • 用 户 组: 普通用户
  • 注册时间: 2021-05-11 17:57
文章分类
文章存档

2021年(31)

我的朋友

分类: 架构设计与优化

2021-06-27 15:38:33

概述

Thrift是一个可互操作和可伸缩服务的框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 等等编程语言间无缝结合的、高效的服务。

Thrift最初由facebook开发,07年四月开放源码,08年5月进入apache孵化器。thrift允许你定义一个简单的定义文件中的数据类型和服务接口(IDL)。以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言。

其传输数据采用二进制格式,相对于XML和JSON等序列化方式体积更小,对于高并发、大数据量和多语言的环境更有优势。 Thrift它含有三个主要的组件:protocol,transport和server,其中,protocol定义了消息是怎样序列化的,transport定义了消息是怎样在客户端和服务器端之间通信的,server用于从transport接收序列化的消息,根据protocol反序列化之,调用用户定义的消息处理器,并序列化消息处理器的响应,然后再将它们写回transport。

官网地址:thrift.apache.org

基本概念

架构图


堆栈的顶部是从Thrift定义文件生成的代码。Thrift 服务生成的客户端和处理器代码。这些由图中的棕色框表示。红色框为发送的数据结构(内置类型除外)也会生成代码。协议和传输是Thrift运行时库的一部分。因此使用Thrift可以定义服务,并且可以自由更改协议和传输,而无需重新生成代码。 Thrift还包括一个服务器基础结构,用于将协议和传输绑定在一起。有可用的阻塞,非阻塞,单线程和多线程服务器。 堆栈的“底层I / O”部分根据所开发语言而有所不同。对于Java和Python网络I / O,Thrift库利用内置库,而C ++实现使用自己的自定义实现。

数据类型:

基本类型:

  • bool:布尔值,true 或 false,对应 Java 的 boolean

  • byte:8 位有符号整数,对应 Java 的 byte

  • i16:16 位有符号整数,对应 Java 的 short

  • i32:32 位有符号整数,对应 Java 的 int

  • i64:64 位有符号整数,对应 Java 的 long

  • double:64 位浮点数,对应 Java 的 double

  • string:未知编码文本或二进制字符串,对应 Java 的 String

结构体类型:

  • struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean

集合类型:

  • list:对应 Java 的 ArrayList

  • set:对应 Java 的 HashSet

  • map:对应 Java 的 HashMap

异常类型:

  • exception:对应 Java 的 Exception

服务类型:

  • service:对应服务的类

数据传输层Transport

  • TSocket —— 使用阻塞式 I/O 进行传输,是最常见的模式

  • TFramedTransport —— 使用非阻塞方式,按块的大小进行传输,类似于 Java 中的 NIO,若使用 TFramedTransport 传输层,其服务器必须修改为非阻塞的服务类型

  • TNonblockingTransport —— 使用非阻塞方式,用于构建异步客户端

数据传输协议Protocol

Thrift 可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本 (text) 和二进制 (binary) 传输协议,为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这需要根据项目 / 产品中的实际需求。

常用协议有以下几种:

  • TBinaryProtocol : 二进制格式.

  • TCompactProtocol : 高效率的、密集的二进制压缩格式

  • TJSONProtocol : JSON格式

  • TSimpleJSONProtocol : 提供JSON只写协议, 生成的文件很容易通过脚本语言解析

注意:客户端和服务端的协议要一致。

服务器类型Server

  • TSimpleServer ——单线程服务器端使用标准的阻塞式 I/O,一般用于测试。

  • TThreadPoolServer —— 多线程服务器端使用标准的阻塞式 I/O,预先创建一组线程处理请求。

  • TNonblockingServer —— 多线程服务器端使用非阻塞式 I/O,服务端和客户端需要指定 TFramedTransport 数据传输的方式。

  • THsHaServer —— 半同步半异步的服务端模型,需要指定为: TFramedTransport 数据传输的方式。它使用一个单独的线程来处理网络I/O,一个独立的worker线程池来处理消息。这样,只要有空闲的worker线程,消息就会被立即处理,因此多条消息能被并行处理。

  • TThreadedSelectorServer —— TThreadedSelectorServer允许你用多个线程来处理网络I/O。它维护了两个线程池,一个用来处理网络I/O,另一个用来进行请求的处理。当网络I/O是瓶颈的时候,TThreadedSelectorServer比THsHaServer的表现要好。

实现逻辑

服务端

实现服务处理接口 impl

创建TProcessor 创建TServerTransport 创建TProtocol 创建TServer 启动Server

客户端

创建Transport 创建TProtocol 基于TTransport和TProtocol创建 Client 调用Client的相应方法

ThriftServerDemo实例

新建 Maven项目,并且添加 thrift依赖包

  1.  

  2.        

  3.            org.apache.thrift

  4.            libthrift

  5.            0.9.3

  6.        

  7.        

  8.            org.slf4j

  9.            slf4j-log4j12

  10.            1.7.12

  11.        

  12.        

  13.            org.apache.logging.log4j

  14.            log4j-api

  15.            2.7

  16.        

  17.        

  18.            org.apache.logging.log4j

  19.            log4j-core

  20.            2.7

  21.        

  22.    

  23.    

  24.        

  25.            

  26.                org.apache.maven.plugins

  27.                maven-compiler-plugin

  28.                3.3

  29.                

  30.                    1.8

  31.                    1.8

  32.                    utf-8

  33.                

  34.            

  35.        

  36.    


编写 IDL接口并生成接口文件

  1. namespace java thrift.service

  2.  

  3. // 计算类型 - 仅限整数四则运算

  4. enum ComputeType {

  5.    ADD = 0;

  6.    SUB = 1;

  7.    MUL = 2;

  8.    DIV = 3;

  9. }

  10.  

  11. // 服务请求

  12. struct ComputeRequest {

  13.    1:required i64 x;

  14.    2:required i64 y;

  15.    3:required ComputeType computeType;

  16. }

  17.  

  18. // 服务响应

  19. struct ComputeResponse {

  20.    1:required i32 errorNo;

  21.    2:optional string errorMsg;

  22.    3:required i64 computeRet;

  23. }

  24.  

  25. service ComputeServer {

  26.    ComputeResponse getComputeResult(1:ComputeRequest request);

  27. }


执行编译命令:

  1. thrift-0.11.0.exe --gen java computeServer.thrift

拷贝生成的 Service类文件到 IDEA 

服务端接口实现

  1. public class ThriftTestImpl implements ComputeServer.Iface {

  2.    private static final Logger logger = LogManager.getLogger(ThriftTestImpl.class);

  3.    public ComputeResponse getComputeResult(ComputeRequest request) {

  4.        ComputeType computeType = request.getComputeType();

  5.        long x = request.getX();

  6.        long y = request.getY();

  7.        logger.info("get compute result begin. [x:{}] [y:{}] [type:{}]", x, y, computeType.toString());

  8.        long begin = System.currentTimeMillis();

  9.        ComputeResponse response = new ComputeResponse();

  10.        response.setErrorNo(0);

  11.        try {

  12.            long ret;

  13.            if (computeType == ComputeType.ADD) {

  14.                ret = add(x, y);

  15.                response.setComputeRet(ret);

  16.            } else if (computeType == ComputeType.SUB) {

  17.                ret = sub(x, y);

  18.                response.setComputeRet(ret);

  19.            } else if (computeType == ComputeType.MUL) {

  20.                ret = mul(x, y);

  21.                response.setComputeRet(ret);

  22.            } else {

  23.                ret = div(x, y);

  24.                response.setComputeRet(ret);

  25.            }

  26.        } catch (Exception e) {

  27.            response.setErrorNo(1001);

  28.            response.setErrorMsg(e.getMessage());

  29.            logger.error("exception:", e);

  30.        }

  31.        long end = System.currentTimeMillis();

  32.        logger.info("get compute result end. [errno:{}] cost:[{}ms]", response.getErrorNo(), (end - begin));

  33.        return response;

  34.    }

  35.    private long add(long x, long y) {

  36.        return x + y;

  37.    }

  38.    private long sub(long x, long y) {

  39.        return x - y;

  40.    }

  41.    private long mul(long x, long y) {

  42.        return x * y;

  43.    }

  44.    private long div(long x, long y) {

  45.        return x / y;

  46.    }

  47. }


服务端实现

  1. public class ServerMain {

  2.    private static final Logger logger = LogManager.getLogger(ServerMain.class);

  3.  

  4.    public static void main(String[] args) {

  5.        try {

  6.            //实现服务处理接口impl

  7.            ThriftTestImpl workImpl = new ThriftTestImpl();

  8.            //创建TProcessor

  9.            TProcessor tProcessor = new ComputeServer.Processor<ComputeServer.Iface>(workImpl);

  10.            //创建TServerTransport,非阻塞式 I/O,服务端和客户端需要指定 TFramedTransport 数据传输的方式

  11.            final TNonblockingServerTransport transport = new TNonblockingServerSocket(9999);

  12.            //创建TProtocol

  13.            TThreadedSelectorServer.Args ttpsArgs = new TThreadedSelectorServer.Args(transport);

  14.            ttpsArgs.transportFactory(new TFramedTransport.Factory());

  15.            //二进制格式反序列化

  16.            ttpsArgs.protocolFactory(new TBinaryProtocol.Factory());

  17.            ttpsArgs.processor(tProcessor);

  18.            ttpsArgs.selectorThreads(16);

  19.            ttpsArgs.workerThreads(32);

  20.            logger.info("compute service server on port :" + 9999);

  21.            //创建TServer

  22.            TServer server = new TThreadedSelectorServer(ttpsArgs);

  23.            //启动Server

  24.            server.serve();

  25.        } catch (Exception e) {

  26.            logger.error(e);

  27.        }

  28.    }

  29. }


服务端整体代码结构 

log4j2.xml配置文件

  1. xml version="1.0" encoding="UTF-8"?>

  2.  status="INFO" monitorInterval="30">

  3.    

  4.    

  5.        

  6.         name="Console" target="SYSTEM_OUT">

  7.            

  8.             pattern="%highlight{[ %p ] [%-d{yyyy-MM-dd HH:mm:ss}] [%l] %m%n}"/>

  9.        

  10.  

  11.         name="RollingFileInfo" fileName="log/log.log" filePattern="log/log.log.%d{yyyy-MM-dd}">

  12.            

  13.             level="info" onMatch="ACCEPT" onMismatch="DENY"/>

  14.             pattern="[ %p ] [%-d{yyyy-MM-dd HH:mm:ss}] [ LOGID:%X{logid} ] [%l] %m%n"/>

  15.            

  16.                 modulate="true" interval="1"/>

  17.                

  18.            

  19.        

  20.  

  21.         name="RollingFileError" fileName="log/error.log" filePattern="log/error.log.%d{yyyy-MM-dd}">

  22.            

  23.            

  24.                 level="warn" onMatch="ACCEPT" onMismatch="DENY" />

  25.            

  26.             pattern="[ %p ] %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] [%l] %m%n"/>

  27.            

  28.                 modulate="true" interval="1"/>

  29.                

  30.            

  31.        

  32.  

  33.    

  34.  

  35.    

  36.    

  37.        

  38.         name="org.springframework" level="INFO">

  39.         name="org.mybatis" level="INFO">

  40.         level="all">

  41.             ref="Console"/>

  42.             ref="RollingFileInfo"/>

  43.             ref="RollingFileError"/>

  44.        

  45.    


Jmeter测试类编写

利用JMeter调用Java测试类去调用对应的后台服务,并记住每次调用并获取反馈值的RT,ERR%,只需要按照单线程的方式去实现测试业务,也无需添加各种埋点收集数据 

 

新建一个 JavaMaven工程,添加 JMeter及 thrift依赖包

  1.        

  2.            org.apache.jmeter

  3.            ApacheJMeter_core

  4.            4.0

  5.        

  6.        

  7.            org.apache.jmeter

  8.            ApacheJMeter_java

  9.            4.0

  10.        

  11.  

  12.        

  13.            org.apache.thrift

  14.            libthrift

  15.            0.11.0

  16.        

  17.        

  18.            org.apache.logging.log4j

  19.            log4j-api

  20.            2.11.1

  21.        

  22.        

  23.            org.apache.logging.log4j

  24.            log4j-core

  25.            2.11.1

  26.        

  27.        

  28.            org.slf4j

  29.            slf4j-log4j12

  30.            1.7.25

  31.        

  32.    

  33.  

  34.    

  35.        

  36.            

  37.                org.apache.maven.plugins

  38.                maven-compiler-plugin

  39.                3.7.0

  40.                

  41.                    1.8

  42.                    1.8

  43.                    utf-8

  44.                

  45.            

  46.        

  47.    


ThriftClient测试类编写

  1. public class ThriftClient {

  2.    private ComputeServer.Client client = null;

  3.    private TTransport tTransport = null;

  4.  

  5.    public ThriftClient(String ip,int port){

  6.        try {

  7.            TTransport tTransport = new TFramedTransport(new TSocket(ip,port));

  8.            tTransport.open();

  9.            TProtocol tProtocol = new TBinaryProtocol(tTransport);

  10.            client = new ComputeServer.Client(tProtocol);

  11.        } catch (TTransportException e) {

  12.            e.printStackTrace();

  13.        }

  14.    }

  15.  

  16.    public ComputeResponse getResponse(ComputeRequest request){

  17.        try {

  18.            ComputeResponse response = client.getComputeResult(request);

  19.            return response;

  20.        } catch (TException e) {

  21.            e.printStackTrace();

  22.            return null;

  23.        }

  24.    }

  25.  

  26.    public void  close(){

  27.        if (tTransport != null && tTransport.isOpen()){

  28.            tTransport.close();

  29.        }

  30.    }

  31. }


注意:需要把编写 IDL接口文件拷贝到工程里

新建一个 JavaClass,如下例中的 TestThriftByJmeter,并继承 AbstractJavaSamplerClient。 AbstractJavaSamplerClient中默认实现了四个可以覆盖的方法,分别是 getDefaultParameters(), setupTest(), runTest()和 teardownTest()方法。

  • getDefaultParameters 方法主要用于设置传入界面的参数;

  • setupTest方法为初始化方法,用于初始化性能测试时的每个线程;

  • runTest方法为性能测试时的线程运行体;

  • teardownTest方法为测试结束方法,用于结束性能测试中的每个线程。

编写TestThriftByJmeter测试类

  1. public class TestThriftByJmeter extends AbstractJavaSamplerClient {

  2.    private ThriftClient client;

  3.    private ComputeRequest request;

  4.    private ComputeResponse response;

  5.  

  6.    //设置传入界面的参数

  7.    @Override

  8.    public Arguments getDefaultParameters(){

  9.        Arguments arguments = new Arguments();

  10.        arguments.addArgument("ip","172.16.14.251");

  11.        arguments.addArgument("port","9999");

  12.        arguments.addArgument("X","0");

  13.        arguments.addArgument("Y","0");

  14.        arguments.addArgument("type","0");

  15.        return arguments;

  16.    }

  17.  

  18.    //初始化方法

  19.    @Override

  20.    public void setupTest(JavaSamplerContext context){

  21.        //获取Jmeter中设置的参数

  22.        String ip = context.getParameter("ip");

  23.        int port = context.getIntParameter("port");

  24.        int x = context.getIntParameter("X");

  25.        int y = context.getIntParameter("Y");

  26.        ComputeType type = ComputeType.findByValue(context.getIntParameter("type"));

  27.  

  28.        //创建客户端

  29.        client = new ThriftClient(ip,port);

  30.        //设置request请求

  31.        request = new ComputeRequest(x,y,type);

  32.        super.setupTest(context);

  33.    }

  34.  

  35.    //性能测试线程运行体

  36.    @Override

  37.    public SampleResult runTest(JavaSamplerContext context) {

  38.        SampleResult result = new SampleResult();

  39.        //开始统计响应时间标记

  40.        result.sampleStart();

  41.        try {

  42.            long begin = System.currentTimeMillis();

  43.            response = client.getResponse(request);

  44.            long cost = (System.currentTimeMillis() - begin);

  45.            //打印时间戳差值。Java请求响应时间

  46.            System.out.println(response.toString()+" 总计花费:["+cost+"ms]");

  47.  

  48.            if (response == null){

  49.                //设置测试结果为fasle

  50.                result.setSuccessful(false);

  51.                return result;

  52.            }

  53.            if (response.getErrorNo() == 0){

  54.                //设置测试结果为true

  55.                result.setSuccessful(true);

  56.            }else{

  57.                result.setSuccessful(false);

  58.                result.setResponseMessage("ERROR");

  59.            }

  60.        }catch (Exception e){

  61.            result.setSuccessful(false);

  62.            result.setResponseMessage("ERROR");

  63.            e.printStackTrace();

  64.        }finally {

  65.            //结束统计响应时间标记

  66.            result.sampleEnd();

  67.        }

  68.        return result;

  69.    }

  70.  

  71.    //测试结束方法

  72.    public void tearDownTest(JavaSamplerContext context) {

  73.        if (client != null) {

  74.            client.close();

  75.        }

  76.  

  77.        super.teardownTest(context);

  78.    }

  79.  

  80. }


特别说明:

  1. result.setSamplerLabel("7D"); //设置java Sampler的标题

  2. result.setResponseOK();   //设置响应成功

  3. result.setResponseData(); //设置响应内容

编写测试Run Main方法

  1. public class RunMain {

  2.    public static void main(String[] args) {

  3.        Arguments arguments = new Arguments();

  4.        arguments.addArgument("ip","172.16.14.251");

  5.        arguments.addArgument("port","9999");

  6.        arguments.addArgument("X","1");

  7.        arguments.addArgument("Y","3");

  8.        arguments.addArgument("type","0");

  9.        JavaSamplerContext context = new JavaSamplerContext(arguments);

  10.        TestThriftByJmeter jmeter = new TestThriftByJmeter();

  11.  

  12.        jmeter.setupTest(context);

  13.        jmeter.runTest(context);

  14.        jmeter.tearDownTest(context);

  15.  

  16.    }

  17. }


测试结果通过

使用 mvn cleanpackage打包测试代码

使用 mvn dependency:copy-dependencies-DoutputDirectory=lib复制所依赖的jar包都会到项目下的lib目录下

复制测试代码 jar包到 jmeter\lib\ext目录下,复制依赖包到 jmeter\lib目录下

这里有两点需要注意:

  • 如果你的jar依赖了其他第三方jar,需要将其一起放到lib/ext下,否则会出现ClassNotFound错误

  • 如果在将jar放入lib/ext后,你还是无法找到你编写的类,且此时你是开着JMeter的,则需要重启一下JMeter

打开 Jmeter,在添加 Java请求时,注意要选择 Jmeter测试类,下面的列表中可以看到参数和默认值。

下面我们将进行性能压测,设置线程组,设置10个并发线程。

服务端日志:

阅读(823) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~