原理分析

通过前面的demo演示,我们对Hystrix的使用场景和使用方法已经有了一个初步的了解。接下来通过解读Netflix Hystrix官方提供的流程图来详细了解当一个请求调用了相关服务依赖之后Hystrix是如何工作的(即当你访问http://localhost:9000/ribbon-consumer请求之后,在RIBBON-CONSUMER中是如何处理的)。

工作流程

接下来根据官方提供的Netflix Hystrix流程图中的所标记的数字顺序来解释每一个环节的详细内容,如下图所示:

这一流程中所涉及的流程如下所示:
(1)创建HystrixCommand或HystrixObservableCommand对象;
(2)执行command命令;
(3)结果是否被缓存;
(4)断路器是否打开;
(5)线程池/请求队列/信号量是否占满;
(6)HystrixObservableCommand.construct() 或者HystrixCommand.run();
(7)计算断路器的健康度;
(8)fallback处理;
(9)返回成功的响应。
接下来将详细对以上流程进行分析,以加深对于Netflix Hystrix流程的印象。

(1)、创建HystrixCommand或HystrixObservableCommand对象。

首先,构建一个HystrixCommand或是HystrixObservableCommand对象,用来表示对依赖服务的操作请求,同时传递所有需要的参数。从它们的命名中就能知道它采用了“命令模式”来实现对服务调用操作的封装,这两个Command对象分别针对不同的应用场景,其中HystrixCommand用在依赖的服务返回单个操作结果的场景;而HystrixObservableCommand则用在依赖的服务返回多个操作结果的时候:

1
2
3
4
5
//依赖的服务返回单个操作结果的场景
HystrixCommand command = new HystrixCommand(arg1, arg2);

//依赖的服务返回多个操作结果的时候
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);

命令模式,将来自客户端的请求封装为一个对象,从而让你可以使用不同的请求对客户端进行参数化。它可以被用于实现“行为请求者”与“行为实现者”的解耦,以便使两者可以适应变化。

举一个例子,来简单实现一下命令模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//接收者
public class Receiver{
public void action(){
//真正的业务逻辑实现
}
}

//抽象命令
interface Command{
void execute();
}

//具体命令实现
public class ConcreteCommand implements Command{
private Receiver receiver;
public ConcreteCommand(Receiver receiver){
this.receiver = receiver;
}
public void execute(){
this.receiver.action();
}
}

//客户端调用者
public class Invoker{
private Command command;
public void setCommand(Command command){
this.command = command;
}
public void action(){
this.command.execute();
}
}

public class Client{
public static void main(String [] args){
Receiver receiver = new Receiver();
Command command = new ConcreteCommand(receiver);
Invoker invoker = new Invoker();
invoker.setCommand(command);
invoker.action(); //客户端通过调用者来执行命令
}
}

解释一下上述代码的含义:(1)Receiver是接收者,它知道如何处理具体的业务逻辑;(2)Command是抽象命令,它定义了一个命令对象应具备的一系列命令操作,如execute()、undo()、redo()等。当命令操作被调用的时候就会触发接收者去做具体命令对应的业务逻辑;(3)ConcreteCommand是一个具体的命令,在这里它绑定了命令操作与接收者之间的关系,execute()命令的实现委托给了Receiver对象的action()方法;(4)Invoker是一个调用者,它持有一个命令对象,并且可以在需要的时候通过命令对象完成具体的业务逻辑。

从上面的例子可以看到,调用者Invoker与操作者(接收者Receiver)两者通过Command命令接口实现了解耦。

对于调用者来说,我们可以为其注入多个命令操作,比如新建文件、复制文件、删除文件等这三个操作,调用者只需要在需要对应操作的时候直接调用即可,而不需要知道这些操作命令实际是如何实现的。这里所提到的HystrixCommand和HystrixObservableCommand则是在Hystrix中对Command的进一步抽象定义,后续会逐步介绍它的内容来帮助理解其运作原理。

其实细心的你可能也已经发现,Invoker和Receiver的关系非常类似于“请求-响应”模式。所以它比较适用于实现记录日志、撤销操作、队列请求等。

接下来介绍以下4种比较适合使用命令模式的场景:
(1)使用命令模式作为“回调(CallBack)”在面向对象系统中的替代。“CallBack”其实就是先将一个函数登记上,然后在以后调用此函数。
(2)需要在不同的时间指定请求,将请求排队。一个命令对象和原来的请求发出者可以有不同的生命周期,也就是说原来的请求发出者可能已经不存在了,而命令对象本身仍然是活动的。这时命令的接收者可以是在本地,也可以在网络的另一个地址。命令对象可以在序列化之后传送到另外一台机器上去。

