Rx Basics

A programming model that has been gaining substantial ground in recent years is reactive programming. In Android, much of the attention has been on RxJava, the “reactive extensions for Java”. Many libraries offer the ability to be consumed using RxJava, and many Android experts have latched onto RxJava as a way to reduce certain types of complexity in Android app development.

In this chapter, we will review what reactive programming is, what RxJava is, and how you can apply RxJava in your Android app.

However, please understand that reactive programming is a very large topic. A complete explanation of RxJava alone would fill its own book. This chapter should be seen as a launching pad for further explorations of your own, more so than a definitive reference on the subject.

Prerequisites

Understanding this chapter requires that you have read the core chapters of this book.

Life is But a Stream

In order to understand reactive programming, we first need to think about streams.

When a Java programmer hears the term “stream”, what often pops into mind is InputStream and OutputStream. Those offer access to a stream of bytes, for input and output, respectively. Here, “stream” means that the bytes are available one at a time (though they are often retrieved in a clump, such as an 8192-byte buffer), and that once removed from the stream, the bytes are considered to be “consumed” and are no longer available from the stream itself.
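To make the “consumed” part concrete, here is a minimal sketch in plain Java. It assumes a ByteArrayInputStream as an in-memory stand-in for a file or socket, and the ConsumedBytesDemo class and its readClump() helper are hypothetical names invented for this illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class ConsumedBytesDemo {
  // Reads up to bufferSize bytes in one "clump" and returns them as text.
  static String readClump(InputStream in, int bufferSize) {
    try {
      byte[] buffer = new byte[bufferSize];
      int count = in.read(buffer);

      return count < 0 ? "" : new String(buffer, 0, count, StandardCharsets.US_ASCII);
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    ByteArrayInputStream in =
      new ByteArrayInputStream("lorem ipsum".getBytes(StandardCharsets.US_ASCII));

    System.out.println(readClump(in, 5));  // prints "lorem"
    System.out.println(readClump(in, 64)); // prints " ipsum": the first five bytes are gone
    System.out.println(in.read());         // prints -1: nothing is left to consume
  }
}
```

The second read never sees “lorem” again. Once bytes come off the stream, they are gone unless our own code holds onto them.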

When Java programmers think of InputStream and OutputStream, what often pops into mind is FileInputStream and FileOutputStream. With FileInputStream, the source of the bytes is fixed: the contents of a designated file. With FileOutputStream, the destination of the bytes is fixed: once again, a designated file.

However, there are many other sources of InputStream and OutputStream. Some that you encounter in the book are:

Particularly in the HTTP case, the source of the bytes is “live”, insofar as there does not have to be some specific file that is the source of those bytes. Those bytes could represent a generated Web page, or a live audio stream, or anything else.

Hence, more generally, a stream represents a flow of data, where we can pull data off of the stream and do something with it. That “flow of data” could be bytes from a file, as we see with InputStream. But lots of other things could be modeled as flows of data. Pretty much anything where the data would come to us asynchronously could be modeled this way, such as:

You could even model some things that might not feel like a “stream” as a stream if you wanted to. For example, querying a database or ContentProvider gives you a Cursor back, and you could model that as being a stream of rows.

Action and Reaction

Reactive programming works with streams. Rather than “iterate over the rows in the Cursor and do X to each”, you say “as a row comes in from the stream of rows, do X to each”. Here, you are “reacting” to the availability of a row.

But part of reactive programming is that what processes a stream might itself turn around and be a stream of events to something else. For example, given a stream of audio signals, you might implement a filter that clamps high-volume signals to a lower volume. That filter takes in a stream (raw signals) and emits another stream (clamped signals). Other code could react to the filtered stream (e.g., to record bytes to disk).
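A rough sketch of that clamping filter, in plain Java with no reactive library at all, might look like the following. The ClampDemo class, its clamp() and emit() methods, and the sample values are all made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

class ClampDemo {
  // A stage that reacts to each incoming sample and pushes a clamped
  // sample along to whatever is downstream of it.
  static IntConsumer clamp(int limit, IntConsumer downstream) {
    return sample -> downstream.accept(Math.max(-limit, Math.min(limit, sample)));
  }

  // Stand-in for a live source: pushes samples downstream as they "arrive".
  static void emit(int[] samples, IntConsumer downstream) {
    for (int sample : samples) {
      downstream.accept(sample);
    }
  }

  public static void main(String[] args) {
    List<Integer> recorded = new ArrayList<>();

    // raw signals -> clamp filter -> recorder
    emit(new int[] {10, 250, -300, 42}, clamp(100, sample -> recorded.add(sample)));

    System.out.println(recorded); // prints [10, 100, -100, 42]
  }
}
```

Note that the recorder neither knows nor cares that a filter sits between it and the raw signals. It simply reacts to whatever arrives.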

You may have already done reactive programming, without even realizing it. The world’s most popular reactive programming environment is not some functional programming language or UI framework.

It’s a spreadsheet.

Spreadsheet cells — particularly for simpler sheets — can be thought of as either having data (particularly numbers) or formulas. A formula references cells and performs a calculation upon them. This includes performing calculations upon other cells that contain formulas.

For example, you might have a spreadsheet like this:

Figure 359: Simple Spreadsheet

Here, the two formulas are shown in bold, while the data cells are in the default font.

The average formula cell has =AVERAGE(B1:B5), to compute the average of the five number cells above it. The result formula cell has =B6*B7, to multiply the average by the “factor” number.

If you change that factor, not only does that cell change, but so does the result formula cell. The B6 and B7 values referenced in the result formula do not just identify cells, but represent streams of changes to those cells’ values. When B7 (the factor value) changes, the result formula reacts to the changed-value event and recalculates its formula, showing the result:

Figure 360: Simple Spreadsheet with New Factor Value

Similarly, if you change one of the five initial values (1, 1, 2, 3, 5), the average formula cell responds to the changed-value event and recalculates its value. That, in turn, triggers a changed-value event that causes the result formula cell to react and recalculate its value:

Figure 361: Simple Spreadsheet with New Initial and New Factor Value
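If it helps to see the spreadsheet in code, here is a small sketch of cells and formulas in plain Java. Everything here is hypothetical: the SpreadsheetDemo class, its Cell type, and the formula() helper are invented for this illustration, and the factor of 10 is an assumed value rather than the one in the figures:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleSupplier;

class SpreadsheetDemo {
  // A cell holding a number; it notifies its dependents whenever that number changes.
  static class Cell {
    private final List<Runnable> dependents = new ArrayList<>();
    private double value;

    Cell(double value) { this.value = value; }

    double get() { return value; }

    void set(double newValue) {
      value = newValue;

      for (Runnable dependent : dependents) {
        dependent.run();
      }
    }

    void onChange(Runnable dependent) { dependents.add(dependent); }
  }

  // A formula cell: recalculates when any input changes, which in turn
  // notifies whatever formula cells depend upon it.
  static Cell formula(DoubleSupplier calculation, Cell... inputs) {
    Cell cell = new Cell(calculation.getAsDouble());

    for (Cell input : inputs) {
      input.onChange(() -> cell.set(calculation.getAsDouble()));
    }

    return cell;
  }

  static double averageOf(Cell... cells) {
    double sum = 0;

    for (Cell cell : cells) {
      sum += cell.get();
    }

    return sum / cells.length;
  }

  public static void main(String[] args) {
    Cell[] numbers = { new Cell(1), new Cell(1), new Cell(2), new Cell(3), new Cell(5) };
    Cell average = formula(() -> averageOf(numbers), numbers);
    Cell factor = new Cell(10); // assumed value, not taken from the figures
    Cell result = formula(() -> average.get() * factor.get(), average, factor);

    System.out.println(result.get());

    factor.set(20);     // result reacts to the new factor
    System.out.println(result.get());

    numbers[4].set(13); // average reacts, and that causes result to react
    System.out.println(average.get() + " -> " + result.get());
  }
}
```

Nothing in main() ever asks the result cell to recalculate. Changing a number or the factor is enough, because each formula cell reacts to the change events of the cells that it references.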

A Rx For What Ails You

At this point, you may be wondering what this has to do with Android, or even Java.

The pre-eminent framework for reactive programming in Java is RxJava. RxJava is a library that helps you model data as streams and apply a chain of operations to those streams.

RxJava is part of the ReactiveX series of libraries, offering reactive programming for a wide range of programming languages and platforms.

In late 2016, RxJava 2 was released. This includes some refactoring of the original RxJava classes to comply with the Reactive Streams specification. This chapter focuses on RxJava 2. Some material that you read on RxJava might be using the original RxJava API. While pretty much everything done with classic RxJava can be done with RxJava 2, some conversion may be required.

(BTW, for those of you wondering about the title of this section, Rx is an abbreviation for the word “prescription”.)

Rx and Lambdas

RxJava relies heavily on functions. Java is not a functional programming language, and so it does not have “functions” as standalone first-class units of programming.

Java 8 offers lambda expressions, which are akin to anonymous functions. RxJava supports Java 8 lambda expressions, and by using Android Studio 3.0+ (and its related build tools), Android developers can use lambda expressions in projects with a minSdkVersion of 9 or higher.

The examples in this chapter use lambda expressions, as that will be the typical way that you see RxJava be used.
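If lambda expressions are new to you, this sketch shows the same bit of logic written both ways. It uses java.util.function.Consumer from the JDK purely as a stand-in for the sorts of single-method interfaces that RxJava defines for itself, and the LambdaDemo class is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LambdaDemo {
  // Classic anonymous inner class: a whole class body wrapped around one method.
  static Consumer<String> anonymous(List<String> sink) {
    return new Consumer<String>() {
      @Override
      public void accept(String s) {
        sink.add(s.toUpperCase());
      }
    };
  }

  // The equivalent lambda expression: just the parameter and the body.
  static Consumer<String> lambda(List<String> sink) {
    return s -> sink.add(s.toUpperCase());
  }

  public static void main(String[] args) {
    List<String> sink = new ArrayList<>();

    anonymous(sink).accept("lorem");
    lambda(sink).accept("lorem");

    System.out.println(sink); // prints [LOREM, LOREM]
  }
}
```

Both forms behave identically. The lambda expression just removes the ceremony, which matters a lot once you start chaining several of these together.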

A Simple Stream

Back in the chapter on threads, we had the Threads/AsyncRV sample app, which demonstrated using an AsyncTask. There, our data source was a static Java array of strings, and we would not need an AsyncTask to “load” those into a list. However, to keep things simple, we pretended that it took real work to obtain those strings, and we used an AsyncTask to do that work in the background.

Let’s explore reactive programming and RxJava by revamping that example, starting with the Rx/Simple sample project.

Adding the Dependency

RxJava is a library, available from Maven Central and JCenter. So, a typical Android Studio project could add RxJava as a dependency in a module’s build.gradle file:

apply plugin: 'com.android.application'

dependencies {
  implementation "com.android.support:recyclerview-v7:27.1.1"
  implementation "com.android.support:support-fragment:27.1.1"
  implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
}

android {
  compileSdkVersion 27
  buildToolsVersion '27.0.3'

  defaultConfig {
    minSdkVersion 15
    targetSdkVersion 27
  }

  compileOptions {
    sourceCompatibility JavaVersion.VERSION_1_8
    targetCompatibility JavaVersion.VERSION_1_8
  }
}
(from Rx/Simple/app/build.gradle)

Replacing the Loop

The original AsyncTask sample app has a doInBackground() method that looks a bit like this:


@Override
protected Void doInBackground(Void... unused) {
  for (String item : ITEMS) {
    if (isCancelled())
      break;
    
    publishProgress(item);
    SystemClock.sleep(400);
  }

  return(null);
}

Here, ITEMS is our static String array. We iterate over that array, and if our task has not been canceled, we call publishProgress() to add a string to our list, and sleep for 400 milliseconds as our way of pretending to do real work.

The first step towards switching to a reactive model is to replace that loop with a stream:

    @Override
    protected Void doInBackground(Void... unused) {
      Observable.fromArray(ITEMS)
        .subscribe(s -> {
          if (!isCancelled()) {
            publishProgress(s);
            SystemClock.sleep(400);
          }
        });

      return(null);
    }
(from Rx/Simple/app/src/main/java/com/commonsware/android/rx/RxDemoFragment.java)

io.reactivex.Observable is the root class for most of what you will do with RxJava. It helps you set up a stream and then react to that stream.

There are several ways to set up a stream — we will see a few of them in this chapter. In this case, we are using fromArray(), which sets up a stream based on the items in that array. In reality, this is a factory method; the returned object is an Observable.

We then call subscribe() on the Observable. This basically represents the sink for the stream of data coming from the Observable. In this case, we use a lambda expression as our implementation of the Consumer interface. Our lambda expression body implements the accept() method from that Consumer interface, and it will be invoked for each piece of data from the stream. In this case, ITEMS is an array of String, and so we are setting up a Consumer of String and therefore we accept() a String. Our lambda expression has our isCancelled() check, our publishProgress() call, and our sleep() call, as with the iterative loop from the original sample.

Be Your Own Stream

Our 400 milliseconds of sleep() is a way of simulating doing work. However, as RxJava is showing us, we are not actually simulating doing work to get the data. Instead, we are simulating doing work to consume the data. Our sleep() call is in our Consumer (or the equivalent lambda expression), not in our data source.

However, we can model this better, by creating our own data source that is a bit more sophisticated than a simple static Java array.

The Rx/Observable sample project replaces fromArray() with a call to create(), which creates an Observable upon a supplied source of data:

    @Override
    protected Void doInBackground(Void... unused) {
      Observable.create(source())
        .subscribe(s -> {
          if (!isCancelled()) {
            publishProgress(s);
          }
        });

      return(null);
    }

    private ObservableOnSubscribe<String> source() {
      return(emitter -> {
        for (String item : ITEMS) {
          if (!isCancelled()) {
            emitter.onNext(item);
            SystemClock.sleep(400);
          }
        }

        emitter.onComplete();
      });
    }
(from Rx/Observable/app/src/main/java/com/commonsware/android/rx/RxDemoFragment.java)

Here, source() is returning a lambda expression that Java maps to an instance of an oddly-named ObservableOnSubscribe interface. An ObservableOnSubscribe is for cases where we are only actually starting to obtain data for our stream when something subscribes to the Observable, not before. In our case, that is fine, as we are still just using our String array.

An ObservableOnSubscribe implements a subscribe() method. This is called when a subscriber subscribes to the Observable. The job of subscribe() is to work with the underlying data and call three methods on the supplied ObservableEmitter: onNext() for each piece of data, onComplete() once there is no more data, and onError() if something goes wrong.

In our case, we are iterating over our Java array, calling onNext() for each String, and sleeping the 400 milliseconds to simulate doing actual work.
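To see the shape of that contract without any library in the way, here is a hand-rolled sketch. MiniEmitter and MiniSource are hypothetical stand-ins for ObservableEmitter and ObservableOnSubscribe, not the real RxJava types, and this sketch enforces none of the rules that RxJava enforces (such as ignoring events after completion):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class EmitterDemo {
  // Hypothetical stand-in for RxJava's ObservableEmitter.
  interface MiniEmitter<T> {
    void onNext(T item);
    void onComplete();
    void onError(Throwable error);
  }

  // Hypothetical stand-in for RxJava's ObservableOnSubscribe.
  interface MiniSource<T> {
    void subscribe(MiniEmitter<T> emitter);
  }

  // Nothing is produced until somebody subscribes; only then do we ask
  // the source to start pushing events at the emitter.
  static <T> void subscribe(MiniSource<T> source, Consumer<T> next,
                            Runnable complete, Consumer<Throwable> failure) {
    source.subscribe(new MiniEmitter<T>() {
      @Override public void onNext(T item) { next.accept(item); }
      @Override public void onComplete() { complete.run(); }
      @Override public void onError(Throwable error) { failure.accept(error); }
    });
  }

  // A source much like the one in the sample: walk an array, then complete.
  static MiniSource<String> words(String... items) {
    return emitter -> {
      for (String item : items) {
        emitter.onNext(item);
      }

      emitter.onComplete();
    };
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();

    subscribe(words("lorem", "ipsum"),
      word -> log.add(word),
      () -> log.add("done"),
      error -> log.add("failed: " + error.getMessage()));

    System.out.println(log); // prints [lorem, ipsum, done]
  }
}
```

The source decides when to emit, and the subscriber only reacts. That inversion is what lets the same subscriber work whether the data comes from an array, a file, or the network.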

Removing the AsyncTask

At this point, you may be wondering why we are bothering with any of this, as our code gets more and more complex without any particular added value. This reaction is understandable. All we have done is make an AsyncTask more complicated.

So, let’s get rid of the AsyncTask.

Part of the power of RxJava is thread management. You can tell it the threads to use for working with streams, such as doing I/O on a background thread. However, RxJava is a pure Java library. It knows nothing about Android and Android-specific constructs like the main application thread.

Fortunately, there is RxAndroid. This is a small library adding some Android-specific smarts to RxJava, such as the notion of the main application thread. This, like RxJava, is available as an artifact:

apply plugin: 'com.android.application'

dependencies {
  implementation "com.android.support:recyclerview-v7:27.1.1"
  implementation "com.android.support:support-fragment:27.1.1"
  implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
  implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
}

android {
  compileSdkVersion 27
  buildToolsVersion '27.0.3'

  defaultConfig {
    minSdkVersion 15
    targetSdkVersion 27
    applicationId "com.commonsware.android.rx.rxandroid"
  }

  compileOptions {
    sourceCompatibility JavaVersion.VERSION_1_8
    targetCompatibility JavaVersion.VERSION_1_8
  }
}
(from Rx/RxAndroid/app/build.gradle)

With that in mind, let’s look at the Rx/RxAndroid sample project and our substantially-revised fragment, here named RxDemoFragment:

package com.commonsware.android.rx;

import android.os.Bundle;
import android.os.SystemClock;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.support.v4.app.Fragment;
import android.support.v7.widget.DividerItemDecoration;
import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.TextView;
import android.widget.Toast;
import java.util.ArrayList;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class RxDemoFragment extends Fragment {
  private static final String[] ITEMS= { "lorem", "ipsum", "dolor",
      "sit", "amet", "consectetuer", "adipiscing", "elit", "morbi",
      "vel", "ligula", "vitae", "arcu", "aliquet", "mollis", "etiam",
      "vel", "erat", "placerat", "ante", "porttitor", "sodales",
      "pellentesque", "augue", "purus" };
  private ArrayList<String> model=new ArrayList<>();
  private RVArrayAdapter adapter;
  private Disposable sub=null;

  @Override
  public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    setRetainInstance(true);

    adapter=new RVArrayAdapter(model, getLayoutInflater());

    Observable<String> observable=Observable
      .create(source())
      .subscribeOn(Schedulers.newThread())
      .observeOn(AndroidSchedulers.mainThread())
      .doOnComplete(() -> {
        Toast.makeText(getActivity(), R.string.done, Toast.LENGTH_SHORT)
          .show();
      });

    sub=observable.subscribe(s -> {
      adapter.add(s);
    });
  }

  private ObservableOnSubscribe<String> source() {
    return(emitter -> {
      for (String item : ITEMS) {
        emitter.onNext(item);
        SystemClock.sleep(400);
      }

      emitter.onComplete();
    });
  }

  @Nullable
  @Override
  public View onCreateView(@NonNull LayoutInflater inflater,
                           @Nullable ViewGroup container,
                           @Nullable Bundle savedInstanceState) {
    return inflater.inflate(R.layout.main, container, false);
  }

  @Override
  public void onViewCreated(View v, Bundle savedInstanceState) {
    super.onViewCreated(v, savedInstanceState);

    RecyclerView rv=v.findViewById(android.R.id.list);

    rv.setLayoutManager(new LinearLayoutManager(getActivity()));
    rv.addItemDecoration(new DividerItemDecoration(getActivity(),
      DividerItemDecoration.VERTICAL));
    rv.setAdapter(adapter);
  }

  @Override
  public void onDestroy() {
    if (sub!=null && !sub.isDisposed()) {
      sub.dispose();
    }

    super.onDestroy();
  }

  private static class RVArrayAdapter extends RecyclerView.Adapter<RowHolder> {
    private final ArrayList<String> words;
    private final LayoutInflater inflater;

    private RVArrayAdapter(ArrayList<String> words,
                           LayoutInflater inflater) {
      this.words=words;
      this.inflater=inflater;
    }

    @NonNull
    @Override
    public RowHolder onCreateViewHolder(@NonNull ViewGroup parent,
                                        int viewType) {
      View row=inflater.inflate(android.R.layout.simple_list_item_1, parent, false);

      return new RowHolder(row);
    }

    @Override
    public void onBindViewHolder(@NonNull RowHolder holder,
                                 int position) {
      holder.bind(words.get(position));
    }

    @Override
    public int getItemCount() {
      return words.size();
    }

    private void add(String word) {
      words.add(word);
      notifyItemInserted(words.size()-1);
    }
  }

  private static class RowHolder extends RecyclerView.ViewHolder {
    private final TextView title;

    RowHolder(View itemView) {
      super(itemView);
      title=itemView.findViewById(android.R.id.text1);
    }

    public void bind(String text) {
      title.setText(text);
    }
  }
}
(from Rx/RxAndroid/app/src/main/java/com/commonsware/android/rx/RxDemoFragment.java)

Some things are the same as in the original sample app: the static array, the ArrayList model, the RVArrayAdapter, the retained fragment, etc.

However, our AsyncTask is gone, replaced by a chunk of RxJava code:

    Observable<String> observable=Observable
      .create(source())
      .subscribeOn(Schedulers.newThread())
      .observeOn(AndroidSchedulers.mainThread())
      .doOnComplete(() -> {
        Toast.makeText(getActivity(), R.string.done, Toast.LENGTH_SHORT)
          .show();
      });

    sub=observable.subscribe(s -> {
      adapter.add(s);
    });
(from Rx/RxAndroid/app/src/main/java/com/commonsware/android/rx/RxDemoFragment.java)

This code adds a number of items to our Observable call chain, so let’s take them one at a time.

subscribeOn()

This teaches RxJava the thread to use for invoking our ObservableOnSubscribe code, specifically its subscribe() method (or the equivalent with a lambda expression). Since subscribe() could take some time — in our case, 400 milliseconds per string — we really want to run that on a background thread.

The Schedulers utility class in RxJava provides a handful of options for indicating the thread to use. Here, we are using newThread(), which does pretty much what the name suggests: create a new thread and use that for our subscribe() work.

Typically, if we are using RxJava and RxAndroid, we want to subscribe to events on a background thread. That is not always the case, though. There are techniques for using RxJava/RxAndroid to manage things like user input (e.g., text being entered live in an EditText). In those cases, we may have specific threading requirements, such as subscribing on Android’s main application thread.

observeOn()

By default, if you use subscribeOn(), everything involved with this stream will be done on that thread. This would include the work we want to do to update our UI, via our subscribe() call. And, on Android, we cannot update the UI from a background thread.

observeOn() basically says “everything in this Observable configuration from this point forward should switch to using this other thread”. With our subscribe() call coming after observeOn(), we wind up processing our subscribe() work in the thread indicated by observeOn().

Here, we use RxAndroid and its AndroidSchedulers class, which has a mainThread() method to indicate that we want the work to be done on the main application thread.

Note that you can call observeOn() several times. Typically, this is not necessary. But, for example, you might use subscribeOn() to receive some UI event on the main application thread, then call observeOn() to direct initial processing of that event onto a background thread (e.g., database I/O for updating a live filter based on what has been typed in so far), then call observeOn() again to direct further processing to the main application thread (e.g., update a RecyclerView with the now-filtered results).
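As an analogy for what subscribeOn() and observeOn() arrange, here is a sketch using nothing but java.util.concurrent. It is not how RxJava implements its schedulers. The ThreadHopDemo class is hypothetical, and the single-thread executor named “ui” is merely standing in for Android’s main application thread:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHopDemo {
  // Produces items on a "background" thread and delivers each one on a
  // separate "ui" thread, logging where each step happened.
  static List<String> run(String... items) {
    ExecutorService background =
      Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
    ExecutorService ui =
      Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
    List<String> log = new CopyOnWriteArrayList<>();

    // Akin to subscribeOn(): the producing loop runs on the background thread.
    background.execute(() -> {
      for (String item : items) {
        String producedOn = Thread.currentThread().getName();

        // Akin to observeOn(): hand each item to another thread for delivery.
        ui.execute(() ->
          log.add(item + ": " + producedOn + " -> " + Thread.currentThread().getName()));
      }

      ui.shutdown(); // no more deliveries will be queued
    });
    background.shutdown();

    try {
      background.awaitTermination(5, TimeUnit.SECONDS);
      ui.awaitTermination(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    return log;
  }

  public static void main(String[] args) {
    // prints [lorem: background -> ui, ipsum: background -> ui]
    System.out.println(run("lorem", "ipsum"));
  }
}
```

With RxJava, those two lines of subscribeOn() and observeOn() replace all of this executor bookkeeping, which is a large part of the appeal.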

doOnComplete()

In our samples so far, in the onPostExecute() method of our AsyncTask, we showed a Toast to signal that the work was completed.

The equivalent step in RxJava is doOnComplete(). This takes an Action object or, in this case, a lambda expression. This will be called when our source() triggers onComplete() on its supplied emitter. Here, we show the Toast.

subscribe()

We then subscribe() to the Observable, feeding the strings into our RVArrayAdapter. subscribe() returns a Disposable instance, which we hold onto in a field.

On a configuration change, we retain the Disposable, which in turn holds onto our Observable. We also retain the RVArrayAdapter. So, the new activity and new fragment connect back up to the RVArrayAdapter, and they get all existing words plus any new ones fed in by our ongoing Rx work, if that has not completed already.

If the user presses BACK to exit the activity, or the activity otherwise is being finally destroyed, onDestroy() is called on our retained fragment, and there we dispose() of that Disposable:

  @Override
  public void onDestroy() {
    if (sub!=null && !sub.isDisposed()) {
      sub.dispose();
    }

    super.onDestroy();
  }
(from Rx/RxAndroid/app/src/main/java/com/commonsware/android/rx/RxDemoFragment.java)

This way, if the Rx background work is still going on, and the fragment is being destroyed, we disconnect and do not try to continue updating the RVArrayAdapter or display the Toast, as neither of those things is safe with a destroyed activity.
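Conceptually, disposing works a bit like flipping a flag that the delivery code checks before handing over each item. Here is a sketch of that idea, where MiniDisposable and deliver() are hypothetical names and the flag-based approach is an assumption made for illustration, not a description of RxJava’s internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class DisposeDemo {
  // Hypothetical stand-in for a Disposable: a flag that delivery code checks.
  static class MiniDisposable {
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    void dispose() { disposed.set(true); }

    boolean isDisposed() { return disposed.get(); }
  }

  // Delivers items until the subscription is disposed; returns how many got through.
  static int deliver(String[] items, MiniDisposable sub, Consumer<String> subscriber) {
    int delivered = 0;

    for (String item : items) {
      if (sub.isDisposed()) {
        break;
      }

      subscriber.accept(item);
      delivered++;
    }

    return delivered;
  }

  public static void main(String[] args) {
    List<String> shown = new ArrayList<>();
    MiniDisposable sub = new MiniDisposable();

    deliver(new String[] {"lorem", "ipsum", "dolor", "sit"}, sub, word -> {
      shown.add(word);

      if (shown.size() == 2) {
        sub.dispose(); // pretend that onDestroy() was called at this point
      }
    });

    System.out.println(shown); // prints [lorem, ipsum]
  }
}
```

The words after the dispose() call never reach the subscriber, which is exactly what we want once the UI that would display them is gone.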

Lambdas and Lifetimes

Be careful about what you reference from lambda expressions tied into RxJava, or their anonymous inner class counterparts. You need to ensure that the lambdas only reference objects with the same intended lifespan as the lambda itself.

In particular, be careful about referencing widgets directly. Suppose that we had some RxJava work that was delivering a String to be filled into an EditText. From a retained fragment, you might try doing something like this:


Observable<String> observable=Observable
  .create(source())
  .subscribeOn(Schedulers.newThread())
  .observeOn(AndroidSchedulers.mainThread());

sub=observable.subscribe(s -> {
  myAwesomeEditText.setText(s);
});

The problem here is that myAwesomeEditText is a specific EditText instance, the one known about at the time when the lambda expression is created. If we undergo a configuration change, our fragment winds up retaining the lambda expression but creating a new EditText. However, our lambda expression does not know about that new EditText, so it happily sets the text of the old EditText, with unfortunate results.

A safer approach is to call a method:


Observable<String> observable=Observable
  .create(source())
  .subscribeOn(Schedulers.newThread())
  .observeOn(AndroidSchedulers.mainThread());

sub=observable.subscribe(s -> {
  updateMyAwesomeEditText(s);
});

where the method on the fragment can refer to the proper EditText instance, whichever one is current at the time that the lambda expression is evaluated.
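The difference between those two approaches can be simulated off-device. In this sketch, FakeEditText and FakeFragment are hypothetical stand-ins for the real Android classes, and the risky version assumes that the lambda expression captured the widget through a local variable:

```java
import java.util.function.Consumer;

class StaleReferenceDemo {
  // Stand-in for a widget such as an EditText.
  static class FakeEditText {
    String text = "";

    void setText(String s) { text = s; }
  }

  // Stand-in for a retained fragment whose widget gets replaced on a
  // configuration change.
  static class FakeFragment {
    FakeEditText field = new FakeEditText();

    Consumer<String> risky() {
      FakeEditText captured = field; // the instance that exists right now

      return s -> captured.setText(s);
    }

    Consumer<String> safer() {
      return s -> updateField(s); // finds the current widget on every call
    }

    void updateField(String s) { field.setText(s); }

    void simulateConfigurationChange() { field = new FakeEditText(); }
  }

  public static void main(String[] args) {
    FakeFragment fragment = new FakeFragment();
    Consumer<String> risky = fragment.risky();
    Consumer<String> safer = fragment.safer();
    FakeEditText old = fragment.field;

    fragment.simulateConfigurationChange();

    risky.accept("hello");
    System.out.println(old.text);            // prints hello (the old widget got the text)
    System.out.println(fragment.field.text); // prints an empty line (the new widget got nothing)

    safer.accept("world");
    System.out.println(fragment.field.text); // prints world
  }
}
```

In a real app, the old widget is also still attached to the destroyed activity, so the risky version both misses the update and keeps that activity from being garbage-collected.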

Streaming from a Resource

Of course, this still uses a 400-millisecond sleep() call to simulate real work. The next step would be to do some actual I/O here, taking advantage of RxJava’s ability to pull data from the stream on a background thread.

The Rx/XML sample project places our 25 Latin words in an XML resource:

<words>
  <word value="lorem" />
  <word value="ipsum" />
  <word value="dolor" />
  <word value="sit" />
  <word value="amet" />
  <word value="consectetuer" />
  <word value="adipiscing" />
  <word value="elit" />
  <word value="morbi" />
  <word value="vel" />
  <word value="ligula" />
  <word value="vitae" />
  <word value="arcu" />
  <word value="aliquet" />
  <word value="mollis" />
  <word value="etiam" />
  <word value="vel" />
  <word value="erat" />
  <word value="placerat" />
  <word value="ante" />
  <word value="porttitor" />
  <word value="sodales" />
  <word value="pellentesque" />
  <word value="augue" />
  <word value="purus" />
</words>
(from Rx/XML/app/src/main/res/xml/words.xml)

We can then create an ObservableOnSubscribe implementation that reads in those words, using an XmlPullParser obtained from a Resources object, and use those words for our onNext() calls:

  private static class WordSource implements ObservableOnSubscribe<String> {
    private final Resources resources;

    WordSource(Context ctxt) {
      resources=ctxt.getResources();
    }

    @Override
    public void subscribe(ObservableEmitter<String> emitter) {
      try {
        XmlPullParser xpp=resources.getXml(R.xml.words);

        while (xpp.getEventType()!=XmlPullParser.END_DOCUMENT) {
          if (xpp.getEventType()==XmlPullParser.START_TAG) {
            if (xpp.getName().equals("word")) {
              emitter.onNext(xpp.getAttributeValue(0));
            }
          }

          xpp.next();
        }

        emitter.onComplete();
      }
      catch (Exception e) {
        emitter.onError(e);
      }
    }
  }
(from Rx/XML/app/src/main/java/com/commonsware/android/rx/RxDemoFragment.java)

Error Handling

If you look closely at that code snippet, you will see that we are using a third method on the ObservableEmitter: onError(). We call this when something goes wrong in reading in the XML, passing the exception along to the emitter.

That, in turn, can make it to the code in our subscribe() call:

    Observable<String> observable=Observable
      .create(new WordSource(getActivity()))
      .subscribeOn(Schedulers.io())
      .map(s -> (s.toUpperCase()))
      .observeOn(AndroidSchedulers.mainThread())
      .doOnComplete(() ->
        Toast.makeText(getActivity(), R.string.done, Toast.LENGTH_SHORT).show());

    sub=observable.subscribe(s -> adapter.add(s),
        error ->
          Toast
            .makeText(getActivity(), error.getMessage(), Toast.LENGTH_LONG)
            .show());
(from Rx/XML/app/src/main/java/com/commonsware/android/rx/RxDemoFragment.java)

Now, we are using a two-parameter version of subscribe() and are passing in two lambda expressions. The first provides us our data, which we pass to the RVArrayAdapter as before. The second lambda will be called if an exception is encountered and supplied to our emitter and its onError() method. Here, we get the error, and use its message to show a Toast.

Transmogrification

If you look closely at that code snippet, we have two other changes, compared to the earlier RxAndroid sample.

One is that we are using Schedulers.io(), instead of Schedulers.newThread(). RxJava has a scheduler dedicated to I/O concerns, backed by a pool of threads that it manages for us, which we are using here. That is not a requirement, and for more sophisticated scenarios you might need something else (e.g., a thread from a thread pool that you configure and manage). Here, we are just illustrating more than one way to move our I/O code off the main application thread.

The second change, while small here, is more profound in general. We are calling map(), passing in a lambda expression that takes a String and converts it to uppercase. If you run this sample app, you will see that all of the words show up in uppercase (and quickly, since reading words out of an XML resource happens much faster than one word every 400 milliseconds).

map() is known as an operator. Its job is to take a stream as input and emit another stream as output, executing some code on each item in the stream to change it, somehow.
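To demystify what an operator does, here is a hand-rolled sketch of a map() operator. MiniStream is a hypothetical, drastically simplified stand-in for Observable, with no threading, error handling, or disposal:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MapOperatorDemo {
  // Hypothetical minimal stream: something to which you can subscribe a consumer.
  interface MiniStream<T> {
    void subscribe(Consumer<T> consumer);

    // map() wraps this stream in a new stream. Subscribing to the new stream
    // subscribes to this one, transforming each item on its way through.
    default <R> MiniStream<R> map(Function<T, R> transform) {
      return consumer -> subscribe(item -> consumer.accept(transform.apply(item)));
    }
  }

  static MiniStream<String> fromArray(String... items) {
    return consumer -> {
      for (String item : items) {
        consumer.accept(item);
      }
    };
  }

  public static void main(String[] args) {
    List<String> shown = new ArrayList<>();

    fromArray("lorem", "ipsum", "dolor")
      .map(s -> s.toUpperCase())
      .map(s -> s + "!")
      .subscribe(s -> shown.add(s));

    System.out.println(shown); // prints [LOREM!, IPSUM!, DOLOR!]
  }
}
```

Because map() returns another stream, operators chain naturally. Each one only needs to know about the stream immediately upstream of it.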

There are lots of operators built into RxJava, handling all sorts of scenarios, such as filtering:

takeUntil() is another operator, saying “take all items from the main Observable until this other Observable says otherwise”.

Other operators aggregate or convert the stream, such as:

RxJava alone has perhaps a hundred operators, and you can create others if none of the built-in ones meet your needs.

Rx-Enabled Libraries

Many libraries either offer an Rx-compatible API as part of their base functionality or offer an Rx-compatible bridge between your code and some existing, non-Rx-compatible API.

For example, Retrofit offers an RxJava 2-compatible adapter for Retrofit 2.x.

The Rx/Retrofit sample project is a clone of the original Retrofit 2.x sample from the chapter on Internet access, revised to use RxJava bindings instead of Retrofit’s built-in asynchronous options, and revised to use a RecyclerView.

The dependencies now include the RxJava 2 adapter for Retrofit, plus RxAndroid — these, in turn, will pull in RxJava:

apply plugin: 'com.android.application'

dependencies {
    implementation 'com.squareup.picasso:picasso:2.5.2'
    implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
    implementation 'com.android.support:recyclerview-v7:27.1.1'
    implementation "com.android.support:support-fragment:27.1.1"
    implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    androidTestImplementation 'com.android.support.test:rules:1.0.1'
    androidTestImplementation 'com.android.support.test.uiautomator:uiautomator-v18:2.1.3'
}

android {
    compileSdkVersion 27
    buildToolsVersion '27.0.3'

    defaultConfig {
        minSdkVersion 18
        targetSdkVersion 27
        versionCode 1
        versionName "1.0"
        applicationId "com.commonsware.android.rx.retrofit"
        testApplicationId "com.commonsware.android.rx.retrofit.test"
        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
    }

    dataBinding {
        enabled = true
    }

    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}

(from Rx/Retrofit/app/build.gradle)

Now, though, StackOverflowInterface can return an Observable, rather than a Call:

package com.commonsware.android.databind.basic;

import io.reactivex.Observable;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.http.GET;
import retrofit2.http.Path;
import retrofit2.http.Query;

public interface StackOverflowInterface {
  @GET("/2.1/questions?order=desc&sort=creation&site=stackoverflow")
  Observable<SOQuestions> questions(@Query("tagged") String tags);

  @GET("/2.1/questions/{ids}?site=stackoverflow")
  Observable<SOQuestions> update(@Path("ids") String questionIds);
}
(from Rx/Retrofit/app/src/main/java/com/commonsware/android/databind/basic/StackOverflowInterface.java)

This gives us an Observable on a stream, just as if we had called Observable.fromArray() or Observable.create(). In this case, the “stream” is a single-item stream, containing an SOQuestions payload, which is our parsed response from the Stack Exchange API.

That, in turn, allows our QuestionsFragment to use RxJava to arrange to perform the I/O on an io() thread, process the response on the Android main application thread, plus process our questions or any errors that may crop up:

    sub=so.questions("android")
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(soQuestions -> {
          for (Item item : soQuestions.items) {
            Question question=new Question(item);

            questions.add(question);
            questionMap.put(question.id, question);
          }

          adapter.setQuestions(questions);
        }, t -> {
          Toast.makeText(getActivity(), t.getMessage(),
            Toast.LENGTH_LONG).show();
          Log.e(getClass().getSimpleName(),
            "Exception from Retrofit request to StackOverflow", t);
        }
      );
(from Rx/Retrofit/app/src/main/java/com/commonsware/android/databind/basic/QuestionsFragment.java)

Further Reading

The primary source of documentation on RxJava comes in the form of the GitHub repo’s wiki.

JavaDocs of the RxJava 2 API are also available.

What About LiveData?

In 2017, Google introduced the Architecture Components: a set of libraries designed to offer higher-level abstractions around core architecture concerns. One piece of the Architecture Components is LiveData, which aims to offer a simpler approach to the same reactive pattern that is espoused in RxJava/RxAndroid.

LiveData is covered in the companion volume, “Android’s Architecture Components”.