Magren

Magren

Idealist & Garbage maker 🛸
twitter
jike

Android的rxjava2

RxJava 是利用觀察者模式來實現一些列的操作,所以對於觀察者模式中的觀察者,被觀察者,以及訂閱、事件需要有一個了解。

Observable:在觀察者模式中稱為 “被觀察者”;
Observer:觀察者模式中的 “觀察者”,可接收 Observable 發送的數據;
subscribe:訂閱,觀察者與被觀察者,通過 Observable 的 subscribe () 方法進行訂閱;
Subscriber:也是一種觀察者,在 2.0 中 它與 Observer 沒什麼實質的區別,不同的是 Subscriber 要與 Flowable (也是一種被觀察者) 聯合使用,Obsesrver 用於訂閱 Observable,而 Subscriber 用於訂閱 Flowable.

觀察者模式#

rxjava 的實現主要是通過觀察者模式實現的。

A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。
在程序的觀察者模式,觀察者不需要時刻盯著被觀察者,而是採用註冊或者稱為訂閱的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我。
同時我們也可以多個觀察者對應一個被觀察者。

其實在 android 中也有很多自帶的觀察者模式。最明顯的莫過於點擊事件。說個最簡單的例子,點擊按鈕後彈一個 Toast。那么,我們在點擊按鈕的時候,告知系統,此時,我需要彈一個吐司。那么就這麼彈出來了。那么,這個時候問題來了。我是否需要實時去監聽這個按鈕呢?答案是不需要的。這就和前面的舉例有的差距了。換句話說。我只要在此按鈕進行點擊時進行監聽就可以了。這種操作被稱為訂閱。也就是說 Button 通過 setOnClickListener 對 OnclickListener 進行了訂閱了操作,來監聽 onclick 方法。

基本使用#

rxjava 的基本實現主要是三點:

  • 初始化 Observable (被觀察者)
  • 初始化 Observe(觀察者)
  • 建立兩者之間的訂閱關係

創建 Observable#

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hello world");
                e.onComplete(); //調用complete後下面將不再接受事件
            }
        });

創建 Observe#

 Observer<String>observer=new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("rxjava", "onSubscribe: " + d);
            }

            @Override
            public void onNext(String string) {
                Log.i("rxjava", "onNext: " + string);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("rxjava", "onError: " + e);
            }

            @Override
            public void onComplete() {
                Log.i("rxjava", "onComplete: ");
            }
        };
  • onSubscribe:它會在事件還未發送之前被調用,可以用來做一些準備操作。而裡面的 Disposable 則是用來切斷上下游的關係的。
  • onNext:普通的事件。將要處理的事件添加到隊列中。
  • onError:事件隊列異常,在事件處理過程中出現異常情況時,此方法會被調用。同時隊列將會終止,也就是不允許在有事件發出。
  • onComplete:事件隊列完成。rxjava 不僅把每個事件單獨處理。而且會把他們當成一個隊列。當不再有 onNext 事件發出時,需要觸發 onComplete 方法作為完成標識。

創建訂閱#

observable.subscribe(observer);

結果#

先調用 onSubscribe,然後走了 onNext,最後以 onComplete 收尾:
rxjava_result.png

執行緒的調度#

  • subscribeOn () 指定的是發射事件的執行緒,observerOn 指定的就是訂閱者接收事件的執行緒。
  • 多次指定發射事件的執行緒只有第一次指定的有效,也就是說多次調用 subscribeOn () 只有第一次的有效,其余的會被忽略。
  • 但多次指定訂閱者接收執行緒是可以的,也就是說每調用一次 observerOn (),下游的執行緒就會切換一次。

rxjava 中已經內置了一些執行緒供我們選擇:

  • Schedulers.io () 代表 io 操作的執行緒,通常用於網絡,讀寫文件等 io 密集型的操作
  • Schedulers.computation () 代表 CPU 計算密集型的操作,例如需要大量計算的操作
  • Schedulers.newThread () 代表一個常規的新執行緒
  • AndroidSchedulers.mainThread () 代表 Android 的主執行緒

例子:#

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hello world   "+ Thread.currentThread().getName());
                e.onComplete(); //調用complete後下面將不再接受事件
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread()) //切換到主執行緒
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i("rxjava","主執行緒   "+Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i("rxjava","執行緒   "+Thread.currentThread().getName());
                    }
                });

結果#

2020-03-13 17:06:23.873 8760-8760/com.example.rxjava_test I/rxjava: onSubscribe: io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver@92153c7
2020-03-13 17:06:23.980 8760-8760/com.example.rxjava_test I/rxjava: 主執行緒   main
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: 執行緒   RxCachedThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onNext: hello world   RxNewThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onComplete:

結合 rxjava 和 retorfit#

創建接收伺服器返回數據的類#

