Designing an API with Rx in mind doesn’t influence the entire architecture, because we can always fall back to `BlockingObservable` and Java collections. But it’s better to have a wide range of possibilities that we can further trim down if necessary.
The `Observable` represents the stream of data and can be subscribed to by an `Observer`:
Upon subscription, the `Observer` can have three types of events pushed to it:
- Data via the `onNext()` method
- Errors (exceptions or throwables) via the `onError()` method
- Stream completion via the `onCompleted()` method
The `onNext()` method might never be called or might be called once, many times, or infinitely many times. The `onError()` and `onCompleted()` methods signal terminal events, meaning that only one of them can be called and only once. When a terminal event is called, the `Observable` stream is finished and no further events can be sent over it. Terminal events might never occur if the stream is infinite and does not fail.
As will be shown in Flow Control and Backpressure, there is an additional type of signature to permit interactive pull:
This is used with a more advanced `Subscriber` (with more details given in Controlling Listeners by Using Subscription and Subscriber).
The `unsubscribe` function as part of the `Subscription` interface is used to allow a subscriber to unsubscribe from an `Observable` stream. The `setProducer` function and `Producer` types are used to form a bidirectional communication channel between the producer and consumer used for flow control.
`rx.Observable<T>` represents a flowing sequence of values. It is the abstraction that you will use all of the time. Because these values often appear over a wide time range, we tend to think about an `Observable` as a stream of events.
`Observable<T>` can actually produce three types of events:
- Values of type `T`, as declared by `Observable`
- Completion event
- Error event
The specification of reactive extensions clearly states that every `Observable` can emit an arbitrary number of values optionally followed by completion or error (but not both). Strictly speaking, the Rx Design Guidelines define this rule as follows: `OnNext* (OnCompleted | OnError)?`, where `OnNext` represents a new event.
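The grammar above can be read as a regular expression over notifications. The following is a minimal JDK-only sketch, not part of RxJava: the class `RxGrammar`, the method `isValid`, and the one-letter encoding are names assumed purely for illustration.

```java
import java.util.regex.Pattern;

class RxGrammar {
    // Hypothetical one-letter encoding: N = OnNext, C = OnCompleted, E = OnError.
    private static final Pattern CONTRACT = Pattern.compile("N*(C|E)?");

    /** Returns true if the events, in order, obey OnNext* (OnCompleted | OnError)?. */
    static boolean isValid(String... events) {
        StringBuilder encoded = new StringBuilder();
        for (String event : events) {
            switch (event) {
                case "OnNext":      encoded.append('N'); break;
                case "OnCompleted": encoded.append('C'); break;
                case "OnError":     encoded.append('E'); break;
                default: return false; // not an Rx notification at all
            }
        }
        return CONTRACT.matcher(encoded).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("OnNext", "OnNext", "OnCompleted")); // true
        System.out.println(isValid("OnNext", "OnError", "OnNext"));     // false: event after a terminal one
        System.out.println(isValid());                                  // true: nothing emitted (yet)
    }
}
```

Note that an empty sequence and a sequence without a terminal event are both valid, which matches streams that are infinite or simply have not finished.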
A cold `Observable` is entirely lazy and never begins to emit events until someone is actually interested.
A hot `Observable` might already be emitting events no matter how many `Subscriber`s it has. Such an `Observable` pushes events downstream, even if no one listens and events are possibly missed. Examples of such `Observable`s include mouse movements, keyboard inputs, or button clicks.
How can you ensure that every subscriber receives all events? One such technique already sneaked into this chapter: the `cache()` operator. Technically, it can buffer all events from a hot `Observable` and allow subsequent subscribers to receive the same sequence of events. However, because it can theoretically consume an unlimited amount of memory, be careful with caching hot `Observable`s.
Unless you work with an external API that already exposes `Observable`s, you first must learn where `Observable`s come from and how you can create a stream and handle subscriptions.
First, there are several factory methods that create fixed constant
Observables. These are useful if you want to use RxJava consistently across an entire codebase or when values to be emitted are cheap to produce and known in advance:
- `Observable.just(value)`: Creates an `Observable` instance that emits exactly one `value` to all future subscribers and completes afterward. Overloaded versions of the `just()` operator can take anything from two to nine values to be emitted.
- `Observable.from(values)`: Similar to `just()` but accepts an `Iterable<T>` or `T[]`, thus creating an `Observable<T>` with as many values emitted as elements in the `values` collection. Another overloaded version accepts a `Future<T>`, emitting an event when the underlying `Future` completes.
- `Observable.range(from, n)`: Produces `n` integer numbers starting from `from`. For example, `range(5, 3)` will emit `5`, `6`, and `7` and then complete normally. Each subscriber will receive the same set of numbers.
- `Observable.empty()`: Completes immediately after subscription, without emitting any values.
- `Observable.never()`: Such an `Observable` never emits any notifications, neither values nor completion or error. This stream is useful for testing purposes.
- `Observable.error()`: Emits an `onError()` notification immediately to every subscriber. No other values are emitted and, according to the contract, `onCompleted()` cannot occur either.
The `empty()`, `never()`, and `error()` factories don’t seem terribly useful; however, they are quite handy when composing with genuine `Observable`s.
All the factory methods by default operate on the client thread. Have a look at the following code sample:
What we are interested in is the thread that executed each log statement:
For better or worse, `null` is a valid event value in RxJava; that is, `Observable.just("A", null, "B")` is as good as any other stream. You need to take that into account when designing custom operators as well as when applying operators. However, passing `null` is generally considered nonidiomatic, and you should use wrapper value types instead.
`Observable.create()` is so versatile that in fact you can mimic all of the previously discussed factory methods on top of it. For example, `Observable.just(x)`, which emits a single value `x` and immediately completes afterward, might look like this:
As an exercise, try to implement `never()`, `empty()`, or even `range()` by using only `create()`.
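To make the idea of building factories on top of a single "create" primitive concrete without depending on RxJava, here is a JDK-only model. Everything in it is an assumption made for illustration: the nested `Observer` interface only mirrors the three callback names, a "source" is reduced to a `Consumer` of observers, and `record` is a helper for inspecting what was emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class FactorySketch {
    /** Hypothetical stand-in for an Rx observer: the three callbacks only. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical stand-in for create(): a source is just "what to do on subscription". */
    static <T> Consumer<Observer<T>> create(Consumer<Observer<T>> onSubscribe) {
        return onSubscribe;
    }

    static <T> Consumer<Observer<T>> just(T x) {
        return create(observer -> {
            observer.onNext(x);
            observer.onCompleted();
        });
    }

    static <T> Consumer<Observer<T>> empty() {
        return create(observer -> observer.onCompleted());
    }

    /** Same argument order as described above: n values starting at from. */
    static Consumer<Observer<Integer>> range(int from, int n) {
        return create(observer -> {
            for (int i = 0; i < n; i++) {
                observer.onNext(from + i);
            }
            observer.onCompleted();
        });
    }

    /** Subscribes a recording observer and returns the notifications it saw. */
    static <T> List<String> record(Consumer<Observer<T>> source) {
        List<String> log = new ArrayList<>();
        source.accept(new Observer<T>() {
            public void onNext(T value) { log.add("next:" + value); }
            public void onError(Throwable error) { log.add("error"); }
            public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(range(5, 3))); // [next:5, next:6, next:7, completed]
    }
}
```

In this model a `never()` source would simply be a subscription action that does nothing, since no callback is ever invoked.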
We will use the open source Twitter4J library that can push a subset of new tweets using a callback-based API:
`twitterStream.sample()` starts a background thread that logs in to Twitter and awaits new messages. Every time a tweet appears, the `onStatus` callback is executed. Execution can jump between threads, therefore we can no longer rely on throwing exceptions. Instead, the `onException()` notification is used.
Use it as:
What if we want to count the number of tweets per second? Or consume just the first five? And what if we would like to have multiple listeners? Each of these situations opens a new HTTP connection. Last but not least, this API does not allow unsubscribing when we are done, risking a resource leak.
When someone subscribes only to receive a small fraction of the stream, our `Observable` will make sure to clean up the resources.
We know a second technique to implement clean-up that does not require waiting for an upstream event. The moment a subscriber unsubscribes, we call
shutdown() immediately, rather than waiting for the next tweet to come just to trigger clean-up behavior (last line):
The subscription is very similar:
However, keep in mind that the implementation still opens a new network connection for each `Subscriber`.
This `Observable` blurs the difference between hot and cold streams. On one hand, it represents external events that appear without our control (hot behavior). On the other hand, events will not begin flowing (no underlying HTTP connection) to our system until we actually subscribe.
Manually keeping track of all subscribers and shutting down the connection to the external system only when all subscribers leave is a Sisyphean task that we will implement anyway, just to appreciate idiomatic solutions later on. The idea is to keep track of all subscribers in some sort of `Set<Subscriber<Status>>` and to start the external system connection when the set becomes nonempty and shut it down when it becomes empty:
The `subscribers` set thread-safely stores a collection of currently subscribed `Observer`s. Every time a new `Subscriber` appears, we add it to the set and connect to the underlying source of events lazily. Conversely, when the last `Subscriber` disappears, we shut down the upstream source. The key here is to always have exactly one connection to the upstream system rather than one connection per subscriber. This works and is quite robust; however, the implementation seems too low-level and error-prone. Access to the `subscribers` set must be `synchronized`, but the collection itself must also support safe iteration. Calling `register()` must appear before adding the `deregister()` callback; otherwise, the latter can be called before we register. There must be a better way to implement such a common scenario of multiplexing a single upstream source to multiple `Observer`s. Luckily, there are at least two such mechanisms. RxJava is all about reducing such dangerous boilerplate and abstracting away concurrency.
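The bookkeeping described above can be sketched with the JDK alone. This is not the `LazyTwitterObservable` listing; `SharedSource`, its nested `Connection` interface, and the use of `Runnable` as an unsubscribe handle are all hypothetical names chosen to show the "connect on first subscriber, shut down on last" rule.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

class SharedSource<T> {
    /** Hypothetical handle to the external system (for example, a streaming connection). */
    interface Connection<E> {
        void start(Consumer<E> sink);
        void shutdown();
    }

    private final Connection<T> connection;
    // Copy-on-write gives safe iteration while subscribers come and go.
    private final Set<Consumer<T>> subscribers = new CopyOnWriteArraySet<>();

    SharedSource(Connection<T> connection) {
        this.connection = connection;
    }

    /** Registers a subscriber and returns an action that unsubscribes it. */
    synchronized Runnable subscribe(Consumer<T> subscriber) {
        if (subscribers.add(subscriber) && subscribers.size() == 1) {
            // First subscriber: connect lazily and fan every event out to the whole set.
            connection.start(event -> subscribers.forEach(s -> s.accept(event)));
        }
        return () -> unsubscribe(subscriber);
    }

    private synchronized void unsubscribe(Consumer<T> subscriber) {
        if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
            // Last subscriber left: release the single upstream connection.
            connection.shutdown();
        }
    }
}
```

Even this small version needs both a lock and a concurrent collection, which illustrates why the text calls the manual approach low-level and error-prone.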
Observable is eager, so we want to postpone its creation.
`defer()` will wait until the last possible moment to actually create the `Observable`; that is, until someone actually subscribes to it.
`ConnectableObservable` is an interesting way of coordinating multiple `Subscriber`s and sharing a single underlying subscription. `ConnectableObservable` is a type of `Observable` that ensures there exists at most one `Subscriber` at all times, but in reality there can be many of them sharing the same underlying resource.
Remember our first attempt at creating a single, lazy connection to an underlying resource with `LazyTwitterObservable`? We had to manually keep track of all `subscribers`, connecting as soon as the first subscriber appeared and disconnecting as soon as the last one left.
When we try to use this `Observable`, every `Subscriber` establishes a new connection, like so:
Here is the output:
This time, to simplify, we use a parameterless `subscribe()` method that triggers subscription but drops all events and notifications.
The most scalable and simplest solution is the `publish().refCount()` pair:
The output is much like what we expect:
What `refCount()` does is basically count how many active `Subscriber`s we have at the moment, much like reference counting in historic garbage-collection algorithms. When this number goes from zero to one, it subscribes to the upstream `Observable`. Every number above one is ignored and the same upstream `Subscriber` is simply shared between all downstream `Subscriber`s. However, when the very last downstream `Subscriber` unsubscribes, the counter drops from one to zero and `refCount()` knows it must unsubscribe right away.
The connection is not established until we actually get the first `Subscriber`. But, more important, the second `Subscriber` does not initiate a new connection; it does not even touch the original `Observable`. The `publish().refCount()` tandem wrapped the underlying `Observable` and intercepted all subscriptions.
`refCount()` does precisely what we implemented manually with `LazyTwitterObservable`. You can use the `publish().refCount()` duet to allow sharing of a single `Subscriber` while remaining lazy. This pair of operators is used very frequently and therefore has an alias named `share()`.
Keep in mind that if unsubscription is shortly followed by subscription, `share()` still performs reconnection, as if there were no caching at all.
Another useful use case of the `publish()` operator is forcing subscription in the absence of any `Subscriber`.
We can call `Observable.publish()` on any `Observable` and get a `ConnectableObservable` in return.
Anyone who subscribes to a `ConnectableObservable` is placed in a set of `Subscriber`s. As long as `connect()` is not called, these `Subscriber`s are put on hold; they never directly subscribe to the upstream `Observable`. However, when `connect()` is called, a dedicated mediating `Subscriber` subscribes to the upstream `Observable` (`tweets`), no matter how many downstream subscribers appeared before, even if there were none. But if there were some `Subscriber`s of the `ConnectableObservable` put on hold, they will all receive the same sequence of notifications.
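The "put on hold until `connect()`" behavior can be modeled in a few lines of plain Java. This is only a sketch of the idea, not RxJava's `ConnectableObservable`: the class name `ConnectableSketch` and the representation of the upstream as a `Consumer<Consumer<T>>` (an action that pushes events into a sink) are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class ConnectableSketch<T> {
    // Hypothetical "subscribe to upstream" action: it pushes events into the given sink.
    private final Consumer<Consumer<T>> upstream;
    private final List<Consumer<T>> onHold = new CopyOnWriteArrayList<>();

    ConnectableSketch(Consumer<Consumer<T>> upstream) {
        this.upstream = upstream;
    }

    /** Subscribers are only parked here; upstream is not touched yet. */
    void subscribe(Consumer<T> subscriber) {
        onHold.add(subscriber);
    }

    /** One mediating subscription to upstream, fanned out to every parked subscriber. */
    void connect() {
        upstream.accept(event -> onHold.forEach(s -> s.accept(event)));
    }
}
```

Calling `connect()` with no parked subscribers still triggers the upstream action once, which corresponds to forcing a subscription in the absence of any listener.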
This mechanism has multiple advantages. Imagine that you have an `Observable` in your application in which multiple `Subscriber`s are interested. On startup, several components (e.g., Spring beans or EJBs) subscribe to that `Observable` and begin listening. Without `ConnectableObservable`, it is very likely that a hot `Observable` will begin emitting events that will be consumed by the first `Subscriber`, but `Subscriber`s started later will miss out on the early events. This can be a problem if you want to be absolutely sure that all `Subscriber`s receive a consistent view of the world. All of them will receive events in the same order; unfortunately, a `Subscriber` appearing late will lose early notifications.
The solution to this problem is to `publish()` such an `Observable` first and make it possible for all of the components in your system to `subscribe()`; for example, during application startup. When you are 100% sure that all `Subscriber`s that need to receive the same sequence of events (including the initial event) had a chance to `subscribe()`, connect such a `ConnectableObservable` with `connect()`. This will create a single `Subscriber` in the upstream `Observable` and begin pushing events to all downstream `Subscriber`s. The following example uses the Spring framework, but as a matter of fact it is framework agnostic:
Our simple application first eagerly creates an `Observable` (a `ConnectableObservable` subclass underneath). `Observable`s are lazy by design, so it is fine to create them even statically. This `Observable` is `publish()`-ed so that all subsequent `Subscriber`s are put on hold and do not receive any notifications until we do `connect()`. Later, two `@Component`s are found that require this `Observable`. The dependency injection framework provides our `ConnectableObservable` and allows everyone to subscribe. However, the events, even in the case of a hot `Observable`, will not arrive until full application startup. When all of the components are instantiated and wired together, a `ContextRefreshedEvent` sent from the framework can be consumed. At this point, we can guarantee that all components had a chance to request a given `Observable` and `subscribe()` to it. When the application is about to start, we call `connect()`. This subscribes to the underlying `Observable` exactly once and forwards the exact same sequence of events to every component. The trimmed-down logging output might look as follows (the component names are in square brackets):
An instance of `Observable` does not emit any events until someone is actually interested in receiving them. To begin watching an `Observable`, you use the `subscribe()` family of methods:
`Observable` does not throw exceptions. Instead, exceptions are just another type of notification (event) that an `Observable` can propagate. Therefore, you do not use a `try`-`catch` block around `subscribe()` to catch exceptions produced along the way. Instead, you provide a separate callback:
There are multiple overloaded versions of `subscribe()` that are more specific.
The second argument to `subscribe()` is optional. It notifies about exceptions that can potentially occur while producing items. It is guaranteed that no other `Tweet` will appear after the exception.
The third optional callback allows us to listen for stream completion:
As a side note, often you can use Java 8 method references instead of lambdas to improve readability, as illustrated here:
It turns out that providing all three arguments to `subscribe()` is quite useful, thus it would be helpful to have a simple wrapper holding all three callbacks. This is what `Observer<T>` was designed for. `Observer<T>` is a container for all three callbacks, receiving all possible notifications from `Observable<T>`. Here is how you can register an `Observer<T>`:
As a matter of fact, `Observer<T>` is the core abstraction for listening in RxJava. Yet if you want even greater control, `Subscriber` (`Observer`’s abstract implementation) is even more powerful.
There are two means to cancel a subscription: `Subscription` and `Subscriber`.
`Subscription` is a handle that allows client code to cancel a subscription by using the `unsubscribe()` method. Additionally, you can query the status of a subscription by using `isUnsubscribed()`. It is important to unsubscribe from `Observable<T>` as soon as you no longer want to receive more events; this avoids memory leaks and unnecessary load on the system.
We know that we can use `Subscription` to control subscription outside of the `Observer` or callback. `Subscriber<T>`, on the other hand, implements both `Observer<T>` and `Subscription`. Thus, it can be used both to consume notifications (events, completions, or errors) and control subscription.
The code example that follows subscribes to all events, but the subscriber itself decides to give up receiving notifications under certain criteria. Normally, this can be done by using the built-in `takeUntil()` operator, but for the time being we can unsubscribe manually:
Every time `subscribe()` is called, our subscription handler inside `create()` is invoked. `Observable.just(42)` should emit `42` to every subscriber, not just the first one. On the other hand, if you put a database query or heavyweight computation inside `create()`, it might be beneficial to share a single invocation among all subscribers.
Consider the following code sample that subscribes to the same Observable twice:
The output is:
If you would like to avoid calling `create()` for each subscriber and simply reuse events that were already computed, there exists a handy `cache()` operator:
With caching, the output for two `Subscriber`s is quite different:
When the first subscriber appears, `cache()` delegates subscription to the underlying `Observable` and forwards all notifications (events, completions, or errors) downstream. However, at the same time, it keeps a copy of all notifications internally. When a subsequent subscriber wants to receive pushed notifications, `cache()` no longer delegates to the underlying `Observable` but instead feeds cached values.
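The delegate-once-then-replay behavior can be sketched with the JDK alone. This is a simplified model rather than RxJava's `cache()`: it assumes a synchronous, finite source, records values only (no completion or error), and the names `CachedSource` and `upstream` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CachedSource<T> {
    // Hypothetical finite, synchronous source: it pushes its values into the given sink.
    private final Consumer<Consumer<T>> upstream;
    private List<T> cached; // null until the first subscription has run

    CachedSource(Consumer<Consumer<T>> upstream) {
        this.upstream = upstream;
    }

    synchronized void subscribe(Consumer<T> subscriber) {
        if (cached == null) {
            // First subscriber: run upstream once, recording each value while forwarding it.
            List<T> recorded = new ArrayList<>();
            upstream.accept(value -> {
                recorded.add(value);
                subscriber.accept(value);
            });
            cached = recorded;
        } else {
            // Later subscribers: replay the recorded values without touching upstream.
            cached.forEach(subscriber);
        }
    }
}
```

Because every value stays in `cached` for the lifetime of the object, the memory concern about unbounded streams applies to this sketch as well.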
Of course, you must keep in mind that `cache()` plus an infinite stream is a recipe for disaster, also known as `OutOfMemoryError`.
It is advised to check the `isUnsubscribed()` flag as often as possible to avoid sending events after a subscriber no longer wants to receive new events.
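The advice to re-check the flag before every event can be illustrated without RxJava. In the sketch below, `Handle` is a hypothetical stand-in for a subscription handle and `emitNaturals` is an assumed name for a producer loop; neither is an RxJava type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class CancellableLoop {
    /** Hypothetical stand-in for a subscription handle. */
    static class Handle {
        private final AtomicBoolean unsubscribed = new AtomicBoolean();
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Emits 0, 1, 2, ... below limit, checking the flag before each event; returns the count emitted. */
    static int emitNaturals(Handle handle, int limit, Consumer<Integer> subscriber) {
        int emitted = 0;
        for (int i = 0; i < limit && !handle.isUnsubscribed(); i++) {
            subscriber.accept(i);
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        Handle handle = new Handle();
        List<Integer> seen = new ArrayList<>();
        int emitted = emitNaturals(handle, 1_000_000, i -> {
            seen.add(i);
            if (i == 4) {
                handle.unsubscribe(); // the subscriber gives up after five events
            }
        });
        System.out.println(emitted + " events emitted: " + seen);
    }
}
```

Without the flag check in the loop condition, the producer would keep computing and pushing values long after the consumer stopped caring.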
Rather than have a blocking loop running directly in the client thread, we spawn a custom thread and emit events directly from there.
Please note that you should not use explicit threads inside `create()`. RxJava offers declarative concurrency and custom schedulers that allow you to write concurrent code without really interacting with threads yourself.
Even if someone poorly implemented the `Observable`, we can easily fix it by applying the `serialize()` operator, such as `loadAll(...).serialize()`. This operator ensures that events are serialized and sequenced. It also enforces that no more events are sent after completion or error.
The `Subject` class is quite interesting because it extends `Observable` and implements `Observer` at the same time. What that means is that you can treat it as an `Observable` on the client side (subscribing to upstream events) and as an `Observer` on the provider side (pushing events downstream on demand by calling `onNext()` on it).
Typically, what you do is keep a reference to the `Subject` internally so that you can push events from any source you like but externally expose this `Subject` as a plain `Observable`. `Subject` is a useful tool for creating `Observable` instances when `Observable.create(...)` seems too complex to manage.
`Subject`s are useful, but there are many subtleties you must understand. For example, after calling `onError()`, the `Subject` silently drops subsequent `onError` notifications, effectively swallowing them.
`PublishSubject` is one of the flavors (subclasses) of `Subject`. Other types of `Subject`s include the following:
- `AsyncSubject`: Remembers the last emitted value and pushes it to subscribers when `onCompleted()` is called. As long as `AsyncSubject` has not completed, events except the last one are discarded.
- `BehaviorSubject`: Pushes all events emitted after subscription happened, just like `PublishSubject`. However, first it emits the most recent event that occurred just before subscription. This allows a `Subscriber` to be immediately notified about the state of the stream. For example, a `Subject` may represent the current temperature broadcast every minute. When a client subscribes, it will receive the last seen temperature immediately rather than waiting several seconds for the next event. But the same `Subscriber` is not interested in historical temperatures, only the last one. If no events have yet been emitted, a special default event is pushed first (if provided).
- `ReplaySubject`: The most interesting type of `Subject`, one that caches events pushed through the entire history. If someone subscribes, first they receive a batch of missed (cached) events and only later events in real time. By default, all events since the creation of this `Subject` are cached. This can become dangerous if the stream is infinite or very long. In that case, there are overloaded versions of `ReplaySubject` that keep only the following:
  + Configurable number of events in memory (`createWithSize()`)
  + Configurable time window of most recent events (`createWithTime()`)
  + Or even constrain both size and time (whichever limit is reached first) with `createWithTimeAndSize()`
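The size-bounded replay idea from the last list item can be sketched with a plain `ArrayDeque`. This is an illustrative model only, not `ReplaySubject`: the class name `BoundedReplay` is hypothetical, and completion, errors, and time-based eviction are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

class BoundedReplay<T> {
    private final int capacity;
    private final Deque<T> history = new ArrayDeque<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    BoundedReplay(int capacity) {
        this.capacity = capacity;
    }

    /** Pushes an event to current subscribers and remembers it, evicting the oldest if full. */
    synchronized void onNext(T event) {
        if (capacity > 0) {
            if (history.size() == capacity) {
                history.removeFirst();
            }
            history.addLast(event);
        }
        subscribers.forEach(s -> s.accept(event));
    }

    /** A new subscriber first receives the retained events, then live ones. */
    synchronized void subscribe(Consumer<T> subscriber) {
        history.forEach(subscriber);
        subscribers.add(subscriber);
    }
}
```

With a capacity of 2, a subscriber arriving after events 1 through 5 would see 4 and 5 immediately, followed by whatever is pushed next.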
`Subject`s should be treated with caution: often there are more idiomatic ways of sharing subscriptions and caching events, for example “ConnectableObservable”. For the time being, prefer the relatively low-level `Observable.create()` or, even better, consider the standard factory methods.
One more thing to keep in mind is concurrency. By default, calling `onNext()` on a `Subject` is directly propagated to all `Observer`s’ `onNext()` callback methods. It is not a surprise that these methods share the same name. In a way, calling `onNext()` on a `Subject` indirectly invokes `onNext()` on each and every `Subscriber`. But you need to keep in mind that according to the Rx Design Guidelines all calls to `onNext()` on an `Observer` must be serialized (i.e., sequential), thus two threads cannot call `onNext()` at the same time. However, depending on the way you stimulate a `Subject`, you can easily break this rule, for example by calling `Subject.onNext()` from multiple threads from a thread pool. Luckily, if you are worried that this might be the case, simply call `.toSerialized()` on a `Subject`, which is quite similar to calling `Observable.serialize()`. This operator makes sure downstream events occur in the correct order.
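The requirement that `onNext()` calls never overlap can be demonstrated with the simplest possible tool, a lock. The sketch below is JDK-only and makes no claim about how RxJava implements `toSerialized()` or `serialize()`; `SerializedSketch`, `serialize`, and `demo` are hypothetical names.

```java
import java.util.function.Consumer;

class SerializedSketch {
    /** Wraps a consumer so that calls coming from different threads never overlap. */
    static <T> Consumer<T> serialize(Consumer<T> downstream) {
        Object lock = new Object();
        return event -> {
            synchronized (lock) {
                downstream.accept(event);
            }
        };
    }

    /** Drives the wrapped consumer from several threads; returns what the unsafe counter saw. */
    static int demo(int threads, int eventsPerThread) {
        int[] counter = {0}; // deliberately not thread-safe on its own
        Consumer<Integer> safe = serialize(e -> counter[0]++);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < eventsPerThread; i++) {
                    safe.accept(i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // 4000: no increment is lost
    }
}
```

Passing the raw `e -> counter[0]++` consumer to the threads instead of the wrapped one would allow lost updates, which is exactly the kind of bug unserialized `onNext()` calls can cause downstream.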
`timer()` and `interval()` use threads underneath.
The former simply creates an `Observable` that emits a `long` value of zero after a specified delay and then completes:
The fixed value of `0` (in variable `zero`) is just a convention without any specific meaning. It is basically an asynchronous equivalent of `Thread.sleep()`. Rather than blocking the current thread, we create an `Observable` and `subscribe()` to it.
`interval()` generates a sequence of `long` numbers, beginning with zero, with a fixed delay between each one of them:
`interval()` is sometimes used to control animations or processes that need to run with certain frequency.
`Single<T>` is basically a container for a future value of type `T`. `Single` is typically used for APIs known to return a single value (duh!) asynchronously and with high probability of failure.
There are a few ways to create a `Single`, beginning with the constant `just()` and `error()` operators:
The `subscribe()` method takes two arguments rather than three. There is simply no point in having an `onCompleted()` callback.
We will use async-http-client, which happens to use Netty underneath as well. After making an HTTP request, we can provide a callback implementation that will be invoked asynchronously whenever a response or error comes back. This fits very nicely into how `Single` is created:
`Single.create()` looks similar to `Observable.create()` but it has some important constraints: you must call either `onSuccess()` once or `onError()` once. Technically, it is also possible to have a `Single` that never completes, but multiple `onSuccess()` invocations are not allowed. Speaking of `Single.create()`, you can also try `Single.fromCallable()`, which accepts a `Callable<T>` and returns a `Single<T>`. As simple as that.
You can use `Single` in similar fashion to `Observable`. `Response.getResponseBody()` throws an `IOException`, so we cannot simply say `map(Response::getResponseBody)`. By wrapping the potentially dangerous `getResponseBody()` method with `Single<String>`, we make sure potential failure is encapsulated and clearly expressed in the type system.
`Single` has its very own `BlockingSingle` created with `Single.toBlocking()`. Analogously to `BlockingObservable`, creating a `BlockingSingle<T>` does not yet block. However, calling `value()` on it blocks until a value of type `T` (a `String` containing the response body in our example) is available. In case of an exception, it will be rethrown from `value()`.
Suppose that you are rendering an article to be displayed on your website. Three independent operations need to be made to fulfill the request: reading the article content from the database, asking a social media website for the number of likes collected so far, and updating the read count metric.
Combining these three operations with `zip` is quite straightforward:
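As a rough JDK-only analogy of combining three independent asynchronous results, the sketch below uses `CompletableFuture` in place of `Single` and `thenCombine` in place of `zip`. It is not RxJava code, and the method names `loadContent`, `fetchLikes`, and `updateReadCount`, along with their canned return values, are hypothetical placeholders for the three operations.

```java
import java.util.concurrent.CompletableFuture;

class ArticleRendering {
    // Hypothetical stand-ins for the three independent operations.
    static CompletableFuture<String> loadContent(int articleId) {
        return CompletableFuture.supplyAsync(() -> "Lorem ipsum #" + articleId);
    }

    static CompletableFuture<Integer> fetchLikes(int articleId) {
        return CompletableFuture.supplyAsync(() -> 42);
    }

    static CompletableFuture<Void> updateReadCount() {
        return CompletableFuture.runAsync(() -> { /* the metric update would go here */ });
    }

    /** Starts all three operations, then combines them once every one has finished. */
    static CompletableFuture<String> render(int articleId) {
        CompletableFuture<String> content = loadContent(articleId);
        CompletableFuture<Integer> likes = fetchLikes(articleId);
        CompletableFuture<Void> readCount = updateReadCount();
        return content
                .thenCombine(likes, (c, l) -> c + " (" + l + " likes)")
                // The metric update carries no value; we only wait for it to finish.
                .thenCombine(readCount, (doc, ignored) -> doc);
    }

    public static void main(String[] args) {
        System.out.println(render(7).join());
    }
}
```

All three futures are created before any combining happens, so the operations can run concurrently; the combined result becomes available only when the slowest of them completes, and a failure in any of them fails the whole result.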