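About the author: shares vivo Internet technology articles and salon events, and recommends the latest industry news and popular conferences.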


Category: Architecture Design and Optimization

2021-07-19 15:12:43

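1. What problem does Hystrix solve?

A complex distributed application has many dependencies, and each of them will inevitably fail at some point. If the application does not isolate its dependencies from one another to limit external risk, a single failing dependency can easily drag down the whole application.
Take a common e-commerce scenario: the order service calls the inventory, product, points, and payment services. When all of them are healthy, the order module works normally.

However, when the points service misbehaves and blocks for 30s, some order-service requests fail and their worker threads stay blocked on the call to the points service.

At peak traffic the problem gets worse: every order-service request ends up blocked on the points service, all worker threads hang, and machine resources are exhausted. The order service then becomes unavailable too, the failure cascades, and the whole cluster goes down. This is known as the avalanche effect.

What is needed is a mechanism that keeps the cluster as a whole available when a single service fails. Hystrix is a framework that implements this mechanism. The sections below analyze how Hystrix works overall.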

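2. Overall mechanism

[Entry] The entry point for execution in Hystrix is a HystrixCommand or HystrixObservableCommand object. In Spring applications these objects are usually constructed through annotations and AOP, which keeps the intrusion into business code low;
[Cache] Once the HystrixCommand object actually starts executing, it first checks whether the request cache is enabled; if it is enabled and the request hits, the cached result is returned directly;
[Circuit breaking] If the circuit breaker is open, the call is short-circuited and goes straight to the fallback logic; if it is closed, execution proceeds to the isolation step. The breaker state is driven mainly by the failure rate within the statistics window: when the failure rate is too high, the breaker opens automatically;
[Isolation] Users can configure thread-pool isolation or semaphore isolation. If the thread pool is full (or no semaphore permit is available), execution enters the fallback logic; otherwise it continues, and in thread-pool mode the business call is run by a thread of the pool;
[Execution] The business call is actually executed. If it fails or throws, execution enters the fallback logic; if it succeeds, the result is returned normally;
[Timeout] A delayed timer task checks whether the business call has timed out. On timeout, the thread running the business call is cancelled and execution enters the fallback logic; otherwise the result is returned normally. Both isolation strategies, thread pool and semaphore, support timeout configuration (with a flaw in the semaphore strategy);
[Fallback] Once in the fallback logic, if the business code implements HystrixCommand.getFallback(), the fallback data is returned; if not, an exception is returned;
[Statistics] The outcome of every business call (success, failure, timeout, and so on) is fed into the statistics module, and the resulting health statistics decide whether the breaker opens or closes.
As the saying goes, there are no secrets in the source code. Let us go through the source of the core features to see how Hystrix implements this overall mechanism.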
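
Before reading the real source, the order of the checks listed above can be sketched in plain Java. This is only an illustrative sketch and not Hystrix code: the class CommandFlowSketch and its parameters are hypothetical names, the circuit breaker and isolation checks are passed in as simple boolean suppliers, and the timeout and statistics steps are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of the step order: cache, breaker, isolation, execute, fallback.
class CommandFlowSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    String execute(String cacheKey,
                   BooleanSupplier circuitAllows,
                   BooleanSupplier isolationAllows,
                   Supplier<String> businessCall,
                   Supplier<String> fallback) {
        // [Cache] a hit returns immediately, before any other check
        String cached = cache.get(cacheKey);
        if (cached != null) {
            return cached;
        }
        // [Circuit breaking] an open breaker short-circuits to the fallback
        if (!circuitAllows.getAsBoolean()) {
            return fallback.get();
        }
        // [Isolation] a full pool or exhausted semaphore also goes to the fallback
        if (!isolationAllows.getAsBoolean()) {
            return fallback.get();
        }
        try {
            // [Execution] the real business call
            String result = businessCall.get();
            cache.put(cacheKey, result);
            return result;
        } catch (RuntimeException e) {
            // [Fallback] a failed call is answered by the fallback
            return fallback.get();
        }
    }

    public static void main(String[] args) {
        CommandFlowSketch flow = new CommandFlowSketch();
        System.out.println(flow.execute("order-1", () -> true, () -> true,
                () -> "points: 100", () -> "points: unknown"));
    }
}
```

Passing the checks in as suppliers keeps the sketch short; in Hystrix these decisions are made by the circuit breaker and the isolation strategy discussed in the following sections.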

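3. Circuit breaking

Household circuits have fuses. A fuse matters when the circuit develops a fault: the current keeps rising, and the rising current may damage important or expensive components, burn out the circuit, or even start a fire.
If a fuse is installed correctly, it melts and cuts the current once the current has risen abnormally to a certain level for a certain time, which protects the circuit. The Hystrix circuit breaker plays a similar role. When an application calls a service provider, and within a given period the total number of requests exceeds the configured threshold while the error rate in the window is too high, Hystrix trips the breaker for that call: subsequent requests are short-circuited and go straight to the fallback logic, which runs a local degradation strategy.
Hystrix is also self-adjusting. Some time after the breaker opens, it lets a single request through and adjusts the breaker state according to the result, so the breaker switches automatically among the closed, open, and half-open states.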

[HystrixCircuitBreaker] boolean attemptExecution(): called on every HystrixCommand execution to decide whether execution may continue. If the breaker is open and the sleep window has elapsed, the state is updated to half-open. The state change is an atomic CAS, which guarantees that only one business request is let through to actually call the provider; the state is then adjusted according to the result of that request.

    public boolean attemptExecution() {
        // Is the breaker forced open by configuration?
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        // Is the breaker forced closed by configuration?
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        // Is the breaker closed (-1 means it has not been opened)?
        if (circuitOpened.get() == -1) {
            return true;
        } else {
            // Has the sleep window elapsed?
            if (isAfterSleepWindow()) {
                // Switch to half-open and let this single request through
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    return true;
                } else {
                    return false;
                }
            } else {
                // Reject the request
                return false;
            }
        }
    }
[HystrixCircuitBreaker] void markSuccess(): called after a HystrixCommand executes successfully. If the breaker is half-open, its state is updated to closed. This is the case where the breaker was open, a single request was let through to the service provider, and that request succeeded, so Hystrix automatically closes the breaker.

    public void markSuccess() {
        // Move the breaker from half-open to closed
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            // Reset the health statistics and re-subscribe to them
            metrics.resetStream();
            Subscription previousSubscription = activeSubscription.get();
            if (previousSubscription != null) {
                previousSubscription.unsubscribe();
            }
            Subscription newSubscription = subscribeToStream();
            activeSubscription.set(newSubscription);
            // Mark the circuit as not opened
            circuitOpened.set(-1L);
        }
    }
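[HystrixCircuitBreaker] void markNonSuccess(): called after a HystrixCommand execution fails. If the breaker is half-open, its state is updated to open. This is the case where the breaker was open, a single request was let through to the service provider, and that request failed, so Hystrix keeps the breaker open and uses the time of this request as the start of a new sleep window.

    public void markNonSuccess() {
        // Move the breaker from half-open back to open
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            // Record the failure time as the start of the sleep window
            circuitOpened.set(System.currentTimeMillis());
        }
    }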
[HystrixCircuitBreaker] Subscription subscribeToStream(): the breaker subscribes to the health statistics. If the number of requests in the window has reached the volume threshold and the error rate has reached the error threshold, the breaker state is automatically updated to open. Subsequent requests are short-circuited: they no longer call the service provider and go straight to the fallback logic.

    private Subscription subscribeToStream() {
        // Subscribe to the health statistics
        return metrics.getHealthCountsStream()
                .observe()
                .subscribe(new Subscriber<HealthCounts>() {
                    @Override
                    public void onCompleted() {}
                    @Override
                    public void onError(Throwable e) {}
                    @Override
                    public void onNext(HealthCounts hc) {
                        // If the total request count is below the configured threshold, leave the breaker state unchanged
                        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

                        } else {
                            // If the error rate is below the configured threshold, leave the state unchanged;
                            // otherwise the error rate is too high: open the breaker and reject subsequent requests
                            if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

                            } else {
                                if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                    circuitOpened.set(System.currentTimeMillis());
                                }
                            }
                        }
                    }
                });
    }
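
The four methods above together form a small state machine. The following is a minimal sketch of that state machine in plain Java, not the Hystrix implementation: the class SimpleCircuitBreaker, its constructor parameters, and the injected clock are assumptions made so the example runs on its own and behaves deterministically, and the health snapshot is passed in directly instead of arriving through an RxJava stream.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical, simplified breaker with closed / open / half-open states.
class SimpleCircuitBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1L); // -1 means "not opened"
    private final long sleepWindowMs;
    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final LongSupplier clock; // injected so tests can control time

    SimpleCircuitBreaker(long sleepWindowMs, int requestVolumeThreshold,
                         int errorThresholdPercentage, LongSupplier clock) {
        this.sleepWindowMs = sleepWindowMs;
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.clock = clock;
    }

    // Fed with the health snapshot of the current window.
    void onHealthCounts(long totalRequests, long errorCount) {
        if (totalRequests == 0 || totalRequests < requestVolumeThreshold) {
            return; // not enough traffic to judge
        }
        long errorPercentage = errorCount * 100 / totalRequests;
        if (errorPercentage >= errorThresholdPercentage
                && status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong());
        }
    }

    boolean attemptExecution() {
        if (circuitOpened.get() == -1L) {
            return true; // closed
        }
        if (clock.getAsLong() > circuitOpened.get() + sleepWindowMs) {
            // Only the caller that wins the CAS is let through as the trial request
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false;
    }

    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            circuitOpened.set(-1L);
        }
    }

    void markNonSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong()); // restart the sleep window
        }
    }

    Status status() {
        return status.get();
    }

    public static void main(String[] args) {
        SimpleCircuitBreaker cb = new SimpleCircuitBreaker(5000, 20, 50, System::currentTimeMillis);
        cb.onHealthCounts(20, 15); // 75% errors over 20 requests
        System.out.println(cb.status() + ", allowed=" + cb.attemptExecution());
    }
}
```

Injecting the clock is a choice made for this sketch only; it makes the sleep window testable without waiting in real time.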

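4. Resource isolation

In cargo ships, the hold is usually divided into compartments to stop leaks and fires from spreading, so that an accident in one compartment does not sink the whole ship. Hystrix adopts the same bulkhead pattern: it isolates the service providers in the system from one another, so that rising latency or failure in one provider does not bring down the whole system, and the concurrency of calls to each provider can be controlled as well. For example, the order service calls the downstream points, inventory, and other services through separate thread pools; when the points service fails, only its own thread pool fills up and calls to the other services are unaffected. Hystrix supports two isolation modes: thread pool and semaphore.

4.1 Semaphore mode

Semaphore mode controls the concurrency of calls to a single service provider. Suppose N requests are in flight under a single CommandKey: if N is less than maxConcurrentRequests, execution continues; if N is greater than or equal to maxConcurrentRequests, the request is rejected immediately and enters the fallback logic. Semaphore mode runs the call on the requesting thread itself, so there is no thread context switch and the overhead is low, but the timeout mechanism cannot interrupt the call.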
[AbstractCommand] Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd): tries to acquire the semaphore. If it succeeds, the call to the service provider continues; if not, execution enters the fallback logic.

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        executionHook.onStart(_cmd);
        // Does the circuit breaker allow the request?
        if (circuitBreaker.attemptExecution()) {
            // Get the semaphore
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            // Try to acquire a permit
            if (executionSemaphore.tryAcquire()) {
                try {
                    // Record the start time of the business call
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    // Continue with the business call
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                // Rejected by the semaphore: enter the fallback logic
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            // Rejected by the breaker: short-circuit and enter the fallback logic
            return handleShortCircuitViaFallback();
        }
    }
[AbstractCommand] TryableSemaphore getExecutionSemaphore(): returns the semaphore instance. If the isolation mode is semaphore, the semaphore is looked up by commandKey and, if it does not exist yet, created and cached. If the isolation mode is thread pool, the default semaphore TryableSemaphoreNoOp.DEFAULT is used, which lets every request pass.

    protected TryableSemaphore getExecutionSemaphore() {
        // Is the isolation mode semaphore?
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
            if (executionSemaphoreOverride == null) {
                // Look up the semaphore
                TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
                if (_s == null) {
                    // Create the semaphore and cache it
                    executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                    // Return the cached semaphore
                    return executionSemaphorePerCircuit.get(commandKey.name());
                } else {
                    return _s;
                }
            } else {
                return executionSemaphoreOverride;
            }
        } else {
            // Return the default semaphore, which lets every request pass
            return TryableSemaphoreNoOp.DEFAULT;
        }
    }
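
To make the permit counting concrete, here is a minimal sketch of a non-blocking semaphore in plain Java. It is an assumption-laden illustration rather than Hystrix's TryableSemaphoreActual: the class CountingTryableSemaphore and the helper callOrFallback are hypothetical names, and the limit is a fixed int instead of a dynamic property.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical non-blocking semaphore: a request either gets a permit at once or is rejected.
class CountingTryableSemaphore {
    private final int maxConcurrentRequests;
    private final AtomicInteger inFlight = new AtomicInteger(0);

    CountingTryableSemaphore(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
    }

    boolean tryAcquire() {
        int current = inFlight.incrementAndGet();
        if (current > maxConcurrentRequests) {
            inFlight.decrementAndGet(); // undo: no permit was available
            return false;
        }
        return true;
    }

    void release() {
        inFlight.decrementAndGet();
    }

    int inFlight() {
        return inFlight.get();
    }

    // Runs the call on the caller's own thread, or the fallback when no permit is free.
    <T> T callOrFallback(Supplier<T> call, Supplier<T> fallback) {
        if (!tryAcquire()) {
            return fallback.get();
        }
        try {
            return call.get();
        } finally {
            release(); // always give the permit back
        }
    }

    public static void main(String[] args) {
        CountingTryableSemaphore semaphore = new CountingTryableSemaphore(2);
        System.out.println(semaphore.callOrFallback(() -> "stock: 7", () -> "stock: unknown"));
    }
}
```

Because tryAcquire never blocks, a caller over the limit is rejected immediately, which is what keeps the requesting thread from queueing behind a slow provider.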

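4.2 Thread-pool mode

Thread-pool mode also controls the concurrency of calls to a single service provider. In the code, the semaphore is still acquired first, but the default semaphore is used, so every request passes, and the thread-pool logic is then invoked. In thread-pool mode, suppose N requests are in flight under a single CommandKey: if N is less than maximumPoolSize, a thread is taken from the Hystrix-managed thread pool and the parameters are handed to that worker thread, which performs the real call. If there are more concurrent requests than threads in the pool, tasks have to wait in a queue; the queue is bounded too, and if it is also full, execution enters the fallback logic. Thread-pool mode supports asynchronous calls and timeouts, but involves thread switching, so its overhead is higher.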
[AbstractCommand] Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd): takes a thread from the thread pool and executes on it, recording the thread state along the way.

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        // Is the isolation mode thread pool?
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    // Record statistics
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                    // If the command has already timed out, emit an error right away
                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    // Mark the thread state as started
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        // Run the hooks; if one throws, emit the error
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        // Return an empty Observable
                        return Observable.empty();
                    }
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    // Termination logic, omitted
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    // Unsubscribe logic, omitted
                }
                // Take the business execution thread from the thread pool
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    // Interrupt only if interrupt-on-timeout is enabled and the command has timed out
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            // Semaphore mode
            // Omitted
        }
    }
[HystrixContextScheduler.ThreadPoolWorker] Subscription schedule(final Action0 action): HystrixContextScheduler is Hystrix's own implementation of the RxJava Scheduler. Its main purposes are to avoid executing a command when the Observable is no longer subscribed and to allow a running command to be interrupted. In RxJava, a Scheduler creates a Worker for the Observable to execute on, and the Worker is responsible for scheduling the actual execution thread. ThreadPoolWorker is the Worker implemented by Hystrix, and schedule is its core scheduling method.

    public Subscription schedule(final Action0 action) {
        // If nothing is subscribed, return without executing
        if (subscription.isUnsubscribed()) {
            return Subscriptions.unsubscribed();
        }
        ScheduledAction sa = new ScheduledAction(action);
        subscription.add(sa);
        sa.addParent(subscription);
        // Get the thread pool
        ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
        // Submit the task for execution
        FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
        sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
        return sa;
    }
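
The bulkhead behaviour of thread-pool mode (a bounded pool plus a bounded queue, with rejection leading to the fallback) can be illustrated with the JDK alone. The sketch below is not how Hystrix wires its pools: ThreadPoolBulkhead is a hypothetical class, it uses java.util.concurrent.ThreadPoolExecutor directly without RxJava, and the fallback is a plain value supplied by the caller.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical bulkhead: one bounded pool per downstream service.
class ThreadPoolBulkhead {
    private final ThreadPoolExecutor executor;

    ThreadPoolBulkhead(int poolSize, int queueSize) {
        BlockingQueue<Runnable> queue;
        if (queueSize > 0) {
            queue = new LinkedBlockingQueue<Runnable>(queueSize); // bounded waiting queue
        } else {
            queue = new SynchronousQueue<Runnable>();             // no queueing at all
        }
        this.executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                queue, new ThreadPoolExecutor.AbortPolicy());
    }

    // Submits the call to the pool; when pool and queue are full, answers with the fallback value.
    <T> Future<T> submit(Callable<T> call, T fallbackValue) {
        try {
            return executor.submit(call);
        } catch (RejectedExecutionException rejected) {
            return CompletableFuture.completedFuture(fallbackValue);
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolBulkhead pointsPool = new ThreadPoolBulkhead(1, 0);
        CountDownLatch release = new CountDownLatch(1);
        // The first call occupies the only thread until the latch is released
        Future<String> slow = pointsPool.submit(() -> {
            release.await();
            return "points: 100";
        }, "points: unknown");
        // The second call finds the pool full and gets the fallback value
        Future<String> second = pointsPool.submit(() -> "points: 100", "points: unknown");
        System.out.println(second.get());
        release.countDown();
        System.out.println(slow.get(2, TimeUnit.SECONDS));
        pointsPool.shutdown();
    }
}
```

Giving each downstream service its own ThreadPoolBulkhead instance means a slow points service can exhaust only its own pool, while calls to other services keep their threads.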

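5. Timeout detection

Hystrix's timeout mechanism reduces the impact on the caller when a third-party dependency becomes too slow, making requests fail fast. It is implemented mainly with delayed tasks and consists of two parts: registering the delayed task and executing it.
With thread-pool isolation, the main thread subscribes to the result, a worker thread from the pool calls the provider, and a timer thread checks after a set time whether the task has finished. If it has not, the task has timed out: a timeout exception is thrown, and any result the worker thread produces later is skipped rather than published. If it has, the task completed within the timeout and the timer check ends.
With semaphore isolation, the main thread subscribes to the result and also performs the actual call to the provider (there is no worker thread). When the configured time is exceeded, the main thread still runs the business call to completion and only then throws the timeout exception. Timeouts in semaphore mode are therefore flawed: the in-flight call cannot be cancelled, and the time at which the main thread returns is not bounded.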
[AbstractCommand] Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd): the entry point for timeout detection. It calls lift(new HystrixObservableTimeoutOperator<R>(_cmd)) to attach the timeout check.

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // Omitted
        Observable<R> execution;
        // Is timeout detection enabled?
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    // Attach the timeout check
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            // Plain execution
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
[HystrixObservableTimeoutOperator] Subscriber<? super R> call(final Subscriber<? super R> child): creates the check task and registers it as a delayed task. If the business call has still not finished when the check runs, a timeout exception is emitted; if the call has already completed or failed, the check task is cleared.

    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        child.add(s);
        final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
        // Instantiate the listener
        TimerListener listener = new TimerListener() {
            @Override
            public void tick() {
                // If the task has not finished, mark it as timed out
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // Report the timeout failure
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    // Unsubscribe
                    s.unsubscribe();
                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                        @Override
                        public void run() {
                            child.onError(new HystrixTimeoutException());
                        }
                    });
                    // Emit the timeout exception
                    timeoutRunnable.run();
                }
            }
            // Configured timeout
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };
        // Register the listener and attach the check task
        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        originalCommand.timeoutTimer.set(tl);
        Subscriber<R> parent = new Subscriber<R>() {
            @Override
            public void onCompleted() {
                if (isNotTimedOut()) {
                    // Completed without timing out: clear the timeout check
                    tl.clear();
                    child.onCompleted();
                }
            }
            @Override
            public void onError(Throwable e) {
                if (isNotTimedOut()) {
                    // Failed without timing out: clear the timeout check
                    tl.clear();
                    child.onError(e);
                }
            }
            @Override
            public void onNext(R v) {
                // Publish the result only if not timed out; after a timeout the result is skipped
                if (isNotTimedOut()) {
                    child.onNext(v);
                }
            }
            // Has the command not timed out?
            private boolean isNotTimedOut() {
                return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                        originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }
        };
        s.add(parent);
        return parent;
    }
[HystrixTimer] Reference<TimerListener> addTimerListener(final TimerListener listener): addTimerListener uses Java's scheduled executor service (scheduleAtFixedRate) to run the check after a delay equal to the timeout.

    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        // Start the timer thread if needed
        startThreadIfNeeded();
        // Build the check task
        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };
        // Schedule the check task with a delay
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }
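
The heart of the timeout check is a race on a single atomic status: the timer tries to move it from NOT_EXECUTED to TIMED_OUT, the business call tries to move it from NOT_EXECUTED to COMPLETED, and whichever CAS succeeds decides what the subscriber sees. The sketch below isolates that race in plain Java. TimeoutGuard and its methods are hypothetical names, and the timer in main uses a one-shot ScheduledExecutorService.schedule call rather than Hystrix's HystrixTimer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical guard: the timer and the business call race on one CAS.
class TimeoutGuard<R> {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private final AtomicReference<TimedOutStatus> state =
            new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
    private volatile String outcome = "pending";

    // Called by the timer thread once the timeout has elapsed.
    boolean tick() {
        if (state.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
            outcome = "error: timed out";
            return true;
        }
        return false; // the call finished first, nothing to do
    }

    // Called by the thread running the business call when the result is ready.
    boolean complete(R value) {
        if (state.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
            outcome = "value: " + value;
            return true;
        }
        return false; // already timed out, the late result is dropped
    }

    String outcome() {
        return outcome;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        TimeoutGuard<String> guard = new TimeoutGuard<>();
        timer.schedule(() -> { guard.tick(); }, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(200);              // simulated slow business call
        guard.complete("late result");  // loses the race, so it is dropped
        System.out.println(guard.outcome());
        timer.shutdown();
    }
}
```

Note that the guard only decides which outcome is published. As the semaphore-mode discussion above points out, it does not by itself stop a call that is already running.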

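6. Fallback

Hystrix's fallback logic is the last line of defense. Execution enters it when the business call throws, when the thread pool or semaphore is full, when the call times out, and in similar cases. The fallback should produce a generic response from memory or static logic and avoid depending on network calls as far as possible. If no fallback method is implemented, or the fallback method itself throws, an exception is raised in the business thread.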

[AbstractCommand] Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException): first checks whether the exception is unrecoverable; if so, the fallback is skipped and the exception is returned directly. Next it tries to acquire the fallback semaphore and then runs the fallback logic. If the fallback itself throws or no fallback method is implemented, an exception is returned.

    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
        executionResult = executionResult.addEvent((int) latency, eventType);
        // Is this an unrecoverable error, such as stack overflow or OOM?
        if (isUnrecoverable(originalException)) {
            logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
            Exception e = wrapWithOnErrorHook(failureType, originalException);
            // Return the error directly
            return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
        } else {
            // Is this a recoverable error?
            if (isRecoverableError(originalException)) {
                logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
            }
            // Is fallback enabled in the configuration?
            if (properties.fallbackEnabled().get()) {
                /* Omitted */
                final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                    @Override
                    public Observable<R> call(Throwable t) {
                        Exception e = wrapWithOnErrorHook(failureType, originalException);
                        Exception fe = getExceptionFromThrowable(t);

                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        Exception toEmit;
                        // UnsupportedOperationException is thrown when the business code does not override getFallback()
                        if (fe instanceof UnsupportedOperationException) {
                            logger.debug("No fallback for HystrixCommand. ", fe);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
                        } else {
                            // The fallback logic itself threw an exception
                            logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
                        }
                        // Should the original exception be emitted unwrapped?
                        if (shouldNotBeWrapped(originalException)) {
                            // Emit the original exception
                            return Observable.error(e);
                        }
                        // Emit the wrapped exception
                        return Observable.error(toEmit);
                    }
                };
                // Get the fallback semaphore
                final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {
                    @Override
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                            fallbackSemaphore.release();
                        }
                    }
                };
                Observable<R> fallbackExecutionChain;
                // Try to acquire the fallback semaphore
                if (fallbackSemaphore.tryAcquire()) {
                    try {
                        // Is a fallback method defined by the user?
                        if (isFallbackUserDefined()) {
                            executionHook.onFallbackStart(this);
                            // Run the fallback logic
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
                            // Run the fallback logic
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
                        fallbackExecutionChain = Observable.error(ex);
                    }
                    return fallbackExecutionChain
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)
                            .doOnCompleted(markFallbackCompleted)
                            .onErrorResumeNext(handleFallbackError)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } else {
                    // Fallback semaphore rejected: emit an error
                    return handleFallbackRejectionByEmittingError();
                }
            } else {
                // Fallback disabled in the configuration: emit an error
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }
        }
    }
[HystrixCommand] R getFallback(): by default HystrixCommand throws UnsupportedOperationException; subclasses must override getFallback() to implement fallback logic.

    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }
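
The three outcomes described above (fallback succeeds, fallback is missing, fallback itself fails) can be shown with a small stand-alone command class. This is a sketch under stated assumptions, not the Hystrix API: SketchCommand is a hypothetical class, execution is synchronous, IllegalStateException stands in for HystrixRuntimeException, and errors of type java.lang.Error are simply left to propagate.

```java
// Hypothetical synchronous command with an overridable fallback.
abstract class SketchCommand<R> {
    protected abstract R run() throws Exception;

    // Same default as described above: no fallback unless a subclass overrides it.
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    R execute() {
        try {
            return run();
        } catch (Exception original) {
            try {
                return getFallback();
            } catch (UnsupportedOperationException missing) {
                // The subclass did not override getFallback()
                throw new IllegalStateException("command failed and no fallback available.", original);
            } catch (RuntimeException fallbackFailure) {
                // The fallback itself threw
                throw new IllegalStateException("command failed and fallback failed.", original);
            }
        }
    }

    public static void main(String[] args) {
        SketchCommand<String> points = new SketchCommand<String>() {
            @Override
            protected String run() throws Exception {
                throw new java.io.IOException("points service unavailable");
            }

            @Override
            protected String getFallback() {
                return "points: unknown"; // static value, no network call
            }
        };
        System.out.println(points.execute());
    }
}
```

Returning a static value from getFallback() follows the advice in this section that the fallback should not depend on the network.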

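7. Health statistics

Hystrix decides whether to trip the breaker from the share of failed calls, computed over sliding-window statistics, which lets it fail fast and go to the fallback logic. The steps are:
  • After AbstractCommand finishes executing, it calls the handleCommandEnd method, which publishes the execution result as a HystrixCommandCompletion event to the event stream;
  • The event stream groups events by time with the Observable.window() method and aggregates them by type (success, failure, and so on) into buckets with the flatMap() method, forming a bucket stream;
  • The buckets are then aggregated with Observable.window(), by the number of buckets per window, into sliding-window data;
  • The sliding-window data is aggregated into data objects (such as the health stream and cumulative data);
  • When the circuit breaker (CircuitBreaker) is initialized, it subscribes to the health stream and flips the breaker according to the health data.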

[AbstractCommand] void handleCommandEnd(boolean commandExecutionStarted): called once the business call has finished. This method reports the execution result executionResult and is the entry point for health statistics.

    private void handleCommandEnd(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        // Report the execution result to the health statistics
        if (executionResultAtTimeOfCancellation == null) {
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }

        if (endCurrentThreadExecutingCommand != null) {
            endCurrentThreadExecutingCommand.call();
        }
    }
[BucketedRollingCounterStream] BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket)
The sliding window of the health statistics class HealthCountsStream is implemented mainly in its parent class BucketedRollingCounterStream. First the base class BucketedCounterStream turns the event stream into a bucket stream, then BucketedRollingCounterStream turns that into a sliding window, and finally the reduceBucket function passed in by HealthCountsStream reduces the window to health statistics.

    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        // Call the parent constructor, which turns the data into a bucket stream
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        // Reduce the data inside a sliding window with the supplied reduceBucket function
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        // Operate on the parent's bucket stream
        this.sourceStream = bucketedStream
                // Each window holds numBuckets buckets and slides by 1 bucket
                .window(numBuckets, 1)
                // Reduce the data inside the sliding window
                .flatMap(reduceWindowToSummary)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }
[HealthCounts] HealthCounts plus(long[] eventTypeCounts): accumulates the data in a bucket by event type and produces the HealthCounts statistics.

    public HealthCounts plus(long[] eventTypeCounts) {
        long updatedTotalCount = totalCount;
        long updatedErrorCount = errorCount;

        long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
        long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
        long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
        long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
        long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
        // Total count
        updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        // Error count
        updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        return new HealthCounts(updatedTotalCount, updatedErrorCount);
    }
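
The idea of a window made of buckets that slides one bucket at a time can be sketched without RxJava. The class below is a hypothetical ring-buffer version, not Hystrix's stream-based implementation: RollingHealthWindow and its methods are illustrative names, buckets are rolled by an explicit rollBucket() call instead of by a timer, and only success and non-success are distinguished.

```java
// Hypothetical ring buffer of buckets; the window is the sum of all buckets.
class RollingHealthWindow {
    private final long[] total;
    private final long[] errors;
    private int head = 0; // index of the bucket currently being written

    RollingHealthWindow(int numBuckets) {
        this.total = new long[numBuckets];
        this.errors = new long[numBuckets];
    }

    // Records one finished call in the current bucket.
    void record(boolean success) {
        total[head]++;
        if (!success) {
            errors[head]++;
        }
    }

    // Slides the window by one bucket: the oldest bucket is cleared and reused.
    void rollBucket() {
        head = (head + 1) % total.length;
        total[head] = 0;
        errors[head] = 0;
    }

    long totalRequests() {
        long sum = 0;
        for (long t : total) {
            sum += t;
        }
        return sum;
    }

    long errorCount() {
        long sum = 0;
        for (long e : errors) {
            sum += e;
        }
        return sum;
    }

    int errorPercentage() {
        long t = totalRequests();
        return t == 0 ? 0 : (int) (errorCount() * 100 / t);
    }

    public static void main(String[] args) {
        RollingHealthWindow window = new RollingHealthWindow(10);
        window.record(true);
        window.record(false);
        System.out.println(window.totalRequests() + " requests, "
                + window.errorPercentage() + "% errors");
    }
}
```

A breaker would compare totalRequests() against the request volume threshold and errorPercentage() against the error threshold, in the same order as the subscribeToStream() logic shown in section 3.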

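8. Summary

In a distributed environment, some of the many service dependencies will inevitably fail. Hystrix is a library that helps users control the interactions between distributed services by adding circuit breaking, isolation, fallback, and related logic, which improves the overall resilience of the system. Its main capabilities are:
  • Protecting the system by controlling latency and failures from access to third-party dependencies (usually over the network)
  • Stopping cascading failures in complex distributed systems
  • Failing fast and recovering quickly
  • Degrading gracefully
  • Near real-time monitoring, alerting, and control
There are a few points to keep in mind when using Hystrix:
  1. The overridden getFallback() method should avoid network dependencies as far as possible. If it does depend on the network, use multi-level fallback: instantiate another HystrixCommand inside getFallback() and execute it. getFallback() should return quickly so that degradation is fast.
  2. Thread isolation is the recommended strategy for HystrixCommand.
  3. hystrix.threadpool.default.maximumSize only takes effect when hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize is set to true. The maximum number of threads should be chosen based on the business itself and on performance test results. Start with a small value and rely on dynamic resizing, because the pool size is the main tool for shedding load and preventing resources from being blocked when latency occurs.
  4. With the semaphore isolation strategy, business logic runs on the application's own calling thread (for example a Tomcat container thread), so the concurrency limit must be set carefully. This strategy is not recommended for calls with network overhead, because it easily causes container threads to queue up and block, which affects the whole application.
In addition, Hystrix relies heavily on RxJava, a reactive functional programming framework. A basic understanding of how RxJava is used makes the source code easier to follow.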