(3)系统需要支持命令的撤销。命令对象可以把状态存储起来,等到客户端需要撤销命令所产生的效果时,可以调用undo()方法,把命令所产生的效果撤销掉。命令对象还可以提供redo()方法,以供客户端在需要的时候重新实施命令效果。
(4)如果要将系统中所有的数据更新到日志里,以便在系统崩溃时,可以根据日志读回所有的数据更新命令,重新调用execute()方法一条一条的执行这些命令,从而恢复系统在崩溃前所做的数据更新。

(2)、执行command命令

从上面的流程图可以看到,一共存在4种命令的执行方式,但是Hystrix在执行时会根据创建的Command对象以及具体的情况来选择一个执行。需要说明的是HystrixCommand实现了前面的两个执行方式,即execute和queue方法:

execute方法是同步执行,从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。(调用queue的get()方法来完成实现)

queue方法是异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。(需要执行的command会在线程池中进行排队)。

1
2
R value = command.execute();
Future<R> fValue = command.queue();

说完了HystrixCommand,接下来再来学习HystrixObservableCommand,它实现了后面的两个执行方式,即observe和toObservable方法。

observe()方法是异步执行,热观察,调用toObservable并内置ReplaySubject,根据它返回的可观察对象(Observable对象),简单处理并完成run方法的执行,它可以被立即执行,如果订阅了那么会重新通知。它代表了操作的多个结果,它是一个Hot Observable。

toObservable()方法是异步执行,冷观察,并不会马上执行Hystrix的run()方法。作为一个RxJava类,它返回的是一个最原始的可观察对象(Observable对象)。当我们订阅这个对象时,它才会把执行的信息传递给订阅者。它代表了操作的多个结果,但是它返回的是一个Cold Observable。

1
2
Observable<R> ohValue = command.observe();
Observable<R> ocValue = command.toObservable();

在Hystrix的底层实现中大量使用了RxJava,为了后续理解方便,这里对RxJava的“观察者-订阅者”模式做一个简单的介绍。

RxJava是JVM响应式扩展,是一个使用Java VM的可观察序列编写异步和基于事件的程序的库。

RxJava最核心的两个内容是Observable(“被观察者”或者“事件源”)和Subscriber(“观察者”或者“订阅者”)。

  • Observable用来向订阅者Subscriber对象发布事件,Subscriber对象则在接收到事件后对其进行处理,而在这里所指的事件通常就是对依赖服务的调用。
  • 一个Observable可以发出多个事件,直到结束或是发生异常。
  • Observable对象每发出一个事件,就会调用对应观察者Subscriber对象的onNext()方法。
  • 每一个Observable的执行,最后一定会通过调用Subscriber.onCompleted()或者Subscriber.onError()来结束该事件的操作流。

RxJava看起来很像设计模式中的观察者模式,但是有一点明显不同,那就是如果一个Observerble没有任何的的Subscriber,那么这个Observable是不会发出任何事件的。

为了更好地理解这个Observabler和Subscriber,接下来将通过一段伪代码来进行演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//演示Observable和Subscriber
public void testRxJava(){
//创建事件源Observable
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello,RxJava");
subscriber.onNext("Hello,World");
subscriber.onCompleted();
}
});
//创建订阅者Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable throwable) {

}

@Override
public void onNext(String s) {
System.out.println("Subscriber: "+s);
}
};
//订阅
observable.subscribe(subscriber);
}

在上面的例子中,我们创建了一个简单的事件源observable和一个对事件传递内容输出的订阅者subscriber,两者通过observable.subscriber(subscriber)这一订阅方式来触发事件的发布。


还记得在前面我们介绍了两个不同的概念:Hot Observable和Cold Observable,其中前者是observe()方法的返回对象;而后者则是toObservable()方法的返回对象。

那么这两者有什么区别呢?其中Hot Observable,它不论“事件源”是否有“订阅者”,都会在创建后对事件进行发布,所以对于Hot Observable的每一个“订阅者”都有可能是从“事件源”的中途开始的,并可能只是看到了整个操作的局部过程。

