A programming model that has been gaining substantial ground in recent years is reactive programming. In Android, much of the attention has been on RxJava, the “reactive extensions for Java”. Many libraries offer the ability to be consumed using RxJava, and many Android experts have latched onto RxJava as a way to reduce certain types of complexity in Android app development.
In this chapter, we will review what reactive programming is, what RxJava is, and how you can apply RxJava in your Android app.
However, please understand that reactive programming is a very large topic. Just a complete explanation of RxJava would entail its own book. This chapter should be seen as a launching pad for further explorations of your own, more so than a definitive reference on the subject.
Understanding this chapter requires that you have read the core chapters of this book.
In order to understand reactive programming, we first need to think about streams.
When a Java programmer hears the term “stream”, what often pops into
mind is InputStream
and OutputStream
. Those offer access to a stream
of bytes, for input and output, respectively. Here, “stream” means that
the bytes are available one at a time (though are often retrieved in
a clump, such as an 8192-byte buffer), and that once removed from the
stream the bytes are considered to be “consumed” and are no longer available
from the stream itself.
When Java programmers think of InputStream
and OutputStream
, what
often pops into mind is FileInputStream
and FileOutputStream
. With
FileInputStream
, the source of the bytes is fixed: the contents of a
designated file. With FileOutputStream
, the destination of the bytes
is fixed: once again, a designated file.
However, there are many other sources of InputStream
and OutputStream
.
Some that you encounter in the book are:
InputStream
that you get from
HttpUrlConnection
ContentProvider
, such as the InputStream
that you get from calling openInputStream()
on a ContentProvider
Particularly in the HTTP case, the source of the bytes is “live”, insofar as there does not have to be some specific file that is the source of those bytes. Those bytes could represent a generated Web page, or a live audio stream, or anything else.
Hence, more generally, a stream represents a flow of data, where we can
pull data off of the stream and do something with it. That “flow of
data” could be bytes from a file, as we see with InputStream
. But lots
of other things could be modeled as flows of data. Pretty much anything
where the data would come to us asynchronously could be modeled this way,
such as:
You could even model some things that might not feel like a “stream” as
a stream if you wanted to. For example, querying a database or ContentProvider
gives you a Cursor
back, and you could model that as being a stream
of rows.
Reactive programming works with streams. Rather than “iterate over the
rows in the Cursor
and do X to each”, you say “as a row comes in from
the stream of rows, do X to each”. Here, you are “reacting” to the availability
of a row.
But part of reactive programming is that what processes a stream might itself turn around and be a stream of events to something else. For example, given a stream of audio signals, you might implement a filter that clamps high-volume signals to a lower volume. That filter takes in a stream (raw signals) and emits another stream (clamped signals). Other code could react to the filtered stream (e.g., to record bytes to disk).
You may have already done reactive programming, without even realizing it. The world’s most popular reactive programming environment is not some functional programming language or UI framework.
It’s a spreadsheet.
Spreadsheet cells — particularly for simpler sheets — can be thought of as either having data (particularly numbers) or formulas. A formula references cells and performs a calculation upon them. This includes performing calculations upon other cells that contain formulas.
For example, you might have a spreadsheet like this:
Figure 359: Simple Spreadsheet
Here, the two formulas are shown in bold, while the data cells are in the default font.
The average formula cell has =AVERAGE(B1:B5)
, to compute the average
of the five number cells above it. The result formula cell has =B6*B7
,
to multiply the average by the “factor” number.
If you change that factor, not only does that cell change, but so does the result formula cell. The B6 and B7 values referenced in the result formula do not just identify cells, but represent streams of changes to those cells’ values. When B7 (the factor value) changes, the result formula reacts to the changed-value event and recalculates its formula, showing the result:
Figure 360: Simple Spreadsheet with New Factor Value
Similarly, if you change one of the five initial values (1, 1, 2, 3, 5), the average formula cell responds to the changed-value event and recalculates its value. That, in turn, triggers a changed-value event that causes the result formula cell to react and recalculate its value:
Figure 361: Simple Spreadsheet with New Initial and New Factor Value
At this point, you may be wondering what this has to do with Android, or even Java.
The pre-eminent framework for reactive programming in Java is RxJava. RxJava is a library that helps you model data as streams and apply a chain of operations to those streams.
RxJava is part of the ReactiveX series of libraries, offering reactive programming for a wide range of programming languages and platforms.
In late 2016, RxJava 2 was released. This includes some refactoring of the original RxJava classes to comply with the Reactive Streams specification. This chapter focuses on RxJava 2. Some material that you read on RxJava might be using the original RxJava API. While pretty much everything done with classic RxJava can be done with RxJava2, some conversion may be required.
(BTW, for those of you wondering about the title of this section, Rx is an abbreviation for the word “prescription”)
RxJava relies heavily on functions. Java is not a functional programming language, and so it does not have “functions” as standalone first-class units of programming.
Java 8 offers lambda expressions, which are akin to
anonymous functions. RxJava supports Java 8 lambda expressions,
and by using Android Studio 3.0+ (and its related build tools),
Android developers can use lambda expressions in
projects with a minSdkVersion
of 9 or higher.
The examples in this chapter use lambda expressions, as that will be the typical way that you see RxJava be used.
Back in the chapter on threads, we had
the Threads/AsyncRV
sample app, which
demonstrated using an AsyncTask
. There, our data source was a static
Java array of strings, and we would not need an AsyncTask
to “load”
those into a list. However, to keep things simple, we pretended that
it took real work to obtain those strings, and we used an AsyncTask
to do that work in the background.
Let’s explore reactive programming and RxJava by revamping that example,
starting with the
Rx/Simple
sample project.
RxJava is a library, available from Maven Central and JCenter. So,
a typical Android Studio project could add RxJava as a dependency
in a module’s build.gradle
file:
apply plugin: 'com.android.application'
dependencies {
implementation "com.android.support:recyclerview-v7:27.1.1"
implementation "com.android.support:support-fragment:27.1.1"
implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
}
android {
compileSdkVersion 27
buildToolsVersion '27.0.3'
defaultConfig {
minSdkVersion 15
targetSdkVersion 27
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
The original AsyncTask
sample app has a doInBackground()
method that
looks a bit like this:
@Override
protected Void doInBackground(Void... unused) {
for (String item : ITEMS) {
if (isCancelled())
break;
publishProgress(item);
SystemClock.sleep(400);
}
return(null);
}
Here, ITEMS
is our static
String array. We iterate over that array,
and if our task has not been canceled, we call publishProgress()
to add
a string to our ListView
, and sleep for 400 milliseconds as our way
of pretending to do real work.
The first step towards switching to a reactive model is to replace that loop with a stream:
@Override
protected Void doInBackground(Void... unused) {
Observable.fromArray(ITEMS)
.subscribe(s -> {
if (!isCancelled()) {
publishProgress(s);
SystemClock.sleep(400);
}
});
return(null);
}
io.reactivex.Observable
is the root class for most of what you will
do with RxJava. It helps you set up a stream and then react to that
stream.
There are several ways to set up a stream — we will see a few of them
in this chapter. In this case, we are using fromArray()
, which sets
up a stream based on the items in that array. In reality, this is a
factory method; the returned object is an Observable
.
We then call subscribe()
on the Observable
. This basically represents
the sink for the stream of data coming from the Observable
. In this
case, we use lambda expression implementation of the Consumer
interface. Our lambda expression body implements the accept()
method
from that Consumer
interface, and it will be invoked for each piece of data
from the stream. In this case, ITEMS
is an array of String
, and
so we are setting up a Consumer
of String
and therefore we accept()
a String
. Our lambda expression has our isCancelled()
check,
our publishProgress()
call, and our sleep()
call,
as with the iterative loop from the original sample.
Our 400 milliseconds of sleep()
is a way of simulating doing work.
However, as RxJava is showing us, we are not actually simulating doing
work to get the data. Instead, we are simulating doing work to consume
the data. Our sleep()
call is in our Consumer
(or the equivalent
lambda expression), not in our data source.
However, we can model this better, by creating our own data source that
is a bit more sophisticated than a simple static
Java array.
The
Rx/Observable
sample project replaces fromArray()
with a call to create()
, which
creates an Observable
upon a supplied source of data:
@Override
protected Void doInBackground(Void... unused) {
Observable.create(source())
.subscribe(s -> {
if (!isCancelled()) {
publishProgress(s);
}
});
return(null);
}
private ObservableOnSubscribe<String> source() {
return(emitter -> {
for (String item : ITEMS) {
if (!isCancelled()) {
emitter.onNext(item);
SystemClock.sleep(400);
}
}
emitter.onComplete();
});
}
Here, source()
is returning a lambda expression that Java maps to
an instance of an oddly-named ObservableOnSubscribe
interface. An ObservableOnSubscribe
is for cases where we are only
actually starting to obtain data for our stream when something subscribes
to the Observable
, not before. In our case, that is fine, as we are still
just using our String
array.
An ObservableOnSubscribe
implements a subscribe()
method. This is
called when a subscriber subscribes to the Observable
. The job of
subscribe()
is to work with the underlying data and call three
methods on the supplied ObservableEmitter
:
onNext()
for each piece of data in the streamonComplete()
when we are done with all possible dataonError()
if something fails (e.g., an IOException
when reading
from a socket)In our case, we are iterating over our Java array, calling onNext()
for each String
, and sleeping the 400 milliseconds to simulate doing
actual work.
At this point, you may be wondering why we are bothering with any
of this, as our code gets more and more complex without any particular
added value. This reaction is understandable. All we have done is make
an AsyncTask
more complicated.
So, let’s get rid of the AsyncTask
.
Part of the power of RxJava is thread management. You can tell it the threads to use for working with streams, such as doing I/O on a background thread. However, RxJava is a pure Java library. It knows nothing about Android and Android-specific constructs like the main application thread.
Fortunately, there is RxAndroid. This a small library adding some Android-specific smarts to RxJava, such as the notion of the main application thread. This, like RxJava, is available as an artifact:
apply plugin: 'com.android.application'
dependencies {
implementation "com.android.support:recyclerview-v7:27.1.1"
implementation "com.android.support:support-fragment:27.1.1"
implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
android {
compileSdkVersion 27
buildToolsVersion '27.0.3'
defaultConfig {
minSdkVersion 15
targetSdkVersion 27
applicationId "com.commonsware.android.rx.rxandroid"
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
With that in mind, let’s look at
the
Rx/RxAndroid
sample project and our substantially-revised ListFragment
, here
named RxDemoFragment
:
package com.commonsware.android.rx;
import android.os.Bundle;
import android.os.SystemClock;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.support.v4.app.Fragment;
import android.support.v7.widget.DividerItemDecoration;
import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.TextView;
import android.widget.Toast;
import java.util.ArrayList;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class RxDemoFragment extends Fragment {
private static final String[] ITEMS= { "lorem", "ipsum", "dolor",
"sit", "amet", "consectetuer", "adipiscing", "elit", "morbi",
"vel", "ligula", "vitae", "arcu", "aliquet", "mollis", "etiam",
"vel", "erat", "placerat", "ante", "porttitor", "sodales",
"pellentesque", "augue", "purus" };
private ArrayList<String> model=new ArrayList<>();
private RVArrayAdapter adapter;
private Disposable sub=null;
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setRetainInstance(true);
adapter=new RVArrayAdapter(model, getLayoutInflater());
Observable<String> observable=Observable
.create(source())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnComplete(() -> {
Toast.makeText(getActivity(), R.string.done, Toast.LENGTH_SHORT)
.show();
});
sub=observable.subscribe(s -> {
adapter.add(s);
});
}
private ObservableOnSubscribe<String> source() {
return(emitter -> {
for (String item : ITEMS) {
emitter.onNext(item);
SystemClock.sleep(400);
}
emitter.onComplete();
});
}
@Nullable
@Override
public View onCreateView(@NonNull LayoutInflater inflater,
@Nullable ViewGroup container,
@Nullable Bundle savedInstanceState) {
return inflater.inflate(R.layout.main, container, false);
}
@Override
public void onViewCreated(View v, Bundle savedInstanceState) {
super.onViewCreated(v, savedInstanceState);
RecyclerView rv=v.findViewById(android.R.id.list);
rv.setLayoutManager(new LinearLayoutManager(getActivity()));
rv.addItemDecoration(new DividerItemDecoration(getActivity(),
DividerItemDecoration.VERTICAL));
rv.setAdapter(adapter);
}
@Override
public void onDestroy() {
if (sub!=null && !sub.isDisposed()) {
sub.dispose();
}
super.onDestroy();
}
private static class RVArrayAdapter extends RecyclerView.Adapter<RowHolder> {
private final ArrayList<String> words;
private final LayoutInflater inflater;
private RVArrayAdapter(ArrayList<String> words,
LayoutInflater inflater) {
this.words=words;
this.inflater=inflater;
}
@NonNull
@Override
public RowHolder onCreateViewHolder(@NonNull ViewGroup parent,
int viewType) {
View row=inflater.inflate(android.R.layout.simple_list_item_1, parent, false);
return new RowHolder(row);
}
@Override
public void onBindViewHolder(@NonNull RowHolder holder,
int position) {
holder.bind(words.get(position));
}
@Override
public int getItemCount() {
return words.size();
}
private void add(String word) {
words.add(word);
notifyItemInserted(words.size()-1);
}
}
private static class RowHolder extends RecyclerView.ViewHolder {
private final TextView title;
RowHolder(View itemView) {
super(itemView);
title=itemView.findViewById(android.R.id.text1);
}
public void bind(String text) {
title.setText(text);
}
}
}
Some things are the same as in the original sample app: the static
array, the ArrayList
model, the ArrayAdapter
, the retained fragment, etc.
However, our AsyncTask
is gone, replaced by a chunk of RxJava code:
Observable<String> observable=Observable
.create(source())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnComplete(() -> {
Toast.makeText(getActivity(), R.string.done, Toast.LENGTH_SHORT)
.show();
});
sub=observable.subscribe(s -> {
adapter.add(s);
});
This code adds a number of items to our Observerable
call chain, so
let’s take them one at a time.
This teaches RxJava the thread to use for invoking our ObservableOnSubscribe
code, specifically its subscribe()
method (or the equivalent with a
lambda expression). Since subscribe()
could take some time — in our
case, 400 milliseconds per string — we really want to run that on a
background thread.
The Schedulers
utility class in RxJava provides a handful of options
for indicating the thread to use. Here, we are using newThread()
, which
does pretty much what the name suggests: create a new thread and use that
for our subscribe()
work.
Typically, if we are using RxJava and RxAndroid, we want to subscribe to
events on a background thread. That is not always the case, though. There
are techniques for using RxJava/RxAndroid to manage things like user input
(e.g., text being entered live in an EditText
). In those cases, we may have
specific threading requirements, such as subscribing on Android’s main application
thread.
By default, if you use subscribeOn()
, everything involved with this
stream will be done on that thread. This would include the work we
want to do to update our UI, via our subscribe()
call. And, on Android,
we cannot update the UI from a background thread.
observeOn()
basically says “everything in this Observable
configuration
from this point forward should switch to using this other thread”. With
our subscribe()
call coming after observeOn()
, we wind up processing
our subscribe()
work in the thread indicated by observeOn()
.
Here, we use RxAndroid and its AndroidSchedulers
class, which has a
mainThread()
method to indicate that we want the work to be done
on the main application thread.
Note that you can call observeOn()
several times. Typically, this is not
necessary. But, for example, you might use subscribeOn()
to receive some
UI event on the main application thread, then call observeOn()
to direct
initial processing of that event onto a background thread (e.g., database I/O
for updating a live filter based on what has been typed in so far), then
call observeOn()
again to direct further processing to the main application
thread (e.g., update a RecyclerView
with the now-filtered results).
In our samples so far, in the onPostExecute()
method of our AsyncTask
,
we showed a Toast
to signal that the work was completed.
The equivalent step in RxJava is doOnComplete()
. This takes an Action
object or, in this case, a lambda expression. This will be called when
our source()
triggers onCompleted()
on its supplied emitter
.
Here, we show the Toast
.
We then subscribe()
to the Observable
, feeding the strings into our
ArrayAdapter
. subscribe()
returns a Disposable
instance, which we hold
onto in a field.
On a configuration change, we retain the Disposable
, which in turn holds onto
our Observable
. We also retain the ArrayAdapter
. So, the new activity and
new fragment connect back up to the ArrayAdapter
, and they get all existing
words plus any new ones fed in by our ongoing Rx work, if that has not completed
already.
If the user presses BACK to exit the activity, or the activity otherwise is
being finally destroyed, onDestroy()
is called on our retained fragment,
and there we dispose()
of that Disposable
:
@Override
public void onDestroy() {
if (sub!=null && !sub.isDisposed()) {
sub.dispose();
}
super.onDestroy();
}
This way, if the Rx background work is still going on, and the fragment is
being destroyed, we disconnect and do not try to continue updating the
ArrayAdapter
or display the Toast
, as neither of those things are safe with
a destroyed activity.
Be careful about what you reference from lambda expressions tied into RxJava, or their anonymous inner class counterparts. You need to ensure that the lambdas only reference objects with the same intended lifespan as the lambda itself.
In particular, be careful about referencing widgets directly. Suppose that we
had some RxJava work that was delivering a String
to be filled into an EditText
.
From a retained fragment, you might try doing something like this:
Observable<String> observable=Observable
.create(source())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
sub=observable.subscribe(s -> {
myAwesomeEditText.setText(s);
});
The problem here is that myAwesomeEditText
is a specific EditText
instance,
the one known about at the time when the lambda expression is created. If we
undergo a configuration change, our fragment winds up retaining the lambda expression
but creating a new EditText
. However, our lambda expression does not know
about that new EditText
, so it happily sets the text of the old EditText
,
with unfortunate results.
A safer approach is to call a method:
Observable<String> observable=Observable
.create(source())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
sub=observable.subscribe(s -> {
updateMyAwesomeEditText(s);
});
where the method on the fragment can refer to the proper EditText
instance,
whichever one is current at the time that the lambda expression is evaluated.
Of course, this still uses a 400-millisecond sleep()
call to simulate
real work. The next step would be to do some actual I/O here, taking
advantage of RxJava’s ability to pull data from the stream on a background
thread.
The
Rx/XML
sample project places our 25 Latin words in an XML resource:
<words>
<word value="lorem" />
<word value="ipsum" />
<word value="dolor" />
<word value="sit" />
<word value="amet" />
<word value="consectetuer" />
<word value="adipiscing" />
<word value="elit" />
<word value="morbi" />
<word value="vel" />
<word value="ligula" />
<word value="vitae" />
<word value="arcu" />
<word value="aliquet" />
<word value="mollis" />
<word value="etiam" />
<word value="vel" />
<word value="erat" />
<word value="placerat" />
<word value="ante" />
<word value="porttitor" />
<word value="sodales" />
<word value="pellentesque" />
<word value="augue" />
<word value="purus" />
</words>
We can then create an ObservableOnSubscribe
implementation that
reads in those words, using an XmlPullParser
obtained from a Resources
object, and use those words for our onNext()
calls:
private static class WordSource implements ObservableOnSubscribe<String> {
private final Resources resources;
WordSource(Context ctxt) {
resources=ctxt.getResources();
}
@Override
public void subscribe(ObservableEmitter<String> emitter) {
try {
XmlPullParser xpp=resources.getXml(R.xml.words);
while (xpp.getEventType()!=XmlPullParser.END_DOCUMENT) {
if (xpp.getEventType()==XmlPullParser.START_TAG) {
if (xpp.getName().equals("word")) {
emitter.onNext(xpp.getAttributeValue(0));
}
}
xpp.next();
}
emitter.onComplete();
}
catch (Exception e) {
emitter.onError(e);
}
}
}
If you look closely at that code snippet, you will see that we are using
a third method on the ObservableEmitter
: onError()
. We call this when
something goes wrong in reading in the XML, passing the exception along
to the emitter.
That, in turn, can make it to the code in our subscribe()
call:
Observable<String> observable=Observable
.create(new WordSource(getActivity()))
.subscribeOn(Schedulers.io())
.map(s -> (s.toUpperCase()))
.observeOn(AndroidSchedulers.mainThread())
.doOnComplete(() ->
Toast.makeText(getActivity(), R.string.done, Toast.LENGTH_SHORT).show());
sub=observable.subscribe(s -> adapter.add(s),
error ->
Toast
.makeText(getActivity(), error.getMessage(), Toast.LENGTH_LONG)
.show());
Now, we are using a two-parameter implementation of subscribe()
and are
passing in two lambda expressions. The first provides us our data,
which we pass to the ArrayAdapter
as before. The second lambda will
be called if an exception is encountered and supplied to our emitter
and its onError()
method. Here, we get the error, and use its message
to show a Toast
.
If you look closely at that code snippet, we have two other changes, compared to the earlier RxAndroid sample.
One is that we are using Schedulers.io()
, instead of Schedulers.newThread()
.
RxJava has a thread dedicated for I/O concerns, which we are using here.
That is not a requirement, and for more sophisticated scenarios you might
need something else (e.g., a thread from a thread pool that you configure
and manage). Here, we are just illustrating more than one way to move
our I/O code off the main application thread.
The second change, while small here, is more profound in general. We
are calling map()
, passing in a lambda expression that takes a
String
and converts it to uppercase. If you run this sample app,
you will see that all of the words show up in uppercase (and quickly,
since reading words out of an XML resource happens much faster than
one word every 400 milliseconds).
map()
is known as an operator. Its job is to take a stream as input
and emit another stream as output, executing some code on each item
in the stream to change it, somehow.
There are lots of operators built into RxJava, handling all sorts of scenarios, such as filtering:
skip()
ignores a specified number of items, not passing them
downstreamtake()
accepts a specified number of items, ignoring the restdistinct()
skips any items that appeared before, using equals()
by defaultThe takeUntil()
method that we have been calling is another operator,
saying “take all items from the main Observable
until this other
Observable
says otherwise”.
Other operators aggregate or convert the stream, such as:
concat()
takes two Observables
and emits the items from the first,
followed by the items from the secondcount()
counts the items and emits a single-item stream containing
the count of the original stream’s itemsreduce()
applies some supplied lambda expression (or the equivalent)
to calculate some result (e.g., an average), emitting a single-item stream
containing that resultRxJava alone has perhaps a hundred operators, and you can create others if none of the built-in ones meet your needs.
Many libraries either offer an Rx-compatible API as part of their base functionality or offer an Rx-compatible bridge between your code and some existing, non-Rx-compatible API.
For example, Retrofit offers an RxJava 2-compatible adapter for Retrofit 2.x.
The
Rx/Retrofit
sample project is a clone of the original Retrofit 2.x sample from
the chapter on Internet access, revised to use
RxJava bindings instead of Retrofit’s built-in asynchronous options, and revised
to use a RecyclerView
.
The dependencies now include the RxJava 2 adapter for Retrofit, plus RxAndroid — these, in turn, will pull in RxJava:
apply plugin: 'com.android.application'
dependencies {
implementation 'com.squareup.picasso:picasso:2.5.2'
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
implementation 'com.android.support:recyclerview-v7:27.1.1'
implementation "com.android.support:support-fragment:27.1.1"
implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
androidTestImplementation 'com.android.support.test:rules:1.0.1'
androidTestImplementation 'com.android.support.test.uiautomator:uiautomator-v18:2.1.3'
}
android {
compileSdkVersion 27
buildToolsVersion '27.0.3'
defaultConfig {
minSdkVersion 18
targetSdkVersion 27
versionCode 1
versionName "1.0"
applicationId "com.commonsware.android.rx.retrofit"
testApplicationId "com.commonsware.android.rx.retrofit.test"
testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
}
dataBinding {
enabled = true
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
Now, though, StackOverflowInterface
can return an Observable
,
rather than a Call
:
package com.commonsware.android.databind.basic;
import io.reactivex.Observable;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.http.GET;
import retrofit2.http.Path;
import retrofit2.http.Query;
public interface StackOverflowInterface {
@GET("/2.1/questions?order=desc&sort=creation&site=stackoverflow")
Observable<SOQuestions> questions(@Query("tagged") String tags);
@GET("/2.1/questions/{ids}?site=stackoverflow")
Observable<SOQuestions> update(@Path("ids") String questionIds);
}
This gives us an Observable
on a stream, just as if we had called Observable.fromArray()
or Observable.create()
. In this case, the “stream” is a single-item
stream, containing an SOQuestions
payload, which is our parsed response
from the Stack Exchange API.
That, in turn, allows our QuestionsFragment
to use RxJava to arrange
to perform the I/O on the io()
thread, process the response on
the Android main application thread, plus process our questions or any
errors that may crop up:
sub=so.questions("android")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(soQuestions -> {
for (Item item : soQuestions.items) {
Question question=new Question(item);
questions.add(question);
questionMap.put(question.id, question);
}
adapter.setQuestions(questions);
}, t -> {
Toast.makeText(getActivity(), t.getMessage(),
Toast.LENGTH_LONG).show();
Log.e(getClass().getSimpleName(),
"Exception from Retrofit request to StackOverflow", t);
}
);
The primary source of documentation on RxJava comes in the form of the GitHub repo’s wiki.
JavaDocs of the RxJava 2 API are also available.
In 2017, Google introduced the Architecture Components: a set of libraries
designed to offer higher-level abstractions around core architecture concerns.
One piece of the Architecture Components is LiveData
, which aims to offer
a simpler approach to the same reactive pattern that is espoused in RxJava/RxAndroid.
LiveData
is covered in the companion volume,
“Android’s Architecture Components”.