Using RxJava and OkHttp
Solution 1:
First add RxAndroid to your dependencies, then create your Observable like this:
Subscription subscription = Observable.create(new Observable.OnSubscribe<Response>() {
    OkHttpClient client = new OkHttpClient();

    @Override
    public void call(Subscriber<? super Response> subscriber) {
        try {
            // Blocking call; it runs on the scheduler given to subscribeOn() below
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            if (response.isSuccessful()) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(response);
                }
                subscriber.onCompleted();
            } else if (!subscriber.isUnsubscribed()) {
                subscriber.onError(new Exception("error"));
            }
        } catch (IOException e) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(e);
            }
        }
    }
})
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Response>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Response response) {
        }
    });
This requests your url on another thread (the io scheduler) and delivers the result on the Android main thread.
Finally, when you leave the screen, call subscription.unsubscribe() to avoid a memory leak.
When you use Observable.create, you have to write a lot of boilerplate code, and you must also handle unsubscription yourself. A better alternative is to use defer.
From the docs:
do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual sequence.
So as Marcin Koziński mentioned, you just need to do this:
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override
    public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});
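The per-subscriber behaviour described in the doc excerpt above can be seen without pulling in RxJava. The following is only a hand-rolled sketch: Source, just and defer here are hypothetical stand-ins written for this illustration, not the RxJava classes, and the counter stands in for "a request was executed".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hand-rolled stand-ins for illustration only; these are NOT RxJava types.
final class DeferSketch {

    /** Minimal "observable": pushes its value to the observer on subscribe. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Wraps a value that has already been computed by the caller. */
    static <T> Source<T> just(T value) {
        return observer -> observer.accept(value);
    }

    /** Calls the factory on every subscribe, so each observer gets a fresh source. */
    static <T> Source<T> defer(Supplier<Source<T>> factory) {
        return observer -> factory.get().subscribe(observer);
    }

    /** Records what two subscribers of an eager and a deferred source observe. */
    static List<Integer> demo() {
        AtomicInteger calls = new AtomicInteger(); // stands in for "requests executed"
        List<Integer> seen = new ArrayList<>();

        // The increment runs right here, before anyone subscribes.
        Source<Integer> eager = just(calls.incrementAndGet());
        // The increment is postponed until subscribe, and repeated per subscriber.
        Source<Integer> deferred = defer(() -> just(calls.incrementAndGet()));

        seen.add(calls.get());          // work done so far, by the eager source only
        eager.subscribe(seen::add);     // both eager subscribers share one result
        eager.subscribe(seen::add);
        deferred.subscribe(seen::add);  // each deferred subscriber triggers new work
        deferred.subscribe(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 1, 1, 2, 3]
    }
}
```

In the OkHttp case the "work" is the blocking execute() call, which is why wrapping it in defer keeps it off the calling thread until something subscribes.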
Solution 2:
It's easier and safer to use Observable.defer() instead of Observable.create():
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override
    public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});
That way unsubscription and backpressure are handled for you. Here's a great post by Dan Lew about create() and defer().
If you wish to go the Observable.create() route, then it should look more like in this library, with isUnsubscribed() calls sprinkled everywhere. And I believe this still doesn't handle backpressure.
Solution 3:
I realise this post is a bit old, but there's now a newer and more convenient way of doing this (shown here in Kotlin):
Observable.fromCallable {
    client.newCall(Request.Builder().url("your url").build()).execute()
}
More info: https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/
Solution 4:
I came late to the discussion, but if for some reason the code needs to stream the response body, then defer or fromCallable won't do it. Instead one can employ the using operator.
Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
        response -> { // 2
            ...
            return Single.just((Consumer<OutputStream>) fileOutput -> {
                try (InputStream upstreamResponseStream = response.body().byteStream();
                     OutputStream output = fileOutput) {
                    ByteStreams.copy(upstreamResponseStream, output);
                }
            });
        },
        Response::close, // 3
        false) // 4
    .subscribeOn(Schedulers.io()) // 5
    .subscribe(copier -> copier.accept(...), // 6
        throwable -> ...); // 7
1. The first lambda executes the request and obtains the response upon subscription.
2. The second lambda creates the observable type, here with Single.just(...).
3. The third lambda disposes the response. With defer one could have used the try-with-resources style.
4. Set the eager toggle to false so that the disposer is called after the terminal event, i.e. after the subscription consumer has been executed.
5. Of course, make the thing happen on another thread pool.
6. Here's the lambda that will consume the response body. Without eager set to false, the code will raise an IOException with reason 'closed', because the response will already be closed before entering this lambda.
7. The onError lambda should handle exceptions, especially the IOException, which can no longer be caught inside the using operator the way it could with a try/catch inside defer.
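The ordering problem behind points 3, 4 and 6 can be reproduced in plain Java. This is a sketch under stated assumptions: Body is a hypothetical stand-in for an OkHttp response body that refuses reads once closed, and neither method uses RxJava; they only mimic "dispose before the consumer runs" versus "dispose after the consumer runs".

```java
import java.io.IOException;

// Hand-rolled illustration; Body is a hypothetical stand-in, not an OkHttp class.
final class EagerCloseSketch {

    /** A closable resource that refuses reads once it has been closed. */
    static final class Body implements AutoCloseable {
        private boolean closed;

        String read() throws IOException {
            if (closed) {
                throw new IOException("closed");
            }
            return "payload";
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** A deferred read: nothing happens until call() is invoked. */
    interface Reader {
        String call() throws IOException;
    }

    /** Runs the reader and turns a failure into a printable message. */
    static String describe(Reader reader) {
        try {
            return reader.call();
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    /** Mimics try-with-resources around the call: the body is closed on return. */
    static Reader closeBeforeConsumer() {
        try (Body body = new Body()) {
            return body::read; // the reader escapes, but body is closed as we leave
        }
    }

    /** Mimics a disposer that runs only after the consumer has finished. */
    static String closeAfterConsumer() {
        Body body = new Body();
        try {
            return describe(body::read); // consumer runs while the body is still open
        } finally {
            body.close();                // disposer runs afterwards
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(closeBeforeConsumer())); // prints IOException: closed
        System.out.println(closeAfterConsumer());            // prints payload
    }
}
```

The first method corresponds to handing out a response whose resource has already been released; the second corresponds to the eager = false arrangement, where the release is postponed until the consumer is done.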
Solution 5:
OkHttp3 with an Rx Single for a background API call:
Disposable disposables = Single.fromCallable(() -> {
    Log.e(TAG, "clearData: Thread[" + Thread.currentThread().getName() + "]");
    OkHttpClient client = Util.getHttpClient();
    Request request = new Request.Builder()
            .addHeader("Authorization", "Bearer " + Util.getUserToken())
            .url(BuildConfig.BASE_URL + ApiConstants.DELETE_FEEDS)
            .build();
    Response response = client.newCall(request).execute();
    if (response.isSuccessful()) {
        ...
        return ; // Any type
    } else {
        return ; // Any type
    }
}).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((result) -> {
        Log.d(TAG, "api() completed");
    });
compositeDisposable.add(disposables);