而Cold Observable在没有“订阅者”的时候并不会发布事件,而是进行等待,直到有“订阅者”之后才发布事件,所以对于Cold Observable的订阅者来说,它可以保证一开始看到整个操作的全部过程。


通过前面的介绍,你可能会认为只是在HystrixObservableCommand中使用了RxJava,但是实际上execute()、queue()方法也都使用了RxJava来实现。查看一下HystrixCommand类中的execute和queue方法的源码,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public R execute() {
try {
return this.queue().get();
} catch (Exception var2) {
throw Exceptions.sneakyThrow(this.decomposeException(var2));
}
}

public Future<R> queue() {
final Future<R> delegate = this.toObservable().toBlocking().toFuture();
Future<R> f = new Future<R>() {
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
} else {
if ((Boolean)HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
HystrixCommand.this.interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}

boolean res = delegate.cancel(HystrixCommand.this.interruptOnFutureCancel.get());
if (!HystrixCommand.this.isExecutionComplete() && HystrixCommand.this.interruptOnFutureCancel.get()) {
Thread t = (Thread)HystrixCommand.this.executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}

return res;
}
}

public boolean isCancelled() {
return delegate.isCancelled();
}

public boolean isDone() {
return delegate.isDone();
}

public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}

public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception var6) {
Throwable t = this.decomposeException(var6);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException)t;
switch(hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
return f;
default:
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
} else {
return f;
}
}

解释一下上述代码的含义:

  • execute()是通过queue()方法返回的异步对象Future的get()方法来实现同步执行的,该方法会等待任务执行结束,然后得到一个R类型的结果并进行返回。
  • queue()方法则是通过toObservable()来获得Cold Observable,并且通过toBlocking()将该Observable转换为一个BlockingObservable对象,它可以将数据以阻塞的方式发射出来。之后调用该对象的toFuture()方法将BlockingObservable转换为一个Future,注意该方法只是创建一个Future返回,并不会阻塞,这使得消费者可以自己决定如何处理异步操作。而execute()方法就是直接使用了queue()返回的Future中的get方法(这是一个阻塞方法)来实现同步操作的。同时通过这种方式转换的Future要求Observable只发射一个数据,所以这两个实现都只能返回单一结果。
(3)、结果是否被缓存

若当前命令的请求缓存功能是被启用的,并且该命令缓存命令,那么缓存的结果会立即以Observable对象的形式进行返回。

(4)、断路器是否打开

在命令结果没有缓存命中的时候,Hystrix在执行命令前需要检查断路器是否为打开状态:

  • 如果断路器是打开的,那么Hystrix不会执行命令,而是转接到fallback处理逻辑(对应于后面的第八步)。
  • 如果断路器是关闭的,那么Hystrix会跳到第五步,检查是否有可用资源来执行命令。关于断路器的具体实现细节,后续会做更加详细的介绍和分析。
    (5)、线程池/请求队列/信号量是否占满
    如果与命令相关的线程池和请求队列,或者信号量(不使用线程池的时候)已经被占满,那么Hystrix也不会执行命令,而是转接到fallback处理逻辑(对应于后面的第八步)。

需要注意的是,这里的Hystrix所判断的线程池并非容器的线程池,而是每个依赖服务的专用线程池。Hystrix为了保证不会因为某个依赖服务的问题影响到其他依赖服务,而采用了“舱壁模式(Bulkhead Pattern)”来隔离每个依赖的服务。关于依赖服务的隔离与线程池相关的内容会在后续进行介绍。

(6)、HystrixObservableCommand.construct() 或者HystrixCommand.run()

Hystrix会根据我们编写的方法来决定采取什么样的方式去请求依赖服务:

  • HystrixCommand.run():该方法用于返回一个单一的结果,或者抛出异常。
  • HystrixObservableCommand.construct():该方法用于返回一个Observable对象来发射多个结果,或通过onError方法来发送错误通知。

如果run()或者construct()方法的执行时间超过了命令设置的超时阈值,当前处理线程将会抛出一个TimeoutException(如果该命令不在其自身的线程中执行,则会通过单独的计时线程来抛出)。在这种情况下,Hystrix会转接到fallback处理逻辑(对应于后面的第八步)。同时,如果当前命令没有被取消或中断,那么它最终会忽略run()或者construct()方法的返回。

如果命令没有抛出异常并返回了结果,那么Hystrix在记录一些日志并采集监控报告之后将该结果返回。在使用run()方法的情况下,Hystrix会返回一个Observable,它发射单个结果并产生onCompleted的结束通知;而在使用construct()方法的情况下,Hystrix会直接返回该方法产生的Observable对象。

