In RxJava, failures are just another type of notification. Every
Observable<T> is a sequence of events of type
T optionally followed by completion or error notification. This means that errors are implicitly a part of every stream, and even though we are not required to handle them, there are plenty of operators that declaratively handle errors in a more elegant way. Also, an obtrusive
Observable will not capture any errors, they are only propagated through the aforementioned error notifications.
subscribe() only listening for values and not errors is often a bad sign and possibly missed errors. Even if you do not expect any exceptions to happen (which is rarely the case), at least place error logging that plugs into your logging framework:
It is a good practice to surround a lambda expression within
create() with a
catch() block, just like in the previous example:
However, if you forget about the
catch and let
create() throw an exception, RxJava does its best and propagates such an exception as an
The two preceding code examples are semantically equivalent. Exceptions thrown from
create() are caught internally by RxJava and translated to error notification. Yet, it is advised to explicitly propagate exceptions via
subscriber.onError() if possible. Even better, use
All lambda expressions passed to higher-order functions like
filter() should be pure, whereas throwing an exception is an impure side effect. RxJava again does its best to handle unexpected exceptions here and the behavior is exactly what you would expect. If any operator in the pipeline throws an exception, it is translated to error notification and passed downstream.
Despite RxJava making an effort to fix broken user code, if you suspect your lambda expression to potentially throw an exception, make it explicit by using
flatMap() is a very versatile operator, it does not need to manifest the next step of asynchronous computation.
Observable is a container for values or errors, so if you want to declaratively express even very fast computation that can result in an error, wrapping it with
Observable is a good choice, as well.
This is fine: ordinary operators transform values flowing through but skip completion and error notifications, letting them flow downstream. This means that a single error from any upstream
Observable will propagate with a cascading failure to all downstream subscribers.
Again, this is fine if your business logic requires absolutely all steps to succeed. But sometimes you can safely ignore failures and replace them with fallback values or secondary sources.
The simplest error handling operator in RxJava is
onErrorReturn(): when encountered, an error simply replaces it with a fixed value:
onErrorReturn() is a fluent and very pleasant to read alternative to a
catch block that returns fixed result in the
catch statement known from imperative style:
onErrorResumeNext() operator basically replaces error notification with another stream. If you subscribe to an an
Observable guarded with
onErrorResumeNext() in case of failure, RxJava transparently switches from main
Observable to the fallback one, specified as an argument.
Theoretically we can return a different fallback stream based on the exception message or type. The
onErrorResumeNext() operator has an overloaded version that allows just that:
timeout() operator that listens to the upstream
Observable, constantly monitoring how much time elapsed since the last event or subscription. If it so happens that the silence between consecutive events is longer than a given period, the
timeout() operator publishes an error notification that contains
Now, let’s test drive the
timeout() operator in its simplest overloaded version:
The overloaded version of
timeout() does just that: it accept two factories of
Observables, one marking the timeout of the first event, and the second one for each subsequent element. An example is worth a thousand words:
Here, the first
Observable emits exactly one event after one second—this is the acceptable latency threshold for the first event. The second
Observable is created for each event that appears on the stream and allows fine tuning of the timeout for the subsequent event.
It is sometimes useful to also track the latency of each event, even if we do not timeout. The handy
timeInterval() operator does just that: it replaces each event of type
TimeInterval<T> that encapsulates the event but also shows how much time has elapsed since the previous event (or subscription in case of first event):
getValue() that returns
TimeInterval<LocalDate> also has
getIntervalInMilliseconds() but it is easier to see how it looks studying the output of the preceding program after subscription. You can clearly see that it took 533 milliseconds for the first event to arrive but only around 50 milliseconds for each one subsequently:
timeout() operator has yet another overloaded version that accepts the fallback
Observable replacing the original source in case of error. It is very similar in behavior to
The simplest version of the
retry() operator resubscribes to a failed
Onservable hoping that it will keep producing normal events rather than failures. For educational purposes, we will create an
Observable that misbehaves severely:
In 90 percent of the cases, subscribing to
risky() ends with a
RuntimeException. If you somehow make it to the
"OK" branch an artificial delay between zero and two seconds is injected. Such a risky operation will serve as a demonstration of
The behavior of
retry() is fairly straightforward: it pushes all events and completion notification downstream, but not
onError(). The error notification is swallowed (so no exception is logged whatsoever), thus we use
doOnError() callback. Every time
retry() encounters a simulated
TimeoutException, it tries subscribing again.
A word of caution here: if your
Observable is cached or otherwise guaranteed to always return the same sequence of elements,
retry() will not work:
risky() emits errors once, it will continue emitting them forever, no matter how many times you resubscribe. To overcome this issue, you can delay the creation of
Observable even further by using )
Even if an
Observable returned from
risky() is cached,
risky() multiple times, possibly getting a new
Observable each time.
retry() is a
while loop with a
try block within it, followed by an empty
First, we should limit the number of attempts, which happens to be built in:
The integer parameter to
retry() instructs how many times to resubscribe, thus
retry(0) is equivalent to no retry at all. If the upstream
Observable failed for the tenth time, the last seen exception is propagated downstream.
A more flexible version of
retry() leaves you with a decision about retry, based on the attempt number and the actual exception:
This version not only limits the number of resubscription attempts to 10, but also drops retrying prematurely if the exception happens to be
If failures are transient, waiting a little bit prior to a resubscription attempt sounds like a good idea. The
retry() operator does not provide such a possibility out of the box, but it is relatively easy to implement. A more robust version of
retryWhen() takes a function receiving an
Observable of failures. Every time an upstream fails, this
Observable emits a
Throwable. Our responsibility is to transform this
Observable in such a way that it emits some arbitrary event when we want to retry (hence the name):
The preceding example of
retryWhen() receives an
Observable that emits a
Throwable every time the upstream fails. We simply delay that event by one second so that it appears in the resulting stream one second later. This is a signal to
retryWhen() that it should attempt retry. If we simply returned the same stream (
retryWhen(x -> x)),
retryWhen() would behave exactly like
retry(), resubscribing immediately when an error occurs. With
retryWhen(), we can also easily simulate
retry(10) (well, almost… keep reading):
We receive an event each time a failure occurs. The stream we return is supposed to emit an arbitrary event when we want to retry. Thus, we simply forward the first 10 failures, causing each one of them to be retried immediately.
But what happens when eleventh failure occurs in a
Observable? This is where it becomes tricky. The
take(10) operator emits an
onComplete event immediately following the 10th failure. Therefore, after the 10th retry,
retryWhen() receives a completion event. This completion event is interpreted as a signal to stop retrying and complete downstream. It means that after 10 failed attempts, we simply emit nothing and complete. However, if we complete
Observable returned inside
retryWhen() with an error, this error will be propagated downstream.
In other words, as long as we emit any event from an
retryWhen(), they are interpreted as retry requests. However, if we send a completion or error notification, retry is abandoned and this completion or error is passed downstream. Doing just
failures.take(10) will retry 10 times, but in case of yet another failure, we do not propagate the last error but the successful completion, instead. Let’s have a look at it:
This looks quite complex, but it is also really powerful. We
zip failures with sequence numbers from 1 to 11. We would like to perform as many as 10 retry attempts, so if the attempt sequence number is smaller than 11, we return
timer(1, SECONDS). The
retryWhen() operator will capture this event and retry one second after failure. However, when the 10th retry ends with a failure, we return an
Observable with that error, completing the retry mechanism with the last seen exception.
This gives us a lot of flexibility. We can stop retrying when a certain exception appears or when too many attempts were already performed. Moreover, we can adjust the delay time between attempts! For example, the first retry can appear immediately but the delays between subsequent retries should grow exponentially:
Gives you few tips on how to make monitoring and debugging easier in applications using RxJava.
Observable has a set of callback methods that you can use to peek into various events, namely:*
What they all have in common is that they are not allowed to alter the state of an
Observable in any way and they all return the same
Observable, which makes them an ideal place to sprinkle some logging logic.
zipWith() actually subscribes to all of the underlying streams, effectively subscribing to the same
Observable twice. This is a problem that you can discover by observing
doOnSubscribe() is being invoked twice.
zip(), thanks to backpressure it no longer buffers faster stream infinitely, waiting for a slower one to emit events.
Instead, it asks for a fixed batch of values from each
MissingBackpressureException if it received more:
Requested 128, the value chosen by
Even when the source is infinite or very large, we should see at most 128 messages such as
Got: ... afterward from a well-behaving
You CANNOT use
doOnError() for any error handling; it is for logging only. It does not consume the error notification, which keeps propagating downstream:
As clean as
onErrorReturn() looks, it is very easy to swallow exceptions with it. It does provide the exception that we want to replace with a fallback value, but logging it is our responsibility. To keep functions small and composable, logging the error first in
doOnError() and then handling the exception in the following line silently is a little bit more robust. Forgetting to log the exception is rarely a good idea and must be a careful decision, not an oversight.
doOnEach(): This is invoked for each
onError(). It can accept either a lambda invoked for each
Notofication or an
doOnTerminate(): This is invoked when either
onError() occurs. It is impossible to distinguish between them, so it might be better to use
From RxJava中的错误处理 - 简书