public class TranslationBean {

    private int errorCode; //錯誤返回碼
    private String query;     //源語言
    private List<String> translation; //翻譯結果
    private basicEntity basic;
    private List<WebEntity> web;
    private String tSpeakUrl;

    public class basicEntity {
        @SerializedName("us-phonetic")
        private String usPhonetic;  //英式音標
        @SerializedName("uk-speech")
        private String ukSpeech;    //美式發音
        @SerializedName("us-speech")
        private String usSpeech;  //英式發音
        private String phonetic;   //默認音標
        @SerializedName("uk-phonetic")
        private String ukPhonetic;   //美式英標

        private List<String> explains;  //基本釋義


        public List<String> getExplains() {
            return explains;
        }

        public void setExplains(List<String> explains) {
            this.explains = explains;
        }

        public String getPhonetic() {
            return phonetic;
        }

        public void setPhonetic(String phonetic) {
            this.phonetic = phonetic;
        }

        public String getUkPhonetic() {
            return ukPhonetic;
        }

        public void setUkPhonetic(String ukPhonetic) {
            this.ukPhonetic = ukPhonetic;
        }

        public String getUsPhonetic() {
            return usPhonetic;
        }

        public void setUsPhonetic(String usPhonetic) {
            this.usPhonetic = usPhonetic;
        }

        public String getUkSpeech() {
            return ukSpeech;
        }

        public void setUkSpeech(String ukSpeech) {
            this.ukSpeech = ukSpeech;
        }

        public String getUsSpeech() {
            return usSpeech;
        }

        public void setUsSpeech(String usSpeech) {
            this.usSpeech = usSpeech;
        }

    }
    public class WebEntity {
        private String key;
        private List<String> value;

        public void setKey(String key) {
            this.key = key;
        }

        public void setValue(List<String> value) {
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public List<String> getValue() {
            return value;
        }
    }
    public void setQuery(String query) {
        this.query = query;
    }

    public void setErrorCode(int errorCode) {
        this.errorCode = errorCode;
    }

    public void setTranslation(List<String> translation) {
        this.translation = translation;
    }

    public void setWeb(List<WebEntity> web) {
        this.web = web;
    }

    public String getQuery() {
        return query;
    }

    public int getErrorCode() {
        return errorCode;
    }

    public List<String> getTranslation() {
        return translation;
    }

    public List<WebEntity> getWeb() {
        return web;
    }

    public basicEntity getBasic() {
        return basic;
    }

    public void setBasic(basicEntity basic) {
        this.basic = basic;
    }

    public void settSpeakUrl(String tSpeakUrl){ this.tSpeakUrl = tSpeakUrl;}

    public String gettSpeakUrl(){ return tSpeakUrl; }
}

創建用於描述網絡請求的接口#

public interface networkApi {
    @GET("api?")
    Observable<TranslationBean> translateYouDao(
            @Query("q") String q,
            @Query("from") String from,
            @Query("to") String to,
            @Query("appKey") String appKey, //應用ID
            @Query("salt") String salt,  //UUID
            @Query("sign") String sign,  //應用ID+input+salt+curtime+應用密鑰 。 input= q前10個字符+q長度+q後10個字符(q的長度>=20) 或input = 字符串
            @Query("signType") String signType, //簽名類型
            @Query("curtime") String curtime //時間戳
    );
}

創建 Retorfit#

public class netWork {
    private static networkApi sContactsApi;
    private static OkHttpClient okHttpClient = new OkHttpClient();
    private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
    private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJava2CallAdapterFactory.create();

    private static class ApiClientHolder {
        public static final netWork INSTANCE = new netWork();
    }

    public static netWork getInstance() {
        return ApiClientHolder.INSTANCE;
    }

    public networkApi getDataService() {
        if (sContactsApi == null) {
            Retrofit retrofit = new Retrofit.Builder()
                    .client(okHttpClient)
                    .baseUrl(Constants.BASE_URL)
                    .addConverterFactory(gsonConverterFactory)
                    .addCallAdapterFactory(rxJavaCallAdapterFactory)
                    .build();
            sContactsApi = retrofit.create(networkApi.class);
        }
        return sContactsApi;
    }

}

使用#

@SuppressLint("CheckResult")
   public void netConnection(String q,String from,String to,String salt,String sign,String curtime){
       netWork.getInstance().getDataService()
               .translateYouDao(q,from,to,appID,salt,sign,signType,curtime)
               .subscribeOn(Schedulers.newThread()) //發起的執行在一個新的執行緒
               .observeOn(AndroidSchedulers.mainThread()) //結果的執行在主執行緒
               .subscribe(new Consumer<TranslationBean>() {
                   @Override
                   public void accept(TranslationBean translationBean) throws Exception {
                       //對接收到數據的類進行處理
                   }
               });
   }
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。