Chinaunix首页 | 论坛 | 博客
  • 博客访问: 965069
  • 博文数量: 253
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2609
  • 用 户 组: 普通用户
  • 注册时间: 2019-03-08 17:29
个人简介

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

文章分类

全部博文(253)

文章存档

2022年(60)

2021年(81)

2020年(83)

2019年(29)

我的朋友

分类: Web开发

2021-10-12 10:06:57

一、WebFlux 简介

WebFlux 是 Spring Framework5.0 中引入的一种新的反应式Web框架。通过Reactor项目实现Reactive Streams规范,完全异步和非阻塞框架。本身不会加快程序执行速度,但在高并发情况下借助异步IO能够以少量而稳定的线程处理更高的吞吐,规避文件IO/网络IO阻塞带来的线程堆积。

1.1 WebFlux 的特性

WebFlux 具有以下特性:
  • 异步非阻塞 - 可以举一个上传例子。相对于 Spring MVC 是同步阻塞IO模型,Spring WebFlux这样处理:线程发现文件数据没传输好,就先做其他事情,当文件准备好时通知线程来处理(这里就是输入非阻塞方式),当接收完并写入磁盘(该步骤也可以采用异步非阻塞方式)完毕后再通知线程来处理响应(这里就是输出非阻塞方式)。
  • 响应式函数编程 - 相对于Java8 Stream 同步、阻塞的Pull模式,Spring Flux 采用Reactor Stream 异步、非阻塞Push模式。书写采用 Java lambda 方式,接近自然语言形式且容易理解。
  • 不拘束于Servlet - 可以运行在传统的Servlet 容器(3.1+版本),还能运行在Netty、Undertow等NIO容器中。

1.2 WebFlux 的设计目标

  • 适用高并发
  • 高吞吐量
  • 可伸缩性

二、Spring WebFlux 组件介绍

2.1 HTTPHandler

一个简单的处理请求和响应的抽象,用来适配不同HTTP服务容器的API。

2.2 WebHandler

一个用于处理业务请求抽象接口,定义了一系列处理行为。相关核心实现类如下;

2.3 DispatcherHandler

请求处理的总控制器,实际工作是由多个可配置的组件来处理。

WebFlux是兼容Spring MVC 基于@Controller,@RequestMapping等注解的编程开发方式的,可以做到平滑切换。

2.4 Functional Endpoints

这是一个轻量级函数编程模型。是基于@Controller,@RequestMapping等注解的编程模型的替代方案,提供一套函数式API 用于创建Router,Handler和Filter。调用处理组件如下:

简单的RouterFuntion 路由注册和业务处理过程:

  1. @Bean
  2. public RouterFunction<ServerResponse> initRouterFunction() {
  3.     return RouterFunctions.route()
  4.         .GET("/hello/{name}", serverRequest -> {
  5.             String name = serverRequest.pathVariable("name");
  6.             return ServerResponse.ok().bodyValue(name);
  7.         }).build();
  8. }
请求转发处理过程:

2.5 Reactive Stream

这是一个重要的组件,WebFlux 就是利用Reactor 来重写了传统Spring MVC 逻辑。其中Flux和Mono 是Reactor中两个关键概念。掌握了这两个概念才能理解WebFlux工作方式。
Flux和Mono 都实现了Reactor的Publisher接口,属于时间发布者,对消费者提供订阅接口,当有事件发生的时候,Flux或者Mono会通过回调消费者的相应的方法来通知消费者相应的事件。这就是所谓的响应式编程模型。
Mono工作流程图
只会在发送出单个结果后完成。
Flux工作流程图

发送出零个或者多个,可能无限个结果后才完成。

  1. 对于流式媒体类型:application/stream+json 或者 text/event-stream ,可以让调用端获得服务器滚动结果。
  2. 对于非流类型:application/json WebFlux 默认JSON编码器会将序列化的JSON 一次性刷新到网络,这并不意味着阻塞,因为结果Flux<?> 是以反应式方式写入网络的,没有任何障碍。

三、WebFlux 工作原理

3.1 组件装配过程