(7)、计算断路器的健康度

Hystrix会将“成功”、“失败”、“拒绝”、“超时”等信息报告给断路器,而断路器会维护一组计数器来统计这些数据。

断路器会使用这些统计数据来决定是否要将断路器打开,来对某个依赖服务的请求进行“熔断/短路”,直到恢复期结束。若在恢复期结束后,根据统计数据判断如果还是未达到健康指标,就再次“熔断/短路”。

(8)、fallback处理

当命令执行失败的时候,Hystrix会进入fallback尝试回退处理,我们通常也称该操作为“服务降级”,能够引起服务降级处理的情况有下面几种:

  • 第4步,当前命令处于“熔断/短路”状态,断路器是打开的时候。
  • 第5步,当前命令的线程池、请求队列或者信号量被占满的时候。
  • 第6步,HystrixObservableCommand.construct()方法或者HystrixCommand.run()方法抛出异常的时候。

在服务降级逻辑中,我们需要实现一个通用的响应结果,并且该结果的处理逻辑应当是从缓存或是根据一些静态逻辑来获取,而不是依赖网络请求获取。如果一定要在降级逻辑中包含网络请求,那么该请求也必须被包装在HystrixCommand或者HystrixObservableCommand中,从而形成级联的降级策略,而最终的降级逻辑一定不是一个依赖网络请求的处理,而是一个能够稳定地返回结果的处理逻辑。

在HystrixCommand和HystrixObservableCommand中实现降级逻辑时还存在一些不同:

  • 当使用HystrixCommand的时候,通过实现HystrixCommand.getFallback()来实现服务降级逻辑。
  • 当使用HystrixObservableCommand的时候,通过HystrixObservableCommand.resumeWithFallback()实现服务降级逻辑,该方法会返回一个Observable对象来发射一个或多个降级结果。

当命令的降级逻辑返回结果之后,Hystrix就将该结果返回给调用者。当使用HystrixCommand.getFallback()的时候,它会返回一个Observable对象,该对象会发射getFallback()方法的处理结果。而使用HystrixObservableCommand.resumeWithFallback()实现的时候,它会将Observable对象直接返回。

如果我们没有为命令实现降级逻辑,或者降级处理逻辑中抛出了异常,Hystrix依然会返回一个Observable对象,但是它不会发射任何结果数据,而是通过onError方法通知命令立即中断请求,并通过onError()方法将引起命令失败的异常对象发送给调用者。实现一个有可能失败的降级逻辑是一种非常糟糕的做法,我们应该在实现降级策略时尽可能避免失败的情况。

当然完全不可能出现失败的完美策略是不存在的,如果降级执行发现失败的时候,Hystrix会根据不同的执行方法做出不同的处理:

  • execute()方法,会抛出异常;
  • queue()方法,会正常返回Future对象,但是当调用get()方法来获取结果的时候就会抛出异常。
  • observe()方法,会正常返回Observable对象,当订阅它的时候,将立即通过调用订阅者的onError()方法来通知中止请求。
  • toObservable()方法,会正常返回Observable对象,当订阅它的时候,将通过调用订阅者的onError()方法来通知中止请求。
(9)、返回成功的响应

当Hystrix命令执行成功之后,它会将处理结果直接返回,或是以Observable的形式返回。而具体以哪种方式返回,取决于之前第2步中我们所提到的对命令的4种不同执行方式:

还是之前的那张图,翻译一下右侧的一段英文:对于Hystrix命令的响应虽然总是以Observable的形式来返回,但是它可以被转换成你需要的使用方式来进行命令调用。

我们可以将此处图的信息和第2步中的源码分析联系起来,并且从源头toObservable()方法来进行分析:

  • toObservable():返回最原始的Observable,必须通过订阅它才会真正触发命令的执行流程。
  • observe():在toObservable()产生原始Observable之后立即订阅它,让命令能够马上开始异步执行,并返回一个Observable对象,当调用它的subscribe时,将重新产生结果和通知给订阅者。
  • queue():将toObservable()产生的原始Observable通过toBlocking()方法转换成BlockingObservable对象,并调用它的toFuture()方法返回异步的Future对象。
  • execute():在queue()产生异步结果Future对象之后,通过调用get()方法阻塞并等待结果的返回。