- An Observable without any Scheduler works like a single-threaded program with blocking method calls passing data between one another.
- An Observable with a single subscribeOn() is like starting a big task in a background Thread. The program within that Thread is still sequential, but at least it runs in the background.
- An Observable using flatMap(), where each internal Observable has its own subscribeOn(), works like a fork/join framework from java.util.concurrent, where each substream is a fork of execution and flatMap() is a safe join stage.
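The analogy can be made concrete with plain java.util.concurrent code. This is a rough sketch, not RxJava code and not how RxJava is implemented: slowSquare() is a hypothetical stand-in for blocking work, and a fixed thread pool plays the role of a Scheduler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrencyModels {

    // Hypothetical blocking step standing in for real work.
    static int slowSquare(int x) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x * x;
    }

    // No Scheduler: one sequential flow on the caller thread.
    static int sequential(List<Integer> xs) {
        int sum = 0;
        for (int x : xs) {
            sum += slowSquare(x);
        }
        return sum;
    }

    // Single subscribeOn(): the same sequential program, moved to a background thread.
    static Future<Integer> inBackground(List<Integer> xs, ExecutorService pool) {
        return pool.submit(() -> sequential(xs));
    }

    // flatMap() with subscribeOn() per substream: fork one task per item, then join.
    static int forkAndJoin(List<Integer> xs, ExecutorService pool)
            throws InterruptedException, ExecutionException {
        List<Future<Integer>> forks = new ArrayList<>();
        for (int x : xs) {
            forks.add(pool.submit(() -> slowSquare(x))); // fork
        }
        int sum = 0;
        for (Future<Integer> fork : forks) {
            sum += fork.get(); // join stage
        }
        return sum;
    }

    public static int[] runAll(List<Integer> xs) {
        // The Executors factory is used here only to keep the sketch short.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            return new int[] {sequential(xs), inBackground(xs, pool).get(), forkAndJoin(xs, pool)};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runAll(Arrays.asList(1, 2, 3, 4))));
    }
}
```

All three variants compute the same result; they differ only in which threads do the work and whether the items are processed one after another or concurrently.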
In principle, a Scheduler works similarly to the executors from java.util.concurrent. Schedulers are used together with the subscribeOn() and observeOn() operators as well as when creating certain types of Observables. A scheduler only creates instances of Workers that are responsible for scheduling and running code. When RxJava needs to schedule some code, it first asks the Scheduler to provide a Worker and uses the latter to schedule subsequent tasks.
You will find examples of this API later on, but first familiarize yourself with the available built-in schedulers.
There are numerous operators that by default use some Scheduler; Schedulers.computation() is used if none is supplied. For example, the delay() operator takes upstream events and pushes them downstream after a given time, so it cannot simply hold the calling thread in the meantime. Without supplying a custom Scheduler such as schedulerA as the last argument, all operators below delay() would use the computation() Scheduler. Other important operators that support a custom Scheduler include timeout(). If you do not provide a scheduler to such operators, the computation() Scheduler is used, which is a safe default in most cases.
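A minimal sketch of the difference, assuming RxJava 1.x (package rx) on the classpath. The single-threaded executor whose thread is named Sched-A-0 is an illustrative assumption that mirrors the thread names used later in this section.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class DelayDemo {

    // delay() without a Scheduler argument falls back to Schedulers.computation().
    static String threadBelowDefaultDelay() {
        return Observable.just("event")
                .delay(100, TimeUnit.MILLISECONDS)
                .map(event -> Thread.currentThread().getName())
                .toBlocking()
                .single();
    }

    // delay() with an explicit Scheduler: operators below it run on that Scheduler.
    static String threadBelowDelay(Scheduler scheduler) {
        return Observable.just("event")
                .delay(100, TimeUnit.MILLISECONDS, scheduler)
                .map(event -> Thread.currentThread().getName())
                .toBlocking()
                .single();
    }

    public static String[] run() {
        // One named thread; the Executors factory is used only to keep the sketch short.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "Sched-A-0"));
        try {
            Scheduler schedulerA = Schedulers.from(executor);
            return new String[] {threadBelowDefaultDelay(), threadBelowDelay(schedulerA)};
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("default delay(): " + threads[0]);
        System.out.println("delay(..., schedulerA): " + threads[1]);
    }
}
```

The first name belongs to one of RxJava's computation threads; its exact format depends on the RxJava version, so nothing should rely on it.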
The newThread() scheduler simply starts a new thread every time it is requested (that is, for every new Worker, for example via subscribeOn() or observeOn()). newThread() is hardly ever a good choice, not only because of the latency involved when starting a thread, but also because this thread is not reused. Stack space must be allocated up front (typically around one megabyte, as controlled by the -Xss parameter of the JVM) and the operating system must start a new native thread. When the Worker is done, the thread simply terminates.
This scheduler can be useful only when tasks are coarse-grained: they take a lot of time to complete but there are very few of them, so threads are unlikely to be reused at all. In practice, Schedulers.io(), described next, is almost always a better choice.
The io() scheduler is similar to newThread(), but already started threads are recycled and can possibly handle future requests. This implementation works similarly to a thread pool from java.util.concurrent with an unbounded number of threads. Every time a new Worker is requested, either a new thread is started (and later kept idle for some time) or an idle one is reused.
The name io() is not a coincidence. Consider using this scheduler for I/O-bound tasks that require very few CPU resources. However, they tend to take quite some time, waiting for network or disk. Thus, it is a good idea to have a relatively big pool of threads. Still, be careful with unbounded resources of any kind: in case of slow or unresponsive external dependencies like web services, the io() scheduler might start an enormous number of threads, leading to your very own application becoming unresponsive as well.
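The danger of an unbounded pool is easy to reproduce with a plain ThreadPoolExecutor configured the way Executors.newCachedThreadPool() configures it. This is an analogy only, not the implementation behind io(); the latch is a hypothetical stand-in for an unresponsive web service.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedPoolDemo {

    // Submits `tasks` blocked tasks to a pool with no upper thread limit
    // and reports how many threads the pool had to start.
    public static int threadsStarted(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,       // no core threads, no upper bound
                60, TimeUnit.SECONDS,       // idle threads are kept for a while, then discarded
                new SynchronousQueue<>());  // hand-off only: a task needs a free thread right away
        CountDownLatch slowDependency = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        slowDependency.await(); // simulates waiting for an unresponsive service
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize(); // every blocked task forced a brand-new thread
        } finally {
            slowDependency.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsStarted(200) + " threads for 200 stuck tasks");
    }
}
```

Because no started thread ever becomes idle, each submission starts another thread; with a truly stuck dependency the count keeps climbing with the load.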
You should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code (reading from disk or network, sleeping, waiting for a lock, etc.). Because each task executed on this scheduler is supposed to fully utilize one CPU core, executing more such tasks in parallel than there are available cores would not bring much value. Therefore, the computation() scheduler by default limits the number of threads running in parallel to the value of availableProcessors(), as returned by Runtime.getRuntime(). If for some reason you need a different number of threads than the default, you can always use the rx.scheduler.max-computation-threads system property. By using fewer threads, you ensure that one or more CPU cores are always idle, so even under heavy load the computation() thread pool does not saturate your server. It is not possible to have more computation threads than cores.
The computation() scheduler uses an unbounded queue in front of every thread, so if a task is scheduled while all cores are occupied, it is queued. In case of a load peak, this scheduler keeps the number of threads limited; however, the queue in front of each thread keeps growing.
Luckily, built-in operators, especially observeOn(), ensure that this Scheduler is not overloaded.
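The trade-off of a bounded number of threads in front of an unbounded queue can be illustrated with a rough stdlib analogy. Unlike computation(), which keeps a queue per thread, this sketch uses a single shared queue; the blocked tasks only simulate occupied cores.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedThreadsDemo {

    // Returns {threads, queued tasks} after submitting `extra` more tasks than there are cores.
    public static int[] saturate(int extra) {
        int cores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                cores, cores,                 // thread count capped at the number of cores
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>()); // unbounded queue: excess work waits here
        CountDownLatch busy = new CountDownLatch(1);
        try {
            for (int i = 0; i < cores + extra; i++) {
                pool.execute(() -> {
                    try {
                        busy.await(); // keeps every "core" occupied
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            busy.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] state = saturate(1000);
        System.out.println(state[0] + " threads, " + state[1] + " queued tasks");
    }
}
```

The thread count stays fixed no matter how much work arrives; only the queue grows, which is exactly the load-peak behavior described above.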
Schedulers are internally more complex than the executors from java.util.concurrent, so a separate abstraction was needed. But because the two are conceptually quite similar, unsurprisingly there is a wrapper that can turn an Executor into a Scheduler using the from() factory method.
I am intentionally using the verbose syntax for creating an ExecutorService rather than the simpler version offered by the Executors factory class. Although tempting, Executors hardcodes several defaults that are impractical or even dangerous in enterprise applications.
For example, it uses an unbounded LinkedBlockingQueue that can grow without limit, resulting in an OutOfMemoryError when there is a large number of outstanding tasks.
Also, the default ThreadFactory uses meaningless thread names like pool-5-thread-3. Naming threads properly is an invaluable tool when profiling or analyzing thread dumps.
Implementing a ThreadFactory from scratch is a bit cumbersome, so we used ThreadFactoryBuilder from Guava.
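One way to configure such an executor without Guava is sketched below. The hand-written ThreadFactory stands in for ThreadFactoryBuilder, and the pool sizes and the Sched-A prefix are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPools {

    // Hand-written stand-in for Guava's ThreadFactoryBuilder: names threads prefix-0, prefix-1, ...
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true); // do not keep the JVM alive just because of this pool
            return thread;
        };
    }

    // Fixed-size pool with a bounded queue. With the default AbortPolicy, a full queue makes
    // execute() throw RejectedExecutionException instead of growing until OutOfMemoryError.
    static ThreadPoolExecutor boundedPool(String prefix, int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                named(prefix));
    }

    public static String firstThreadName() {
        ThreadPoolExecutor pool = boundedPool("Sched-A", 10, 1000);
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static boolean rejectsWhenFull() {
        ThreadPoolExecutor pool = boundedPool("Tiny", 1, 1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // occupies the only thread
            pool.execute(blocked); // takes the only queue slot
            pool.execute(blocked); // nowhere to go
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

A pool built this way can then be wrapped with Schedulers.from() to obtain a Scheduler whose threads show up with meaningful names in thread dumps.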
Creating schedulers from an Executor that we consciously configured is advisable for projects dealing with high load. However, because RxJava has no control over threads created independently in an Executor, it cannot pin threads (that is, try to keep work of the same task on the same thread to improve cache locality). This Scheduler merely makes sure that a single Scheduler.Worker processes events sequentially.
Schedulers.immediate() is a special scheduler that invokes a task within the client thread in a blocking fashion, rather than asynchronously. Using it is pointless unless some part of your API requires providing a scheduler while you are perfectly fine with the default behavior of an Observable, which does not involve any threading at all. In fact, subscribing to an Observable (more on that in a second) via the immediate() Scheduler typically has the same effect as not subscribing with any particular scheduler at all. In general, avoid this scheduler: it blocks the calling thread and is of limited use.
The trampoline() scheduler is very similar to immediate() because it also schedules tasks in the same thread, effectively blocking. However, as opposed to immediate(), the upcoming task is executed when all previously scheduled tasks complete. immediate() invokes a given task right away, whereas trampoline() waits for the current task to finish. Trampoline is a pattern in functional programming that allows implementing recursion without infinitely growing the call stack. This is best explained with an example, first involving immediate(). By the way, notice that we do not interact directly with a Scheduler instance but first create a Worker.
With immediate(), the output is as expected; you could actually replace schedule() with a simple method invocation. Inside the Outer block we schedule() the Inner block, which gets invoked immediately, interrupting the Outer task. When Inner is done, control goes back to Outer. Again, this is simply a convoluted way of invoking a task in a blocking manner, indirectly via a Scheduler. But what happens if we replace immediate() with Schedulers.trampoline()? The output is quite different: Inner no longer interrupts Outer, but runs only after Outer completes.
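The nested-scheduling experiment can be sketched as follows, assuming RxJava 1.x. One Worker schedules an Outer task, which schedules an Inner task on the same Worker; the list records the order in which the steps run under each scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class TrampolineDemo {

    // Schedules Outer, which schedules Inner on the same Worker, and records the order of steps.
    static List<String> nestedOrder(Scheduler scheduler) {
        List<String> log = new ArrayList<>();
        Scheduler.Worker worker = scheduler.createWorker();
        try {
            worker.schedule(() -> {
                log.add("Outer start");
                worker.schedule(() -> log.add("Inner")); // nested scheduling
                log.add("Outer end");
            });
        } finally {
            worker.unsubscribe(); // release the Worker once we are done with it
        }
        return log;
    }

    // immediate(): Inner runs right away, in the middle of Outer.
    public static List<String> immediateOrder() {
        return nestedOrder(Schedulers.immediate());
    }

    // trampoline(): Inner is queued and runs on the same thread once Outer has finished.
    public static List<String> trampolineOrder() {
        return nestedOrder(Schedulers.trampoline());
    }

    public static void main(String[] args) {
        System.out.println("immediate():  " + immediateOrder());
        System.out.println("trampoline(): " + trampolineOrder());
    }
}
```

Both variants run entirely on the calling thread; only the order differs, because trampoline() queues nested tasks instead of invoking them recursively.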
The test Scheduler is used only for testing purposes, and you will never see it in production code. Its main advantage is the ability to arbitrarily advance the clock, simulating the passage of time.
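A minimal sketch of virtual time, assuming RxJava 1.x, where Schedulers.test() returns a TestScheduler. interval() ticks once per second of virtual time, and advanceTimeBy() moves the clock forward without any real waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;

public class VirtualTimeDemo {

    // Returns the ticks emitted after advancing the virtual clock by `seconds`.
    public static List<Long> ticksAfter(long seconds) {
        TestScheduler testScheduler = Schedulers.test();
        List<Long> ticks = new ArrayList<>();
        Observable.interval(1, TimeUnit.SECONDS, testScheduler)
                .subscribe(ticks::add);
        // Nothing has been emitted yet: no real time passes and no extra thread is involved.
        testScheduler.advanceTimeBy(seconds, TimeUnit.SECONDS);
        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(ticksAfter(3));
    }
}
```

A test that would otherwise sleep for seconds finishes instantly and deterministically, because every scheduled action runs synchronously inside advanceTimeBy().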
subscribe() by default uses the client thread. To recap, the simplest subscription you can come up with involves no threading whatsoever.
There is an inherent but hidden connection between subscribe() and create(). Every time you call subscribe() on an Observable, its OnSubscribe callback method is invoked (wrapping the lambda expression you passed to create()). It receives your Subscriber as an argument.
By default, this happens in the same thread and is blocking, so whatever you do inside create() will block subscribe(). If your create() method sleeps for a few seconds, subscribe() will block for that long. Moreover, if there are operators between Observable.create() and your Subscriber (the lambda acting as callback), all these operators are invoked on behalf of the thread that invoked subscribe().
RxJava does not inject any concurrency facilities by default between Observable and Subscriber. The reason is that Observables tend to be backed by other concurrency mechanisms like event loops or custom threads, so Rx lets you take full control rather than imposing any convention.
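A sketch that makes this blocking behavior visible, assuming RxJava 1.x: each stage records the thread it runs on, and the final entry is added only once subscribe() has returned.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

public class NoSchedulerDemo {

    // Records which thread runs create(), an intermediate operator, and the Subscriber.
    public static List<String> trace() {
        List<String> log = new ArrayList<>();
        Observable<String> observable = Observable.<String>create(subscriber -> {
            log.add("create: " + Thread.currentThread().getName());
            subscriber.onNext("A");
            subscriber.onCompleted();
        });
        observable
                .map(x -> {
                    log.add("map: " + Thread.currentThread().getName());
                    return x;
                })
                .subscribe(x -> log.add("subscriber: " + Thread.currentThread().getName()));
        // Reached only after create() has finished: subscribe() blocked until then.
        log.add("subscribe() returned");
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Every entry names the calling thread, and "subscribe() returned" always comes last, because the whole pipeline ran synchronously inside subscribe().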
By placing subscribeOn() anywhere between the original Observable and subscribe(), you declaratively select the Scheduler on which the OnSubscribe callback method will be invoked. No matter what you do inside create(), this work is offloaded to an independent Scheduler and your subscribe() invocation no longer blocks.
schedulerA as well as the Sched-A-0 thread come from sample schedulers we built for illustration purposes.
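The sample schedulers are not reproduced here; the following sketch assumes RxJava 1.x and a comparable setup, in which a hypothetical namedPool() helper builds a bounded pool whose threads are named Sched-A-0, Sched-A-1, and so on, wrapped with Schedulers.from(). The OnSubscribe lambda waits until the caller signals that subscribe() has returned, which would time out if subscribe() were still blocking.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class SubscribeOnDemo {

    // Hypothetical helper: a bounded pool whose threads are named prefix-0, prefix-1, ...
    static ThreadPoolExecutor namedPool(String prefix, int threads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000), factory);
    }

    // Reports which thread ran OnSubscribe and whether subscribe() returned without blocking.
    public static String run() {
        ThreadPoolExecutor poolA = namedPool("Sched-A", 10);
        Scheduler schedulerA = Schedulers.from(poolA);
        CountDownLatch subscribeReturned = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> createThread = new AtomicReference<>();
        AtomicBoolean nonBlocking = new AtomicBoolean();
        try {
            Observable.<String>create(subscriber -> {
                createThread.set(Thread.currentThread().getName());
                try {
                    // Times out if subscribe() is still blocking the caller.
                    nonBlocking.set(subscribeReturned.await(5, TimeUnit.SECONDS));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                subscriber.onNext("A");
                subscriber.onCompleted();
            })
                    .subscribeOn(schedulerA)
                    .subscribe(x -> { }, e -> done.countDown(), done::countDown);
            subscribeReturned.countDown(); // reached at once: subscribe() did not block
            done.await(5, TimeUnit.SECONDS);
            return createThread.get() + " nonBlocking=" + nonBlocking.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            poolA.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```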
In applications that are mature in terms of Rx adoption, subscribeOn() is very seldom used. Normally, Observables come from sources that are naturally asynchronous or apply scheduling on their own, and they are treated as asynchronous by default. You should use subscribeOn() only in special cases, when the underlying Observable is known to be synchronous (create() being blocking); in practice, mostly when retrofitting existing APIs or libraries.
- If there are two invocations of subscribeOn() between the original Observable and subscribe(), the subscribeOn() closest to the original Observable wins.
- If you are designing an API and you use subscribeOn() internally, the client code has no way of overriding the Scheduler of your choice. This can be a conscious design decision; after all, the API designer might know best which Scheduler is appropriate. On the other hand, providing an overloaded version of said API that allows overriding the chosen Scheduler is always a good idea.
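Both rules can be checked with a small sketch, assuming RxJava 1.x. loadData() is a hypothetical API method that uses subscribeOn() internally, with an overload that lets the client choose the Scheduler; the Sched-B pool is likewise an illustrative assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class ApiSchedulerDemo {

    // Hypothetical API method: emits the name of the thread its work ran on.
    static Observable<String> loadData(Scheduler scheduler) {
        return Observable.defer(() -> Observable.just(Thread.currentThread().getName()))
                .subscribeOn(scheduler);
    }

    // Convenience overload with a default Scheduler picked by the API designer.
    static Observable<String> loadData() {
        return loadData(Schedulers.io());
    }

    // Hypothetical helper; the Executors factory is used only for brevity.
    static ExecutorService namedPool(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        });
    }

    public static String[] run() {
        ExecutorService poolB = namedPool("Sched-B");
        Scheduler schedulerB = Schedulers.from(poolB);
        try {
            // The subscribeOn() inside loadData() is closer to the source, so it wins.
            String outerIgnored = loadData().subscribeOn(schedulerB).toBlocking().single();
            // The overload lets the client pick the Scheduler explicitly.
            String chosen = loadData(schedulerB).toBlocking().single();
            return new String[] {outerIgnored, chosen};
        } finally {
            poolB.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("outer subscribeOn(schedulerB): " + threads[0]);
        System.out.println("loadData(schedulerB):          " + threads[1]);
    }
}
```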
All operators are executed by default in the same thread (scheduler); no concurrency is involved by default. The position of subscribeOn() is not relevant: it can be right after the Observable or just before subscribe().
RxJava creates a single Worker instance for the entire pipeline, mostly to guarantee sequential processing of events. This means that if one of your operators is particularly slow (for example, map() reading data from disk in order to transform events passing by), this costly operation will be invoked within the same thread. A single broken operator can slow down the entire pipeline, from production to consumption. This is an antipattern in RxJava; operators should be nonblocking, fast, and as pure as possible.
Rather than blocking within map(), we can invoke flatMap() and asynchronously collect all the results.
When purchasing several goods, we would like to parallelize as much as possible and calculate the total price for all goods at the end. The first attempt is fruitless.
The code does not work concurrently because there is just a single flow of events, which by design must run sequentially. The main Observable emitting products cannot be parallelized. However, for each product, we create a new, independent Observable as returned from purchase(). Because they are independent, we can safely schedule each one of them concurrently.
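A sketch of the concurrent version, assuming RxJava 1.x. The purchase() method, its price list, and the 100 ms delay are hypothetical stand-ins for a slow, blocking purchase; defer() ensures the blocking work happens on subscription, so subscribeOn() can move it to another thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class ConcurrentPurchases {

    // Hypothetical price list, for illustration only.
    static final Map<String, Integer> PRICES = new HashMap<>();
    static {
        PRICES.put("bread", 3);
        PRICES.put("butter", 5);
        PRICES.put("milk", 2);
    }

    // Hypothetical slow, blocking purchase; defer() postpones the work until subscription.
    static Observable<Integer> purchase(String product, int quantity) {
        return Observable.defer(() -> {
            sleepQuietly(100);
            return Observable.just(PRICES.get(product) * quantity);
        });
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static int totalPrice(List<String> products) {
        ExecutorService pool = Executors.newFixedThreadPool(10); // brevity only
        Scheduler schedulerA = Schedulers.from(pool);
        try {
            return Observable.from(products)
                    // each substream gets its own subscribeOn(), so purchases run concurrently
                    .flatMap(product -> purchase(product, 1).subscribeOn(schedulerA))
                    // reduce() receives the prices one at a time, in completion order
                    .reduce(0, Integer::sum)
                    .toBlocking()
                    .single();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(totalPrice(Arrays.asList("bread", "butter", "milk")));
    }
}
```

The three 100 ms purchases overlap instead of running back to back, while the sum is unaffected by the order in which they finish.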
Each substream created within flatMap() is supplied with schedulerA via subscribeOn(). Every time subscribeOn() is used, the Scheduler gets a chance to return a new Worker, and therefore a separate thread (simplifying a bit).
We can no longer rely on the order of downstream events: they neither begin nor complete in the same order as they were emitted (the original sequence began at bread). When events reach the reduce() operator, they are already sequential and well behaved.
We can declaratively batch such requests by using groupBy(), and this still works with declarative concurrency.
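A sketch of batching with groupBy(), assuming RxJava 1.x. As before, purchase(), its prices, and the delay are hypothetical; identical products are grouped, counted, and purchased with one request per distinct product, each request subscribed on schedulerA.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class BatchedPurchases {

    // Hypothetical price list, for illustration only.
    static final Map<String, Integer> PRICES = new HashMap<>();
    static {
        PRICES.put("bread", 3);
        PRICES.put("butter", 5);
        PRICES.put("milk", 2);
    }

    // Counts how many (hypothetical) purchase requests were actually sent.
    static final AtomicInteger PURCHASE_CALLS = new AtomicInteger();

    // Hypothetical slow purchase of `quantity` items of one product.
    static Observable<Integer> purchase(String product, int quantity) {
        return Observable.defer(() -> {
            PURCHASE_CALLS.incrementAndGet();
            sleepQuietly(100);
            return Observable.just(PRICES.get(product) * quantity);
        });
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {total price, number of purchase requests}.
    public static int[] batchedTotal(List<String> products) {
        PURCHASE_CALLS.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(10); // brevity only
        Scheduler schedulerA = Schedulers.from(pool);
        try {
            int total = Observable.from(products)
                    .groupBy(product -> product)                // one group per distinct product
                    .flatMap(group -> group.count()             // quantity of that product
                            .flatMap(quantity -> purchase(group.getKey(), quantity)
                                    .subscribeOn(schedulerA)))  // one concurrent request per product
                    .reduce(0, Integer::sum)
                    .toBlocking()
                    .single();
            return new int[] {total, PURCHASE_CALLS.get()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = batchedTotal(Arrays.asList("bread", "butter", "bread", "milk", "bread"));
        System.out.println("total=" + result[0] + ", requests=" + result[1]);
    }
}
```

Five items become three purchase requests, one per distinct product, and those three still run concurrently.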
subscribeOn() allows choosing which Scheduler will be used to invoke OnSubscribe (the lambda expression inside create()). observeOn(), on the other hand, controls which Scheduler is used to invoke downstream Subscribers occurring after observeOn().
For example, the blocking work inside create() happens on the io() Scheduler (via subscribeOn(io())) to avoid blocking the user interface. However, updating the user interface widgets must happen in the UI thread (both Swing and Android have this constraint), so we use observeOn(), for example with AndroidSchedulers.mainThread(), before operators or subscribers that change the UI.
observeOn() occurs somewhere in the pipeline chain, and this time, as opposed to subscribeOn(), the position of observeOn() is quite important. No matter what Scheduler was running operators above observeOn() (if any), everything below uses the supplied Scheduler, of course until another observeOn() is encountered. Additionally, subscribeOn() can occur anywhere between the Observable and subscribe(), but this time it only affects operators down to the first observeOn().
Subscription occurs in schedulerA because that is what we specified in subscribeOn(). The "Found 1" operator was also executed within that Scheduler because it is before the first observeOn(). Later, the situation becomes more interesting. The first observeOn() switches the current Scheduler, and "Found 2" uses the new one instead. The last observeOn(schedulerC) affects both the "Found 3" operator and the Subscriber. Remember that the Subscriber works within the context of the last encountered observeOn().
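The program discussed above is not reproduced here; the following sketch has the same shape, assuming RxJava 1.x and hypothetical pools whose threads are named Sched-A-*, Sched-B-*, and Sched-C-*. It records where each stage runs for a single event.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class ObserveOnDemo {

    // Hypothetical helper: a small pool whose threads are named prefix-0, prefix-1, ...
    static ExecutorService namedPool(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        });
    }

    static String thread() {
        return Thread.currentThread().getName();
    }

    // Returns "step@thread" entries in the order the single event passed through the pipeline.
    public static List<String> trace() {
        ExecutorService poolA = namedPool("Sched-A");
        ExecutorService poolB = namedPool("Sched-B");
        ExecutorService poolC = namedPool("Sched-C");
        Scheduler schedulerA = Schedulers.from(poolA);
        Scheduler schedulerB = Schedulers.from(poolB);
        Scheduler schedulerC = Schedulers.from(poolC);
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Observable.<String>create(subscriber -> {
                log.add("create@" + thread());
                subscriber.onNext("A");
                subscriber.onCompleted();
            })
                    .doOnNext(x -> log.add("Found 1@" + thread())) // still on schedulerA
                    .observeOn(schedulerB)
                    .doOnNext(x -> log.add("Found 2@" + thread())) // now on schedulerB
                    .observeOn(schedulerC)
                    .doOnNext(x -> log.add("Found 3@" + thread())) // now on schedulerC
                    .subscribeOn(schedulerA)                       // only affects stages above the first observeOn()
                    .subscribe(
                            x -> log.add("Got@" + thread()),       // Subscriber stays on schedulerC
                            e -> done.countDown(),
                            done::countDown);
            done.await(5, TimeUnit.SECONDS);
            return log;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            poolA.shutdown();
            poolB.shutdown();
            poolC.shutdown();
        }
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Even though subscribeOn(schedulerA) is placed last, it only governs create() and "Found 1"; everything after each observeOn() runs on the Scheduler that observeOn() supplied.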
subscribeOn() can be placed close to the original Observable to improve readability, whereas observeOn() is placed close to subscribe() so that only the Subscriber uses that special Scheduler; other operators rely on the Scheduler from subscribeOn().
Here is a more advanced program that takes advantage of these two operators:
RxJava controls concurrency with just two operators (subscribeOn() and observeOn()), but the more you use reactive extensions, the less frequently you will see these in production code.
This small library will add the
AndroidSchedulers class to your CLASSPATH, which is essential for writing concurrent code on Android with RxJava.
When all transformations are done, we invoke a UI update only on the main thread because we want to carry out as little processing as possible there.