Sometimes you do not want to onError method of Observer be called. Or you want to react only on some specific event of the lifecycle of Observable. Functions for both of these things are in this post.

Error functions

Functions in this section operate with errors, which can occures during emitting items.

Catch

Catch error emitted by Observable and continues with specified item, items or allowing Observable to continue.

Method variants: onErrorResumeNext(Function), onErrorResume(ObservableSource), onErrorReturn(Function), onErrorReturnItem(T)

Return item in case of error

Subject<Integer> observable = PublishSubject.create();
observable.onErrorReturnItem(999).subscribe(new PrintObserver());
observable.onNext(1);
observable.onError(new IllegalArgumentException());
observable.onNext(2);
On subscribe.
On next: 1
On next: 999
On completed.

Return Observable in case of error

In this example new Observable is returned in case of error. But if you change returning of new Observable to returning our main Subject, you will get Subject, which will continue after error.

observable = PublishSubject.create();
observable.onNext(1);
observable.onError(new IllegalArgumentException());
observable.onNext(2);
observable.onErrorResumeNext(Observable.just(997, 998)).subscribe(new PrintObserver());
On subscribe.
On next: 997
On next: 998
On completed.

Retry

Catch error emitted by Observable and resubscribe to source Observable. So, some items can be emitted again.

Method variants: retry, retry(times), retry(throwable_predicate), retry(BiPredicate<Integer, Throwable>), retryUntil(BooleanSupplier)

Retry after two exceptions

final Subject<Integer> subject = ReplaySubject.create();
subject.onNext(1);
subject.onNext(2);
subject.onError(new IllegalArgumentException());
subject.onNext(3);
subject.onComplete();

subject.retry(2).subscribe(new PrintObserver("2 times"));
On subscribe.
On next: 1
On next: 2
On next: 1
On next: 2
On next: 1
On next: 2
On error: IllegalArgumentException: null

Retry only if specific exception is thrown

final Subject<Integer> subject = ReplaySubject.create();
subject.onNext(1);
subject.onNext(2);
subject.onError(new IllegalArgumentException());
subject.onNext(3);
subject.onError(new IllegalStateException());
subject.onComplete();

subject.retry(throwable -> throwable instanceof IllegalStateException)
     .subscribe(new PrintObserver());
On subscribe.
On next: 1
On next: 2
On error: IllegalArgumentException: null

Utility functions

In this section we will look at some functions, which do some useful things.

Delay

It will shift forward begin of source Observable by specified time.

Method variants: delay(delay, timeUnit), delay(delay, timeUnit, delayError), delay(delay, timeUnit, scheduler)

final Observable<Integer> observable = Observable.range(0, 4);
observable.delay(20, TimeUnit.MILLISECONDS).subscribe(new PrintObserver());
Thread.sleep(19);
System.out.println("After 19 millis");
Thread.sleep(1);
System.out.println("After 20 millis");
On subscribe.
After 19 millis
On next: 0
On next: 1
On next: 2
On next: 3
After 20 millis
On complete.

Do

Register callbacks on a certain event of Observable lifecycle.

Method variants: doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnError, doOnLifeCycle, doOnNext, doOneSubscribe, doOnTerminate

Registering callback on each item

final Observable<Integer> observable = Observable.range(0, 4);
observable.doOnEach(integerNotification -> System.out.println("on each: " + integerNotification.getValue()))
	.subscribe(new PrintObserver("Each"));
[Each] On subscribe.
on each: 0
[Each] On next: 0
on each: 1
[Each] On next: 1
on each: 2
[Each] On next: 2
on each: 3
[Each] On next: 3
on each: null
[Each] On completed.

Registering callback after next

final Observable<Integer> observable = Observable.range(0, 4);
observable.doAfterNext(integer -> System.out.println("After next: " + integer))
	.subscribe(new PrintObserver("AfterNext"));
[AfterNext] On subscribe.
[AfterNext] On next: 0
After next: 0
[AfterNext] On next: 1
After next: 1
[AfterNext] On next: 2
After next: 2
[AfterNext] On next: 3
After next: 3
[AfterNext] On completed.

Materialize/Dematerialize

