RxJava 2: Always Unsubscribe On The .subscribeOn(..) Scheduler?
I have an Observable that performs some work. After it's done, it closes its connections (via .setDisposable(Disposables.fromAction(logic*))). Trouble is, I need this c
Solution 1:
You need a single-threaded scheduler for that to work. You can get one via Schedulers.from(Executor), given a single-threaded executor such as Executors.newSingleThreadExecutor(), or via new SingleScheduler (warning: internal).
The problem with unsubscribeOn is that it runs only if the downstream actually disposes/cancels the stream (unlike 1.x, where this happens almost all the time).
A better way is to use using, or create as shown here, with the aforementioned custom scheduler and manually schedule the cleanup:
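// The explicit <T> on create is needed: with subscribeOn chained on,
// the compiler has no target type to infer T from.
<T> Observable<T> createObservable(Scheduler scheduler) {
    return Observable.<T>create(s -> {
        Resource res = ...
        // Invoked on dispose and after termination; hands the cleanup to the scheduler
        s.setCancellable(() ->
            scheduler.scheduleDirect(() ->
                res.close() // may need try-catch here
            )
        );
        s.onNext(...);
        s.onComplete();
    }).subscribeOn(scheduler);
}

Scheduler scheduler = new SingleScheduler(); // internal class, see the warning above
createObservable(scheduler)
    .map(...)
    .filter(...)
    .subscribe(...);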
But note that unless the create logic gives up the scheduler (by terminating or going async), the cancellation logic may never execute due to same-pool livelock.
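The effect described above can be reproduced without RxJava. The following is a minimal sketch, assuming a plain single-thread ExecutorService as a stand-in for the single-threaded scheduler. The class and method names are hypothetical, and a timed wait takes the place of a long-running emission loop. A "producer" task occupying the only worker thread queues a "cleanup" task on the same executor; the cleanup cannot start until the producer returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; a plain executor stands in for the RxJava scheduler.
class SamePoolLivelockSketch {

    /**
     * Returns { cleanupRanWhileProducerBusy, cleanupRanAfterProducerReturned }.
     */
    static boolean[] demo() throws InterruptedException {
        ExecutorService single = Executors.newSingleThreadExecutor();
        CountDownLatch cleanupDone = new CountDownLatch(1);
        CountDownLatch producerDone = new CountDownLatch(1);
        AtomicBoolean ranWhileBusy = new AtomicBoolean();
        try {
            // "Producer": occupies the only worker thread.
            single.execute(() -> {
                // "Cancellation": the cleanup is queued on the same single thread.
                single.execute(cleanupDone::countDown);
                try {
                    // The producer does not give up the thread, so the queued
                    // cleanup cannot start and this wait times out.
                    ranWhileBusy.set(cleanupDone.await(200, TimeUnit.MILLISECONDS));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    producerDone.countDown();
                }
            });
            producerDone.await();
            // Once the producer has returned, the worker picks up the cleanup.
            boolean ranAfterRelease = cleanupDone.await(5, TimeUnit.SECONDS);
            return new boolean[] { ranWhileBusy.get(), ranAfterRelease };
        } finally {
            single.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = demo();
        System.out.println("cleanup ran while producer was busy: " + result[0]);
        System.out.println("cleanup ran after producer returned: " + result[1]);
    }
}
```

Running main prints false for the first line and true for the second. In terms of the answer, a create body that keeps emitting on the scheduler's only thread plays the role of the producer here, so a cleanup scheduled on that same scheduler waits until the body terminates or goes async.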