RxJava 是利用觀察者模式來實現一些列的操作,所以對於觀察者模式中的觀察者,被觀察者,以及訂閱、事件需要有一個了解。
Observable:在觀察者模式中稱為 “被觀察者”;
Observer:觀察者模式中的 “觀察者”,可接收 Observable 發送的數據;
subscribe:訂閱,觀察者與被觀察者,通過 Observable 的 subscribe () 方法進行訂閱;
Subscriber:也是一種觀察者,在 2.0 中 它與 Observer 沒什麼實質的區別,不同的是 Subscriber 要與 Flowable (也是一種被觀察者) 聯合使用,Obsesrver 用於訂閱 Observable,而 Subscriber 用於訂閱 Flowable.
觀察者模式#
rxjava 的實現主要是通過觀察者模式實現的。
A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。
在程序的觀察者模式,觀察者不需要時刻盯著被觀察者,而是採用註冊或者稱為訂閱的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我。
同時我們也可以多個觀察者對應一個被觀察者。
其實在 android 中也有很多自帶的觀察者模式。最明顯的莫過於點擊事件。說個最簡單的例子,點擊按鈕後彈一個 Toast。那么,我們在點擊按鈕的時候,告知系統,此時,我需要彈一個吐司。那么就這麼彈出來了。那么,這個時候問題來了。我是否需要實時去監聽這個按鈕呢?答案是不需要的。這就和前面的舉例有的差距了。換句話說。我只要在此按鈕進行點擊時進行監聽就可以了。這種操作被稱為訂閱。也就是說 Button 通過 setOnClickListener 對 OnclickListener 進行了訂閱了操作,來監聽 onclick 方法。
基本使用#
rxjava 的基本實現主要是三點:
- 初始化 Observable (被觀察者)
- 初始化 Observe(觀察者)
- 建立兩者之間的訂閱關係
創建 Observable#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world");
e.onComplete(); //調用complete後下面將不再接受事件
}
});
創建 Observe#
Observer<String>observer=new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("rxjava", "onSubscribe: " + d);
}
@Override
public void onNext(String string) {
Log.i("rxjava", "onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.i("rxjava", "onError: " + e);
}
@Override
public void onComplete() {
Log.i("rxjava", "onComplete: ");
}
};
- onSubscribe:它會在事件還未發送之前被調用,可以用來做一些準備操作。而裡面的 Disposable 則是用來切斷上下游的關係的。
- onNext:普通的事件。將要處理的事件添加到隊列中。
- onError:事件隊列異常,在事件處理過程中出現異常情況時,此方法會被調用。同時隊列將會終止,也就是不允許在有事件發出。
- onComplete:事件隊列完成。rxjava 不僅把每個事件單獨處理。而且會把他們當成一個隊列。當不再有 onNext 事件發出時,需要觸發 onComplete 方法作為完成標識。
創建訂閱#
observable.subscribe(observer);
結果#
先調用 onSubscribe,然後走了 onNext,最後以 onComplete 收尾:
執行緒的調度#
- subscribeOn () 指定的是發射事件的執行緒,observerOn 指定的就是訂閱者接收事件的執行緒。
- 多次指定發射事件的執行緒只有第一次指定的有效,也就是說多次調用 subscribeOn () 只有第一次的有效,其余的會被忽略。
- 但多次指定訂閱者接收執行緒是可以的,也就是說每調用一次 observerOn (),下游的執行緒就會切換一次。
rxjava 中已經內置了一些執行緒供我們選擇:
- Schedulers.io () 代表 io 操作的執行緒,通常用於網絡,讀寫文件等 io 密集型的操作
- Schedulers.computation () 代表 CPU 計算密集型的操作,例如需要大量計算的操作
- Schedulers.newThread () 代表一個常規的新執行緒
- AndroidSchedulers.mainThread () 代表 Android 的主執行緒
例子:#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world "+ Thread.currentThread().getName());
e.onComplete(); //調用complete後下面將不再接受事件
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()) //切換到主執行緒
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava","主執行緒 "+Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava","執行緒 "+Thread.currentThread().getName());
}
});
結果#
2020-03-13 17:06:23.873 8760-8760/com.example.rxjava_test I/rxjava: onSubscribe: io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver@92153c7
2020-03-13 17:06:23.980 8760-8760/com.example.rxjava_test I/rxjava: 主執行緒 main
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: 執行緒 RxCachedThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onNext: hello world RxNewThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onComplete:
結合 rxjava 和 retorfit#
創建接收伺服器返回數據的類#
public class TranslationBean {
private int errorCode; //錯誤返回碼
private String query; //源語言
private List<String> translation; //翻譯結果
private basicEntity basic;
private List<WebEntity> web;
private String tSpeakUrl;
public class basicEntity {
@SerializedName("us-phonetic")
private String usPhonetic; //英式音標
@SerializedName("uk-speech")
private String ukSpeech; //美式發音
@SerializedName("us-speech")
private String usSpeech; //英式發音
private String phonetic; //默認音標
@SerializedName("uk-phonetic")
private String ukPhonetic; //美式英標
private List<String> explains; //基本釋義
public List<String> getExplains() {
return explains;
}
public void setExplains(List<String> explains) {
this.explains = explains;
}
public String getPhonetic() {
return phonetic;
}
public void setPhonetic(String phonetic) {
this.phonetic = phonetic;
}
public String getUkPhonetic() {
return ukPhonetic;
}
public void setUkPhonetic(String ukPhonetic) {
this.ukPhonetic = ukPhonetic;
}
public String getUsPhonetic() {
return usPhonetic;
}
public void setUsPhonetic(String usPhonetic) {
this.usPhonetic = usPhonetic;
}
public String getUkSpeech() {
return ukSpeech;
}
public void setUkSpeech(String ukSpeech) {
this.ukSpeech = ukSpeech;
}
public String getUsSpeech() {
return usSpeech;
}
public void setUsSpeech(String usSpeech) {
this.usSpeech = usSpeech;
}
}
public class WebEntity {
private String key;
private List<String> value;
public void setKey(String key) {
this.key = key;
}
public void setValue(List<String> value) {
this.value = value;
}
public String getKey() {
return key;
}
public List<String> getValue() {
return value;
}
}
public void setQuery(String query) {
this.query = query;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public void setTranslation(List<String> translation) {
this.translation = translation;
}
public void setWeb(List<WebEntity> web) {
this.web = web;
}
public String getQuery() {
return query;
}
public int getErrorCode() {
return errorCode;
}
public List<String> getTranslation() {
return translation;
}
public List<WebEntity> getWeb() {
return web;
}
public basicEntity getBasic() {
return basic;
}
public void setBasic(basicEntity basic) {
this.basic = basic;
}
public void settSpeakUrl(String tSpeakUrl){ this.tSpeakUrl = tSpeakUrl;}
public String gettSpeakUrl(){ return tSpeakUrl; }
}
創建用於描述網絡請求的接口#
public interface networkApi {
@GET("api?")
Observable<TranslationBean> translateYouDao(
@Query("q") String q,
@Query("from") String from,
@Query("to") String to,
@Query("appKey") String appKey, //應用ID
@Query("salt") String salt, //UUID
@Query("sign") String sign, //應用ID+input+salt+curtime+應用密鑰 。 input= q前10個字符+q長度+q後10個字符(q的長度>=20) 或input = 字符串
@Query("signType") String signType, //簽名類型
@Query("curtime") String curtime //時間戳
);
}
創建 Retorfit#
public class netWork {
private static networkApi sContactsApi;
private static OkHttpClient okHttpClient = new OkHttpClient();
private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJava2CallAdapterFactory.create();
private static class ApiClientHolder {
public static final netWork INSTANCE = new netWork();
}
public static netWork getInstance() {
return ApiClientHolder.INSTANCE;
}
public networkApi getDataService() {
if (sContactsApi == null) {
Retrofit retrofit = new Retrofit.Builder()
.client(okHttpClient)
.baseUrl(Constants.BASE_URL)
.addConverterFactory(gsonConverterFactory)
.addCallAdapterFactory(rxJavaCallAdapterFactory)
.build();
sContactsApi = retrofit.create(networkApi.class);
}
return sContactsApi;
}
}
使用#
@SuppressLint("CheckResult")
public void netConnection(String q,String from,String to,String salt,String sign,String curtime){
netWork.getInstance().getDataService()
.translateYouDao(q,from,to,appID,salt,sign,signType,curtime)
.subscribeOn(Schedulers.newThread()) //發起的執行在一個新的執行緒
.observeOn(AndroidSchedulers.mainThread()) //結果的執行在主執行緒
.subscribe(new Consumer<TranslationBean>() {
@Override
public void accept(TranslationBean translationBean) throws Exception {
//對接收到數據的類進行處理
}
});
}