Reverse Observable emition by dividing of emitted events to groups. (Item, error event, complete event)

Materialize

final Observable<Integer> observable = Observable.range(0, 4);
observable.materialize().subscribe(new PrintObserver("Materialize"));
[Materialize] On subscribe.
[Materialize] On next: OnNextNotification[0]
[Materialize] On next: OnNextNotification[1]
[Materialize] On next: OnNextNotification[2]
[Materialize] On next: OnNextNotification[3]
[Materialize] On next: OnCompleteNotification
[Materialize] On completed.

Dematerialize

final Observable<Integer> observable = Observable.range(0, 4);
observable.materialize().dematerialize().subscribe(new PrintObserver("Materialize-Dematerialize"));
[Materialize-Dematerialize] On subscribe.
[Materialize-Dematerialize] On next: 0
[Materialize-Dematerialize] On next: 1
[Materialize-Dematerialize] On next: 2
[Materialize-Dematerialize] On next: 3
[Materialize-Dematerialize] On completed.

Subscribe functions

In this section we will look at some basic functions for the subscription of Observer to Observable.

Subscribe

Main operator which connects Observable with Observer.

Method variants: subscribe(Observer), subscribe(Consumer), subscribe(Consumer, Consumer), subscribe(Consumer, Consumer, Consumer)

final Observable<Integer> observable = Observable.range(0, 4);
observable.subscribe(new PrintConsumer(), new PrintConsumer());
observable.subscribe(new PrintObserver());

SubscribeOn

Specifies the Scheduler in which will Observable operates.

Few of schedulers are: Schedulers.io(), Schedulers.computation(), Schedulers.newThread().

ObserverOn

Specifies the Scheduler in which will Observer operates

Few of schedulers are: Schedulers.io(), Schedulers.computation(), Schedulers.newThread().

Time functions

And in the last section we will look at functions operates with time.

Timeinterval

Convert Observable to Observable, which emits time elapsed between emitted items instead of emitting items.

Method variants: timeInterval(), timeInterval(TimeUnit), timeInterval(Scheduler), timeInterval(TimeUnit, Scheduler)

final Subject<Integer> observable = PublishSubject.create();
observable.timeInterval().subscribe(new PrintObserver());

observable.onNext(1);
Thread.sleep(10);
observable.onNext(2);
observable.onNext(3);
Thread.sleep(25);
observable.onNext(4);
On subscribe.
On next: Timed[time=1, unit=MILLISECONDS, value=1]
On next: 0
On next: 1
On next: 2
On next: 3
On completed.
On next: Timed[time=11, unit=MILLISECONDS, value=2]
On next: Timed[time=0, unit=MILLISECONDS, value=3]
On next: Timed[time=25, unit=MILLISECONDS, value=4]

Timeout

Wraps Observable with timeout ability. That means, if no value after defined time will not be emitted from a source Observable, then error will be emitted.

*Method variants: timeout(long, TimeUnit), timeout(long, TimeUnit, ObservableSource), timeout(long, TimeUnit, Scheduler), timeout(long, TimeUnit, Scheduler, ObservableSource), … *

final Subject<Integer> observable = PublishSubject.create();
observable.timeout(20, TimeUnit.MILLISECONDS).subscribe(new PrintObserver());
observable.onNext(1);
Thread.sleep(10);
observable.onNext(2);
observable.onNext(3);
Thread.sleep(25);
observable.onNext(4);
On subscribe.
On next: 1
On next: 2
On next: 3
On error: TimeoutException: null

Timestamp

Attach a timestamp to each emitted item. Timestamp indicates the time when was item emitted.

Method variants: timestamp, timestamp(timeUnit), timestamp(scheduler), timestamp(timeUnit, scheduler)

final Observable<Integer> observable = Observable.range(0, 4);
observable.timestamp().subscribe(new PrintObserver());
On subscribe.
On next: Timed[time=1527689871840, unit=MILLISECONDS, value=0]
On next: Timed[time=1527689871840, unit=MILLISECONDS, value=1]
On next: Timed[time=1527689871842, unit=MILLISECONDS, value=2]
On next: Timed[time=1527689871842, unit=MILLISECONDS, value=3]
On completed.