RxJava


Introduction

These are notes on RxJava, which I have used with Android/Kotlin. At the time of writing, using it on Android means adding the following to the module's Gradle dependencies:

dependencies {
...
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
    implementation 'io.reactivex.rxjava3:rxjava:3.0.13'
...
}

Observable Types

Singles

A Single is a reactive type that emits exactly one item or signals an error. Because it only ever emits one value, some operators make no sense for it; for example, there is nothing to gain from taking values and collecting them into a list. Its observer has onSuccess in place of onNext and onComplete:

interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
}
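
As a minimal sketch of the two outcomes a Single can have, the snippet below subscribes to one Single that succeeds and one that fails. The function name outcomeOf and the sample values are made up for illustration; only Single.just, Single.error, map and subscribe come from RxJava 3.

```kotlin
import io.reactivex.rxjava3.core.Single

// Hypothetical helper: subscribes and reports which callback fired.
// The sources used here are synchronous, so the callback runs before we return.
fun outcomeOf(source: Single<Int>): String {
    var outcome = "no signal"
    source.subscribe(
        { value -> outcome = "onSuccess: $value" },
        { error -> outcome = "onError: ${error.message}" }
    )
    return outcome
}

fun main() {
    // Exactly one item: onSuccess is called once and the Single is finished.
    println(outcomeOf(Single.just(21).map { it * 2 }))

    // A failing Single calls onError instead; onSuccess is never called.
    println(outcomeOf(Single.error<Int>(IllegalStateException("boom"))))
}
```

Note there is no onComplete here: for a Single, onSuccess is both the value and the end of the stream.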

Maybes

Maybe is similar to Single; the only difference is that it may also complete without emitting anything. Exactly one of onSuccess, onError or onComplete is called.

interface MaybeObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
    void onComplete();
}
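
The three possible outcomes of a Maybe can be sketched as follows. The helper name describe and the sample values are assumptions for illustration; Maybe.just, Maybe.empty, Maybe.error and the three-argument subscribe are part of RxJava 3.

```kotlin
import io.reactivex.rxjava3.core.Maybe

// Hypothetical helper: reports which of the three terminal callbacks fired.
// The sources used here are synchronous, so the callback runs before we return.
fun describe(source: Maybe<String>): String {
    var result = "no signal"
    source.subscribe(
        { value -> result = "onSuccess: $value" },
        { error -> result = "onError: ${error.message}" },
        { result = "onComplete (empty)" }
    )
    return result
}

fun main() {
    println(describe(Maybe.just("hello")))                                // a value
    println(describe(Maybe.empty<String>()))                              // no value, no error
    println(describe(Maybe.error<String>(IllegalStateException("boom")))) // an error
}
```

When a value is emitted, onComplete is not called afterwards; onComplete only signals the empty case.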

Completables

Completable never emits a value. It is only concerned with whether the task ran to completion or an error occurred.

interface CompletableObserver {
    void onSubscribe(Disposable d);
    void onComplete();    
    void onError(Throwable error);
}
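
A small sketch of a Completable wrapping a piece of work that returns nothing. The helper name runTask is hypothetical; Completable.fromAction and the two-argument subscribe are RxJava 3 calls. An exception thrown inside the action is routed to onError rather than propagating to the caller.

```kotlin
import io.reactivex.rxjava3.core.Completable

// Hypothetical helper: runs the task as a Completable and reports how it ended.
// fromAction is synchronous here, so the callback runs before we return.
fun runTask(task: () -> Unit): String {
    var outcome = "no signal"
    Completable.fromAction { task() }
        .subscribe(
            { outcome = "onComplete" },
            { error -> outcome = "onError: ${error.message}" }
        )
    return outcome
}

fun main() {
    println(runTask { /* e.g. write something to a cache */ })
    println(runTask { throw IllegalStateException("disk full") })
}
```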

Using RxJava with Retrofit

This example is just to show a practical use of RxJava.

Retrofit

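Here is a very simple example of Retrofit, which manages the API calls for us. It defines a service that returns an RxJava Single from an endpoint. The RxJava3CallAdapterFactory is what lets a Retrofit interface method return a Single instead of a Call.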

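import io.reactivex.rxjava3.core.Single
import retrofit2.Retrofit
import retrofit2.adapter.rxjava3.RxJava3CallAdapterFactory
import retrofit2.converter.gson.GsonConverterFactory

class CountriesService {
    private val BASE_URL = "https://raw.githubusercontent.com"
    private val api: CountriesApi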

    init {
        api = Retrofit.Builder()
            .baseUrl(BASE_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
            .build()
            .create(CountriesApi::class.java)
    }

    fun getCountries(): Single<List<Country>> = api.getCountries()
}
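
The service above refers to CountriesApi and Country, which are not shown in these notes. A rough sketch of what they might look like follows. The endpoint path and the Country fields are placeholders assumed for illustration, not the real ones; only the @GET annotation and the Single return type are taken from Retrofit and RxJava.

```kotlin
import io.reactivex.rxjava3.core.Single
import retrofit2.http.GET

// Hypothetical model: the field names are assumptions, adjust them to the real JSON.
// Nullable because Gson leaves a field null when the key is missing.
data class Country(val name: String?, val capital: String?)

interface CountriesApi {
    // Placeholder path, resolved against BASE_URL; replace with the real file path.
    @GET("example-user/example-repo/master/countries.json")
    fun getCountries(): Single<List<Country>>
}
```

Returning Single here fits a one-shot HTTP request: it either yields one parsed response or fails.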

Using in ViewModel

We can now call the service and add the resulting subscription to a CompositeDisposable. We need to say which thread does the work (subscribeOn), which thread receives the response (observeOn), and what to do with the result. Because it is a Single, that means handling both success and error. We also need to ensure we clean up should things go south, which is why onCleared clears the CompositeDisposable.

class ListViewModel : ViewModel() {

    private val countriesService = CountriesService()
    private val disposable = CompositeDisposable()

    val countries = MutableLiveData<List<Country>>()
    val countryLoadError = MutableLiveData<Boolean>()
    val loading = MutableLiveData<Boolean>()

    fun refresh() {
        fetchCountries()
    }

    private fun fetchCountries() {
        loading.value = true
        disposable.add(
            countriesService.getCountries()
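                // Do the network call on a new background thread
                .subscribeOn(Schedulers.newThread())
                // Deliver the response on the Android main (UI) thread
                .observeOn(AndroidSchedulers.mainThread())
                // What to do with it; subscribeWith returns the observer, which is itself a Disposable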
                .subscribeWith(object: DisposableSingleObserver<List<Country>>() {

                    // RxJava Success
                    override fun onSuccess(value: List<Country>) {
                        countries.value = value
                        countryLoadError.value = false
                        loading.value = false
                    }
                    // RxJava error
                    override fun onError(e: Throwable) {
                        countryLoadError.value = true
                        loading.value = false
                    }
                })
        )
    }

    override fun onCleared() {
        super.onCleared()
        disposable.clear()
    }
}
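
The threading and clean-up behaviour used in the ViewModel can be tried outside Android with plain RxJava. This is only a sketch: the function names are made up, and Single.never stands in for a request that has not finished yet.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns the name of the thread the work actually ran on.
// subscribeOn moves the callable off the calling thread.
fun workerThreadName(): String =
    Single.fromCallable { Thread.currentThread().name }
        .subscribeOn(Schedulers.newThread())
        .blockingGet()

// Returns true if clear() disposed a subscription that was still pending.
fun clearDisposesPending(): Boolean {
    val bag = CompositeDisposable()
    // Never emits, so the subscription stays active until disposed.
    val pending = Single.never<String>().subscribe({ }, { })
    bag.add(pending)
    bag.clear()
    // The contained subscription is disposed, but the container stays usable.
    return pending.isDisposed && !bag.isDisposed && bag.size() == 0
}

fun main() {
    println("caller: ${Thread.currentThread().name}, worker: ${workerThreadName()}")
    println("pending subscription disposed by clear(): ${clearDisposesPending()}")
}
```

clear() disposes what is inside but leaves the CompositeDisposable reusable, whereas dispose() also marks the container itself as disposed, so anything added later is disposed immediately.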