流程相关源码解析-WebFluxAutoConfiguration

  1. @Configuration
  2. //条件装配 只有启动的类型是REACTIVE时加载
  3. @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
  4. //只有存在 WebFluxConfigurer实例 时加载
  5. @ConditionalOnClass(WebFluxConfigurer.class)
  6. //在不存在 WebFluxConfigurationSupport实例时 加载
  7. @ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
  8. //在之后装配
  9. @AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class,
  10.       CodecsAutoConfiguration.class, ValidationAutoConfiguration.class })
  11. //自动装配顺序
  12. @AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
  13. public class WebFluxAutoConfiguration {
  14.    @Configuration
  15.    @EnableConfigurationProperties({ ResourceProperties.class, WebFluxProperties.class })
  16.    //接口编程 在装配WebFluxConfig 之前要先 装配EnableWebFluxConfiguration
  17.    @Import({ EnableWebFluxConfiguration.class })
  18.    public static class WebFluxConfig implements WebFluxConfigurer {
  19.       //隐藏部分源码
  20.      /**
  21.      * Configuration equivalent to {@code @EnableWebFlux}.
  22.      */
  23.    }
  24.     @Configuration
  25.     public static class EnableWebFluxConfiguration
  26.             extends DelegatingWebFluxConfiguration {
  27.         //隐藏部分代码
  28.     }
  29.     @Configuration
  30.     @ConditionalOnEnabledResourceChain
  31.     static class ResourceChainCustomizerConfiguration {
  32.         //隐藏部分代码
  33.     }
  34.     private static class ResourceChainResourceHandlerRegistrationCustomizer
  35.             implements ResourceHandlerRegistrationCustomizer {
  36.         //隐藏部分代码
  37.     }
WebFluxAutoConfiguration 自动装配时先自动装配EnableWebFluxConfiguration 而EnableWebFluxConfiguration->DelegatingWebFluxConfiguration ->WebFluxConfigurationSupport。
最终WebFluxConfigurationSupport 不仅配置DispatcherHandler 还同时配置了其他很多WebFlux核心组件包括 异常处理器WebExceptionHandler,映射处理器处理器HandlerMapping,请求适配器HandlerAdapter,响应处理器HandlerResultHandler 等。
DispatcherHandler 创建初始化过程如下;

  1. public class WebFluxConfigurationSupport implements ApplicationContextAware {
  2.    //隐藏部分代码
  3.    @Nullable
  4.    public final ApplicationContext getApplicationContext() {
  5.       return this.applicationContext;
  6.    }
  7.     //隐藏部分代码
  8.    @Bean
  9.    public DispatcherHandler webHandler() {
  10.       return new DispatcherHandler();
  11.    }
  1. public class DispatcherHandler implements WebHandler, ApplicationContextAware {
  2.    @Nullable
  3.    private List<HandlerMapping> handlerMappings;
  4.    @Nullable
  5.    private List<HandlerAdapter> handlerAdapters;
  6.    @Nullable
  7.    private List<HandlerResultHandler> resultHandlers;
  8.        @Override
  9.    public void setApplicationContext(ApplicationContext applicationContext) {
  10.         initStrategies(applicationContext);
  11.    }
  12.    protected void initStrategies(ApplicationContext context) {
  13.         //注入handlerMappings
  14.       Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
  15.             context, HandlerMapping.class, true, false);

  16.       ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
  17.       AnnotationAwareOrderComparator.sort(mappings);
  18.       this.handlerMappings = Collections.unmodifiableList(mappings);
  19.       //注入handlerAdapters
  20.       Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
  21.             context, HandlerAdapter.class, true, false);

  22.       this.handlerAdapters = new ArrayList<>(adapterBeans.values());
  23.       AnnotationAwareOrderComparator.sort(this.handlerAdapters);
  24.       //注入resultHandlers
  25.       Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
  26.             context, HandlerResultHandler.class, true, false);

  27.       this.resultHandlers = new ArrayList<>(beans.values());
  28.       AnnotationAwareOrderComparator.sort(this.resultHandlers);
  29.    }

