RxJava is a series of operations implemented using the observer pattern, so it is important to understand the observer, the observed, as well as subscription and events in the observer pattern.
Observable: In the observer pattern, it is called the "observed";
Observer: The "observer" in the observer pattern, which can receive data sent by the Observable;
subscribe: Subscription, where the observer subscribes to the observed through the Observable's subscribe() method;
Subscriber: Also a type of observer, in 2.0 it is not substantially different from Observer, except that Subscriber is used in conjunction with Flowable (also an observed), where Observer is used to subscribe to Observable, and Subscriber is used to subscribe to Flowable.
Observer Pattern#
The implementation of rxjava is mainly through the observer pattern.
Object A (the observer) is highly sensitive to certain changes in Object B (the observed) and needs to react at the moment B changes.
In the observer pattern of the program, the observer does not need to constantly monitor the observed, but instead uses a registration or subscription method to inform the observed: I need your certain state, and you should notify me when it changes.
At the same time, we can also have multiple observers corresponding to one observed.
In fact, there are many built-in observer patterns in Android. The most obvious is the click event. For example, after clicking a button, a Toast pops up. So, when we click the button, we inform the system that we need to show a toast. Then it pops up. At this point, the question arises: Do I need to listen to this button in real-time? The answer is no. This is different from the previous example. In other words, I only need to listen when this button is clicked. This operation is called subscription. In other words, the Button subscribes to the OnClickListener through setOnClickListener to listen for the onClick method.
Basic Usage#
The basic implementation of rxjava mainly consists of three points:
- Initialize Observable (the observed)
- Initialize Observer (the observer)
- Establish a subscription relationship between the two
Create Observable#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world");
e.onComplete(); // After calling complete, no more events will be accepted
}
});
Create Observer#
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("rxjava", "onSubscribe: " + d);
}
@Override
public void onNext(String string) {
Log.i("rxjava", "onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.i("rxjava", "onError: " + e);
}
@Override
public void onComplete() {
Log.i("rxjava", "onComplete: ");
}
};
- onSubscribe: It will be called before the event is sent and can be used for some preparation operations. The Disposable inside is used to break the upstream and downstream relationship.
- onNext: Normal event. Adds the event to be processed to the queue.
- onError: Exception in the event queue. This method will be called when an exception occurs during event processing. The queue will also terminate, meaning no events will be emitted.
- onComplete: Event queue completion. Rxjava not only processes each event individually but also treats them as a queue. When there are no more onNext events emitted, the onComplete method needs to be triggered as a completion indicator.
Create Subscription#
observable.subscribe(observer);
Result#
First, onSubscribe is called, then onNext is executed, and finally it ends with onComplete:
Thread Scheduling#
- subscribeOn() specifies the thread that emits events, while observerOn specifies the thread that the subscriber receives events on.
- Only the first specification of the event-emitting thread is effective when specified multiple times, meaning that only the first call to subscribeOn() is effective, and the rest will be ignored.
- However, it is possible to specify the subscriber receiving thread multiple times, meaning that each time observerOn() is called, the downstream thread will switch once.
Rxjava has built-in some threads for us to choose from:
- Schedulers.io() represents the thread for IO operations, usually used for network, file read/write, and other IO-intensive operations.
- Schedulers.computation() represents CPU computation-intensive operations, such as operations that require a lot of calculations.
- Schedulers.newThread() represents a regular new thread.
- AndroidSchedulers.mainThread() represents the main thread of Android.
Example:#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world " + Thread.currentThread().getName());
e.onComplete(); // After calling complete, no more events will be accepted
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()) // Switch to the main thread
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava", "Main thread " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava", "thread " + Thread.currentThread().getName());
}
});
Result#
2020-03-13 17:06:23.873 8760-8760/com.example.rxjava_test I/rxjava: onSubscribe: io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver@92153c7
2020-03-13 17:06:23.980 8760-8760/com.example.rxjava_test I/rxjava: Main thread main
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: thread RxCachedThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onNext: hello world RxNewThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onComplete:
Combining rxjava and Retrofit#
Create a class to receive server return data#
public class TranslationBean {
private int errorCode; // Error return code
private String query; // Source language
private List<String> translation; // Translation result
private basicEntity basic;
private List<WebEntity> web;
private String tSpeakUrl;
public class basicEntity {
@SerializedName("us-phonetic")
private String usPhonetic; // British phonetic
@SerializedName("uk-speech")
private String ukSpeech; // American pronunciation
@SerializedName("us-speech")
private String usSpeech; // British pronunciation
private String phonetic; // Default phonetic
@SerializedName("uk-phonetic")
private String ukPhonetic; // American phonetic
private List<String> explains; // Basic meanings
public List<String> getExplains() {
return explains;
}
public void setExplains(List<String> explains) {
this.explains = explains;
}
public String getPhonetic() {
return phonetic;
}
public void setPhonetic(String phonetic) {
this.phonetic = phonetic;
}
public String getUkPhonetic() {
return ukPhonetic;
}
public void setUkPhonetic(String ukPhonetic) {
this.ukPhonetic = ukPhonetic;
}
public String getUsPhonetic() {
return usPhonetic;
}
public void setUsPhonetic(String usPhonetic) {
this.usPhonetic = usPhonetic;
}
public String getUkSpeech() {
return ukSpeech;
}
public void setUkSpeech(String ukSpeech) {
this.ukSpeech = ukSpeech;
}
public String getUsSpeech() {
return usSpeech;
}
public void setUsSpeech(String usSpeech) {
this.usSpeech = usSpeech;
}
}
public class WebEntity {
private String key;
private List<String> value;
public void setKey(String key) {
this.key = key;
}
public void setValue(List<String> value) {
this.value = value;
}
public String getKey() {
return key;
}
public List<String> getValue() {
return value;
}
}
public void setQuery(String query) {
this.query = query;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public void setTranslation(List<String> translation) {
this.translation = translation;
}
public void setWeb(List<WebEntity> web) {
this.web = web;
}
public String getQuery() {
return query;
}
public int getErrorCode() {
return errorCode;
}
public List<String> getTranslation() {
return translation;
}
public List<WebEntity> getWeb() {
return web;
}
public basicEntity getBasic() {
return basic;
}
public void setBasic(basicEntity basic) {
this.basic = basic;
}
public void settSpeakUrl(String tSpeakUrl){ this.tSpeakUrl = tSpeakUrl;}
public String gettSpeakUrl(){ return tSpeakUrl; }
}
Create an interface to describe network requests#
public interface networkApi {
@GET("api?")
Observable<TranslationBean> translateYouDao(
@Query("q") String q,
@Query("from") String from,
@Query("to") String to,
@Query("appKey") String appKey, // Application ID
@Query("salt") String salt, // UUID
@Query("sign") String sign, // Application ID + input + salt + curtime + application secret. input = first 10 characters of q + length of q + last 10 characters of q (length of q >= 20) or input = string
@Query("signType") String signType, // Signature type
@Query("curtime") String curtime // Timestamp
);
}
Create Retrofit#
public class netWork {
private static networkApi sContactsApi;
private static OkHttpClient okHttpClient = new OkHttpClient();
private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJava2CallAdapterFactory.create();
private static class ApiClientHolder {
public static final netWork INSTANCE = new netWork();
}
public static netWork getInstance() {
return ApiClientHolder.INSTANCE;
}
public networkApi getDataService() {
if (sContactsApi == null) {
Retrofit retrofit = new Retrofit.Builder()
.client(okHttpClient)
.baseUrl(Constants.BASE_URL)
.addConverterFactory(gsonConverterFactory)
.addCallAdapterFactory(rxJavaCallAdapterFactory)
.build();
sContactsApi = retrofit.create(networkApi.class);
}
return sContactsApi;
}
}
Usage#
@SuppressLint("CheckResult")
public void netConnection(String q, String from, String to, String salt, String sign, String curtime) {
netWork.getInstance().getDataService()
.translateYouDao(q, from, to, appID, salt, sign, signType, curtime)
.subscribeOn(Schedulers.newThread()) // Execution initiated in a new thread
.observeOn(AndroidSchedulers.mainThread()) // Result execution in the main thread
.subscribe(new Consumer<TranslationBean>() {
@Override
public void accept(TranslationBean translationBean) throws Exception {
// Process the received data class
}
});
}