
RxJava 2: Always Unsubscribe on the .subscribeOn(..) Scheduler?

I have an Observable that performs some work. After it's done, it closes its connections (via .setDisposable(Disposables.fromAction(logic*))). Trouble is, I need this c

Solution 1:

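You need a single-threaded scheduler for that to work. You can get one via Schedulers.from(Executor) with a single-threaded Executor, or via new SingleScheduler() (warning: this class lives in an internal package and is not part of the public API).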
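To illustrate why the scheduler has to be single-threaded, here is a minimal sketch using only java.util.concurrent. It checks that two tasks submitted to one single-threaded executor run on the same thread, which is the property the cleanup relies on. Such an executor is the kind of Executor one would pass to Schedulers.from(Executor); the class and method names (SingleThreadDemo, sameThreadForWorkAndCleanup) are hypothetical and only serve the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SingleThreadDemo {
    // Submits a "work" task and a "cleanup" task to one single-threaded
    // executor and reports whether both ran on the same thread.
    static boolean sameThreadForWorkAndCleanup() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Thread> whoAmI = Thread::currentThread;
            Future<Thread> work = executor.submit(whoAmI);
            Future<Thread> cleanup = executor.submit(whoAmI);
            return work.get() == cleanup.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

With a multi-threaded pool the same check could fail, because the cleanup task may be picked up by a different worker thread than the one that opened the resource.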

The problem with unsubscribeOn is that it runs only if the downstream actually disposes/cancels the stream (unlike 1.x where this happens almost all the time).

A better way is to use using, or create as in the code that follows, with the aforementioned custom scheduler and manually schedule the cleanup:

<T> Observable<T> createObservable(Scheduler scheduler) {
    return Observable.<T>create(s -> {
        Resource res = ...;

        // Runs when the stream terminates or is disposed;
        // the close is handed back to the same scheduler.
        s.setCancellable(() ->
            scheduler.scheduleDirect(() ->
                res.close() // may need try-catch here
            )
        );

        s.onNext(...);
        s.onComplete();
    }).subscribeOn(scheduler);
}

Scheduler scheduler = new SingleScheduler();

createObservable(scheduler)
.map(...)
.filter(...)
.subscribe(...);

But note that unless the create logic gives up the scheduler (by terminating or going async), the cancellation logic may not ever execute due to same-pool livelock.
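The caveat above can be reproduced without RxJava. The following is a sketch under stated assumptions: a plain single-threaded ExecutorService stands in for the custom scheduler, a blocked task stands in for create logic that never gives up the thread, and a second task stands in for the scheduled res.close(). All names (SamePoolStarvationDemo, run) are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SamePoolStarvationDemo {
    // Returns { cleanupRanWhileWorkerBusy, cleanupRanAfterWorkerFinished }.
    static boolean[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch releaseWorker = new CountDownLatch(1);
        CountDownLatch cleanupDone = new CountDownLatch(1);
        try {
            // Stand-in for create logic that holds on to the only thread.
            executor.execute(() -> {
                workerStarted.countDown();
                try {
                    releaseWorker.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workerStarted.await();

            // Stand-in for the cleanup scheduled on the same scheduler.
            executor.execute(cleanupDone::countDown);

            // The cleanup is queued behind the busy worker and cannot start.
            boolean ranWhileBusy = cleanupDone.await(200, TimeUnit.MILLISECONDS);

            // Once the worker gives up the thread, the cleanup gets its turn.
            releaseWorker.countDown();
            boolean ranAfter = cleanupDone.await(5, TimeUnit.SECONDS);
            return new boolean[] { ranWhileBusy, ranAfter };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

If the worker task never returned, the cleanup would stay queued indefinitely, which is the situation the answer warns about.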
