The following class represents a trivial repository abstracting us from a database:
Change it to Observable:
We want to transform
Observable<Person> back into
List<Person> to limit the scope of refactoring:
peopleList.toList() will buffer all
Person events and keep them in memory until the
onCompleted() event is received.
toList() is asynchronous; it doesn’t wait for all events to arrive but instead lazily buffers all values.
BlockingObservable is a good idea only when you must provide a blocking, static view of your otherwise asynchronous
Observable.from(List<T>) converts normal pull-based collection into
toBlocking() does something quite the opposite.
single() drops observables altogether and extracts one, and only one, item we expect to receive from
BlockingObservable<T>. This means
single() will block waiting for
This time with all operators chained:
We can actually simulate reading an entire database starting from given page:
This code snippet lazily loads the initial page of database records,
for example 10 items. If no one subscribes, even this first query is not
invoked. If there is a subscriber that only consumes a few initial elements (e.g.,
allPeople(0).take(3)), RxJava will unsubscribe automatically from our stream and no more queries are executed.
allPeople(initialPage + 1) without any stop condition. This is a recipe for
StackOverflowError in most languages, but not here. Again, calling
allPeople() is always lazy, therefore the moment you stop listening (unsubscribe), this recursion is over.
Lazy paging and concatenation
There are more ways to implement lazy paging with RxJava. If you think about it, the simplest way of loading paged data is to load everything and then take whatever we need. It sounds silly, but thanks to laziness it is feasible. First we generate all possible page numbers and then we request loading each and every page individually:
if we find an empty page it means all further pages are empty, as well. Therefore, we use
takeWhile() rather than
filter(). To flatten
Observable<Person> we can use
concatMap() requires a transformation from
Observable<Person>, executed for each page. Alternatively we can try
concatMapIterable(), which does the same thing, but the transformation should return an
Iterable<Person> for each upstream value (happening to be
No matter which approach you choose, all transformations on
Person object are lazy. As long as you limit the number of records you want to process (for example with
Observable<Person> will invoke
listPeople() as late as possible.
It is easy to see that inner
flatMap() in our example ignores
response and returns an empty stream. In such cases,
flatMap() is an overkill; the
ignoreElements() operator is far more efficient.
ignoreElements() simply ignores all emitted values and forwards
onError() notifications. Because we are ignoring the actual response and just deal with errors,
ignoreElements() works great here.