流程相关源码解析-
HTTPHandlerAutoConfiguration
上面已讲解过WebFlux 核心组件装载过程,那么这些组件又是什么时候注入到对应的容器上下文中的呢?其实是在刷新容器上下文时注入进去的。
org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefresh

  1. public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
  2.       implements ConfigurableWebServerApplicationContext {
  3.    @Override
  4.    protected void onRefresh() {
  5.       super.onRefresh();
  6.       try {
  7.          createWebServer();
  8.       }
  9.       catch (Throwable ex) {
  10.          throw new ApplicationContextException("Unable to start reactive web server", ex);
  11.       }
  12.    }
  13.    private void createWebServer() {
  14.       WebServerManager serverManager = this.serverManager;
  15.       if (serverManager == null) {
  16.          String webServerFactoryBeanName = getWebServerFactoryBeanName();
  17.          ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
  18.          boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
  19.          // 这里创建容器管理时注入httpHandler
  20.          this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
  21.          getBeanFactory().registerSingleton("webServerGracefulShutdown",
  22.                new WebServerGracefulShutdownLifecycle(this.serverManager));
  23.          // 注册一个 web容器启动服务类,该类继承了SmartLifecycle
  24.          getBeanFactory().registerSingleton("webServerStartStop",
  25.                new WebServerStartStopLifecycle(this.serverManager));
  26.       }
  27.       initPropertySources();
  28.    }
  29.    protected HttpHandler getHttpHandler() {
  30.         String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
  31.         if (beanNames.length == 0) {
  32.             throw new ApplicationContextException(
  33.                     "Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
  34.         }
  35.         if (beanNames.length > 1) {
  36.             throw new ApplicationContextException(
  37.                     "Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
  38.                             + StringUtils.arrayToCommaDelimitedString(beanNames));
  39.         }
  40.         //容器上下文获取httpHandler
  41.         return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
  42.     }
而这个HTTPHandler 是由HTTPHandlerAutoConfiguration装配进去的。

  1. @Configuration
  2. @ConditionalOnClass({ DispatcherHandler.class, HttpHandler.class })
  3. @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
  4. @ConditionalOnMissingBean(HttpHandler.class)
  5. @AutoConfigureAfter({ WebFluxAutoConfiguration.class })
  6. @AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
  7. public class HttpHandlerAutoConfiguration {
  8.    @Configuration
  9.    public static class AnnotationConfig {
  10.       private ApplicationContext applicationContext;
  11.       public AnnotationConfig(ApplicationContext applicationContext) {
  12.          this.applicationContext = applicationContext;
  13.       }
  14.       //构建WebHandler
  15.       @Bean
  16.       public HttpHandler httpHandler() {
  17.          return WebHttpHandlerBuilder.applicationContext(this.applicationContext)
  18.                .build();
  19.       }
  20.    }
流程相关源码解析-web容器
org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer 。在创建WebServerManager 容器管理器时会获取对应web容器实例,并注入响应的HTTPHandler。

  1. class WebServerManager {
  2.    private final ReactiveWebServerApplicationContext applicationContext;
  3.    private final DelayedInitializationHttpHandler handler;
  4.    private final WebServer webServer;
  5.    WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
  6.          Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
  7.       this.applicationContext = applicationContext;
  8.       Assert.notNull(factory, "Factory must not be null");
  9.       this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
  10.       this.webServer = factory.getWebServer(this.handler);
  11.    }
  12. }
以Tomcat 容器为例展示创建过程,使用的是 TomcatHTTPHandlerAdapter 来连接Servlet 请求到HTTPHandler组件。

  1. public class TomcatReactiveWebServerFactory extends AbstractReactiveWebServerFactory implements ConfigurableTomcatWebServerFactory {
  2.     //隐藏部分代码
  3.     @Override
  4.     public WebServer getWebServer(HttpHandler httpHandler) {
  5.         if (this.disableMBeanRegistry) {
  6.             Registry.disableRegistry();
  7.         }
  8.         Tomcat tomcat = new Tomcat();
  9.         File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");
  10.         tomcat.setBaseDir(baseDir.getAbsolutePath());
  11.         Connector connector = new Connector(this.protocol);
  12.         connector.setThrowOnFailure(true);
  13.         tomcat.getService().addConnector(connector);
  14.         customizeConnector(connector);
  15.         tomcat.setConnector(connector);
  16.         tomcat.getHost().setAutoDeploy(false);
  17.         configureEngine(tomcat.getEngine());
  18.         for (Connector additionalConnector : this.additionalTomcatConnectors) {
  19.             tomcat.getService().addConnector(additionalConnector);
  20.         }
  21.         TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);
  22.         prepareContext(tomcat.getHost(), servlet);
  23.         return getTomcatWebServer(tomcat);
  24.     }
  25. }
最后Spring容器加载后通过SmartLifecycle实现类WebServerStartStopLifecycle 来启动Web容器。
WebServerStartStopLifecycle 注册过程详见:org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer

3.2 完整请求处理流程

(引用自:https://blog.csdn.net)

该图给出了一个HTTP请求处理的调用链路。是采用Reactor Stream 方式书写,只有最终调用 subscirbe 才真正执行业务逻辑。基于WebFlux 开发时要避免controller 中存在阻塞逻辑。列举下面例子可以看到Spring MVC 和Spring Webflux 之间的请求处理区别。

  1. @RestControllerpublic
  2. class TestController {
  3.     private Logger logger = LoggerFactory.getLogger(this.getClass());
  4.     @GetMapping("sync")
  5.     public String sync() {
  6.         logger.info("sync method start");
  7.         String result = this.execute();
  8.         logger.info("sync method end");
  9.         return result;
  10.     }
  11.     @GetMapping("async/mono")
  12.     public Mono<String> asyncMono() {
  13.         logger.info("async method start");
  14.         Mono<String> result = Mono.fromSupplier(this::execute);
  15.         logger.info("async method end");
  16.         return result;
  17.     }
  18.     private String execute() {
  19.         try {
  20.             TimeUnit.SECONDS.sleep(5);
  21.         } catch (InterruptedException e) {
  22.             e.printStackTrace();
  23.         }
  24.         return "hello";
  25.     }
  26. }
日志输出

  1. 2021-05-31 20:14:52.384 INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController : sync method start
  2. 2021-05-31 20:14:57.385 INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController : sync method end
  3. 2021-05-31 20:15:09.659 INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController : async method start
  4. 2021-05-31 20:15:09.660 INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController : async method end
从上面例子可以看出sync() 方法阻塞了请求,而asyncMono() 没有阻塞请求并立刻返回的。asyncMono() 方法具体业务逻辑 被包裹在了Mono 中Supplier中的了。当execute 处理完业务逻辑后通过回调方式响应给浏览器。

四、存储支持

一旦控制层使用了 Spring Webflux 则安全认证层、数据访问层都必须使用 Reactive API 才真正实现异步非阻塞。
NOSQL Database
  • MongoDB (org.springframework.boot:spring-boot-starter-data-mongodb-reactive)。
  • Redis(org.springframework.boot:spring-boot-starter-data-redis-reactive)。
Relational Database
  • H2 (io.r2dbc:r2dbc-h2)
  • MariaDB (org.mariadb:r2dbc-mariadb)
  • Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
  • MySQL (dev.miku:r2dbc-mysql)
  • jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
  • Postgres (io.r2dbc:r2dbc-postgresql)
  • Oracle (com.oracle.database.r2dbc:oracle-r2dbc)

五、总结

关于Spring MVC 和Spring WebFlux 测评很多,本文引用下做简单说明。参考:《Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC》。
基本依赖

  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-data-r2dbc</artifactId>
  4. </dependency>
  5. <!-- r2dbc 连接池 -->
  6. <dependency>
  7.     <groupId >io.r2dbc</groupId>
  8.     <artifactId>r2dbc-pool</artifactId>
  9. </dependency>
  10. <!--r2dbc mysql 库-->
  11. <dependency>
  12.     <groupId>dev.miku</groupId>
  13.     <artifactId>r2dbc- mysql</artifactId>
  14. </dependency>
  15. <!--自动配置需要引入一个嵌入式数据库类型对象-->
  16. <dependency>
  17.     <groupId>org.springframework.boot</groupId>
  18.     <artifactId>spring-boot-starter-data-jdbc</artifactId>
  19. </dependency>
  20. <!-- 反应方程式 web 框架 webflux-->
  21. <dependency>
  22.     <groupId>org.springframework.boot</groupId>
  23.     <artifactId>spring-boot-starter-webflux</artifactId>
  24. </dependency>
相同数据下效果如下

Spring MVC + JDBC 在低并发下表现最好,但 WebFlux + R2DBC 在高并发下每个处理请求使用的内存最少。

Spring WebFlux + R2DBC 在高并发下,吞吐量表现优异。
作者:vivo互联网服务器团队-Zhou Changqing
阅读(939) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~