When working with distributed systems it’s important to take failure into account. The book Release It! is a great book for learning how to deal with some of these issues. It presents for example the notion of circuit breakers that enable systems to detect failures and encapsulate logic that prevents a failure to reoccur constantly. This also helps in avoiding cascading failures throughout the system. Netflix have created an open source library called Hystrix that implements some of the patterns described in this book in Java. According to its website Hystrix "is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable". RxJava is another open source library from Netflix that brings reactive extensions to the JVM. This makes it easy to compose asynchronous and event-based programs in a functional manner. A great feature of Hystrix is that it contains a Hystrix Dashboard project to monitor the status of the circuit breakers (such as requests per second and if the circuit breaker is opened or closed) in essentially real-time by connecting to a Hystrix Event Stream. This is all great but another thing you typically want to do is to notify your existing monitoring infrastructure when a circuit breakers opens and closes. Next I will present a simple example of how to combine RxJava to monitor the Hystrix Stream and trigger a notification when a circuit breaker changes state. Some basic knowledge of Hystrix and RxJava is expected.
Single Hystrix Node Example
This simple example will assume that a Hystrix Event Stream is setup on one node using Hystrix. For simplicity’s sake we will also assume that there’s only one circuit breaker in the system. The Hystrix Event Stream publishes (as its name implies) a continuous stream of server sent events in the following format:
data: {
"type": "HystrixCommand",
"name": "PlaylistGet",
"group": "PlaylistGet",
"currentTime": 1355239617628,
"isCircuitBreakerOpen": false,
"errorPercentage": 0,
"errorCount": 0,
"requestCount": 121,
"rollingCountCollapsedRequests": 0,
"rollingCountExceptionsThrown": 0,
"rollingCountFailure": 0,
"rollingCountFallbackFailure": 0,
"rollingCountFallbackRejection": 0,
"rollingCountFallbackSuccess": 0,
"rollingCountResponsesFromCache": 69,
"rollingCountSemaphoreRejected": 0,
"rollingCountShortCircuited": 0,
"rollingCountSuccess": 121,
"rollingCountThreadPoolRejected": 0,
"rollingCountTimeout": 0,
"currentConcurrentExecutionCount": 0,
"latencyExecute_mean": 13,
"latencyExecute": {
"0": 3,
"25": 6,
"50": 8,
"75": 14,
"90": 26,
"95": 37,
"99": 75,
"99.5": 92,
"100": 252
},
"latencyTotal_mean": 15,
"latencyTotal": {
"0": 3,
"25": 7,
"50": 10,
"75": 18,
"90": 32,
"95": 43,
"99": 88,
"99.5": 160,
"100": 253
},
"propertyValue_circuitBreakerRequestVolumeThreshold": 20,
"propertyValue_circuitBreakerSleepWindowInMilliseconds": 5000,
"propertyValue_circuitBreakerErrorThresholdPercentage": 50,
"propertyValue_circuitBreakerForceOpen": false,
"propertyValue_circuitBreakerForceClosed": false,
"propertyValue_circuitBreakerEnabled": true,
"propertyValue_executionIsolationStrategy": "THREAD",
"propertyValue_executionIsolationThreadTimeoutInMilliseconds": 800,
"propertyValue_executionIsolationThreadInterruptOnTimeout": true,
"propertyValue_executionIsolationThreadPoolKeyOverride": null,
"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests": 20,
"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests": 10,
"propertyValue_metricsRollingStatisticalWindowInMilliseconds": 10000,
"propertyValue_requestCacheEnabled": true,
"propertyValue_requestLogEnabled": true,
"reportingHosts": 1
}
What we are interested in in this example is whether or not “isCircuitBreakerOpen” changes from false (closed) to true (opened) and vice versa. When this happens we want to notify a monitoring service of this change.
Since Hystrix is pushing out server sent events over HTTP(s) we use the RxJava Async HTTP Client project to turn this stream into an RxJava observable. Assuming that the Hystrix stream is located at http://myservice:8080/hystrix.stream
we can do like this:
Observable observable = ObservableHttp.createGet("http://myservice:8080/hystrix.stream", client).toObservable();
The ObservableHttp
class is provided by the RxJava Async HTTP Client library and as you can see we now have an observable that we can start working with. Adding a little piece of code converts the Observable<ObservableHttpResponse>
into an observable of strings representing the HTTP response content (in this case the something similar to the Hystrix event presented above):
ObservableHttp.createGet("http://myservice:8080/hystrix.stream", client).toObservable().
flatMap(response -> response.getContent().map(String::new));
Since Hystrix posts several different types of events and we’re only interested in the data events we need to filter out these kinds of events:
ObservableHttp.createGet("http://myservice:8080/hystrix.stream", client).toObservable().
flatMap(response -> response.getContent().map(String::new)).
filter(hystrixEvent -> hystrixEvent.startsWith("data:"));
Next we make sure that the Hystrix event contains the string “isCircuitBreakerOpen” and if it does we remove “data:” from the event to get the event as pure JSON:
ObservableHttp.createGet("http://myservice:8080/hystrix.stream", client).toObservable().
flatMap(response -> response.getContent().map(String::new)).
filter(hystrixEvent -> hystrixEvent.startsWith("data:")).
filter(data -> data.contains("isCircuitBreakerOpen")).
map(data -> data.substring("data:".length()));
Once we have the JSON data we want to see if the circuit breaker is opened by extracting the value of the “isCircuitBreakerOpen” attribute. In this case we use a library called JsonPath from the REST Assured project but any JSON parsing framework will do.
ObservableHttp.createGet("http://myservice:8080/hystrix.stream", client).toObservable().
flatMap(response -> response.getContent().map(String::new)).
filter(hystrixEvent -> hystrixEvent.startsWith("data:")).
filter(data -> data.contains("isCircuitBreakerOpen")).
map(data -> data.substring("data:".length())).
map(data -> JsonPath.from(data).getBoolean("isCircuitBreakerOpen"));
In our example we only want to trigger a notification when the state changes from opened to closed or vice versa so we return a Pair (essentially a tuple) that contains two values, if the circuit breaker is open according the the Hystrix stream and if it’s open in the “monitoring system” (be aware that you probably don’t want read from the monitoring system like this in a real application though):
ObservableHttp.createGet("http://myservice:8080/hystrix.stream", client).toObservable().
flatMap(response -> response.getContent().map(String::new)).
filter(hystrixEvent -> hystrixEvent.startsWith("data:")).
filter(data -> data.contains("isCircuitBreakerOpen")).
map(data -> data.substring("data:".length())).
map(data -> JsonPath.from(data).getBoolean("isCircuitBreakerOpen")).
map(isCircuitBreakerCurrentlyOpened -> Pair.of(isCircuitBreakerCurrentlyOpened, monitoringSystem.isCircuitBreakerOpened()));
We also make sure that the state of the circuit breaker has actually changed and if so we return only the value provided by the Hystrix stream (i.e. what was specified as the left value in the Pair object):
ObservableHttp.createGet("http://myservice:8080/hystrix.stream", client).toObservable().
flatMap(response -> response.getContent().map(String::new)).
filter(hystrixEvent -> hystrixEvent.startsWith("data:")).
filter(data -> data.contains("isCircuitBreakerOpen")).
map(data -> data.substring("data:".length())).
map(data -> JsonPath.from(data).getBoolean("isCircuitBreakerOpen")).
map(isCircuitBreakerCurrentlyOpened -> Pair.of(isCircuitBreakerCurrentlyOpened, monitoringSystem.isCircuitBreakerOpened())).
filter(pair -> pair.getLeft() != pair.getRight()).
map(Pair::getLeft);
Next we just report the new status to the monitoring system and subscribe to the stream:
ObservableHttp.createGet("http://myservice:8080/hystrix.stream", client).toObservable().
flatMap(response -> response.getContent().map(String::new)).
filter(hystrixEvent -> hystrixEvent.startsWith("data:")).
filter(data -> data.contains("isCircuitBreakerOpen")).
map(data -> data.substring("data:".length())).
map(data -> JsonPath.from(data).getBoolean("isCircuitBreakerOpen")).
map(isCircuitBreakerCurrentlyOpened -> Pair.of(isCircuitBreakerCurrentlyOpened, monitoringSystem.isCircuitBreakerOpened())).
filter(pair -> pair.getLeft() != pair.getRight()).
map(Pair::getLeft).
doOnNext(isCircuitBreakerOpened -> {
if (isCircuitBreakerOpened) {
monitoringSystem.reportCircuitBreakerOpened();
} else {
monitoringSystem.reportCircuitBreakerClosed();
}
}).
subscribe();
That’s it! For a working example refer to this github project.
Example of Hystrix Cluster Monitoring using Turbine
To monitor a whole cluster of different Hystrix nodes at the same time you can aggregate the streams using Turbine. Turbine is described as a "low latency high throughput aggregator for real time streams". This require us to make some slight modifications to our first example because the circuit breakers can be both closed and open at the same time in the same cluster. So instead of “isCircuitBreakerOpen” returning only true
or false
it can now return something like this:
data: {
..
"isCircuitBreakerOpen": {"true":1,"false":1}
...
In this example one node reports that its circuit breaker is open and another that it’s closed. However if all nodes report that they’re open Turbine would still send us
data: {
..
"isCircuitBreakerOpen": true
...
which means that we have to deal with both scenarios. To take this into account we need to modify the JSON parsing part of the previous example a little bit. In this simple example we imagine that we don’t care how many nodes that are opened or closed for a specific Hystrix group. This means that we can treat the entire cluster as “open” if one or more nodes report that they’re open. Thus we change
JsonPath.from(data).getBoolean("isCircuitBreakerOpen")
to
JsonPath.from(data).getBoolean("any { attr -> attr.key == 'isCircuitBreakerOpen' && attr.value }")
What happens here is that we make use of a GPath expression that checks if the “isCircuitBreakerOpen” attribute is found in the JSON document and if so its value must be equal to true. A trick here is that attr.value
returns true
(obviously) for the case when “isCircuitBreakerOpen” is true
and false
when “isCircuitBreakerOpen” is false
. When “isCircuitBreakerOpen” is a JSON Object (in the case when it’s equal to {"true":1,"false":1}
) then attr.value
will also return true
. So this is sufficient for this example. The full code is shown here:
ObservableHttp.createGet("http://myservice:8080/hystrix.stream", client).toObservable().
flatMap(response -> response.getContent().map(String::new)).
filter(hystrixEvent -> hystrixEvent.startsWith("data:")).
filter(data -> data.contains("isCircuitBreakerOpen")).
map(data -> data.substring("data:".length())).
map(data -> JsonPath.from(data).getBoolean("any { attr -> attr.key == 'isCircuitBreakerOpen' && attr.value }")).
map(isCircuitBreakerCurrentlyOpened -> Pair.of(isCircuitBreakerCurrentlyOpened, monitoringSystem.isCircuitBreakerOpened())).
filter(pair -> pair.getLeft() != pair.getRight()).
map(Pair::getLeft).
doOnNext(isCircuitBreakerOpened -> {
if (isCircuitBreakerOpened) {
monitoringSystem.reportCircuitBreakerOpened();
} else {
monitoringSystem.reportCircuitBreakerClosed();
}
}).
subscribe();
Conclusion
As you’ve hopefully seen it’s quite easy to react to changes in the Hystrix stream using RxJava. The examples presented here are of course quite trivial but can be extended to more advanced use cases. Refer to the github project for an example.
Hi Johan,
Great article, but I’m running into a problem.
I’ve cloned your example and in this section of code:
ObservableHttp.createGet(URL, httpAsyncClient).toObservable().
flatMap(response -> response.getContent().map(String::new)).
filter(hystrixEvent -> hystrixEvent.startsWith(“data:”)).
filter(data -> data.contains(“isCircuitBreakerOpen”)).
map(data -> data.substring(“data:”.length())).
map(data -> JsonPath.from(data).getBoolean(“isCircuitBreakerOpen”)).
I’m getting the error:
“Type mismatch: cannot convert from Observable to ”
Any idea what could be wrong? I’m using JDK1.8.0_11 and am using your build.gradle
That error should read “Type mismatch: cannot convert from Observable <Object> to <Unknown>”
Hmm not sure why. The example is pretty old by now, it’s quite possible that you need to do some changes if you’re using a newer version of Hystrix or RxJava.
It looks like my IDE does not like the lambda operations. I had to blow them up into methods. I’m using Spring Tool Suite 3.7.0, built on Eclipse Mars. As far as I know it actually does support the lambda operator but for some reason it just hates this code.
Hi, I want to add few more parameter in hystrix stream, Is there any way to add few parameter in hystrix json object.