RxJava は、オブザーバーパターンを利用して一連の操作を実現するため、オブザーバーパターンにおけるオブザーバー、被オブザーバー、サブスクリプション、イベントについて理解しておく必要があります。
Observable:オブザーバーパターンにおいて「被オブザーバー」と呼ばれる;
Observer:オブザーバーパターンにおける「オブザーバー」で、Observable から送信されたデータを受け取ることができる;
subscribe:サブスクリプション、オブザーバーと被オブザーバーは、Observable の subscribe () メソッドを通じてサブスクライブする;
Subscriber:これもオブザーバーの一種で、2.0 では Observer との実質的な違いはありませんが、Subscriber は Flowable(これも被オブザーバーの一種)と一緒に使用され、Observer は Observable をサブスクライブするために使用され、Subscriber は Flowable をサブスクライブするために使用されます。
オブザーバーパターン#
rxjava の実装は主にオブザーバーパターンを通じて実現されています。
A オブジェクト(オブザーバー)は B オブジェクト(被オブザーバー)のある変化に非常に敏感であり、B が変化する瞬間に反応する必要があります。
プログラムのオブザーバーパターンでは、オブザーバーは常に被オブザーバーを監視する必要はなく、登録またはサブスクリプションの方法を用いて、被オブザーバーに「私はあなたのある状態が必要であり、それが変化したときに私に通知してください」と伝えます。
同時に、複数のオブザーバーが 1 つの被オブザーバーに対応することもできます。
実際、Android にも多くの組み込みのオブザーバーパターンがあります。最も明白なのはクリックイベントです。最も簡単な例を挙げると、ボタンをクリックした後にトーストを表示することです。つまり、ボタンをクリックする際に、システムに「今、トーストを表示したい」と通知します。そうすると、トーストが表示されます。このとき、問題が発生します。私はこのボタンをリアルタイムで監視する必要がありますか?答えは「必要ありません」です。これは前述の例とは異なります。言い換えれば、ボタンがクリックされたときだけ監視すればよいのです。この操作はサブスクリプションと呼ばれます。つまり、Button は setOnClickListener を通じて OnclickListener に対してサブスクリプションを行い、onclick メソッドを監視します。
基本使用#
rxjava の基本実装は主に 3 つのポイントです:
- Observable(被オブザーバー)の初期化
- Observe(オブザーバー)の初期化
- 両者の間にサブスクリプション関係を確立する
Observable の作成#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world");
e.onComplete(); // completeを呼び出すと、以下のイベントは受け取られなくなります
}
});
Observe の作成#
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("rxjava", "onSubscribe: " + d);
}
@Override
public void onNext(String string) {
Log.i("rxjava", "onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.i("rxjava", "onError: " + e);
}
@Override
public void onComplete() {
Log.i("rxjava", "onComplete: ");
}
};
- onSubscribe:イベントが送信される前に呼び出され、準備操作を行うために使用できます。その中の Disposable は、上下流の関係を切断するために使用されます。
- onNext:通常のイベント。処理するイベントをキューに追加します。
- onError:イベントキューの例外。イベント処理中に異常が発生した場合、このメソッドが呼び出されます。同時に、キューは終了し、イベントが発生することは許可されません。
- onComplete:イベントキューの完了。rxjava は、各イベントを個別に処理するだけでなく、それらをキューとして扱います。onNext イベントが発生しなくなったとき、onComplete メソッドをトリガーして完了を示す必要があります。
サブスクリプションの作成#
observable.subscribe(observer);
結果#
最初に onSubscribe が呼び出され、次に onNext が実行され、最後に onComplete で終了します:
スレッドのスケジューリング#
- subscribeOn () はイベントを発生させるスレッドを指定し、observerOn はサブスクライバーがイベントを受け取るスレッドを指定します。
- 発生させるスレッドを複数回指定しても、最初に指定したものだけが有効です。つまり、subscribeOn () を複数回呼び出しても、最初のものだけが有効で、残りは無視されます。
- ただし、サブスクライバーが受け取るスレッドを複数回指定することは可能です。つまり、observerOn () を呼び出すたびに、下流のスレッドが切り替わります。
rxjava には、私たちが選択できるいくつかのスレッドがすでに組み込まれています:
- Schedulers.io () は IO 操作のスレッドを表し、通常はネットワーク、ファイルの読み書きなどの IO 集約型操作に使用されます。
- Schedulers.computation () は CPU 計算集約型の操作を表し、大量の計算を必要とする操作に使用されます。
- Schedulers.newThread () は通常の新しいスレッドを表します。
- AndroidSchedulers.mainThread () は Android のメインスレッドを表します。
例:#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world " + Thread.currentThread().getName());
e.onComplete(); // completeを呼び出すと、以下のイベントは受け取られなくなります
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()) // メインスレッドに切り替え
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava", "メインスレッド " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava", "thread " + Thread.currentThread().getName());
}
});
結果#
2020-03-13 17:06:23.873 8760-8760/com.example.rxjava_test I/rxjava: onSubscribe: io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver@92153c7
2020-03-13 17:06:23.980 8760-8760/com.example.rxjava_test I/rxjava: メインスレッド main
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: thread RxCachedThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onNext: hello world RxNewThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onComplete:
rxjava と retrofit の統合#
サーバーからのデータを受信するためのクラスを作成#
public class TranslationBean {
private int errorCode; // エラー返却コード
private String query; // ソース言語
private List<String> translation; // 翻訳結果
private basicEntity basic;
private List<WebEntity> web;
private String tSpeakUrl;
public class basicEntity {
@SerializedName("us-phonetic")
private String usPhonetic; // 英式音標
@SerializedName("uk-speech")
private String ukSpeech; // 美式発音
@SerializedName("us-speech")
private String usSpeech; // 英式発音
private String phonetic; // デフォルト音標
@SerializedName("uk-phonetic")
private String ukPhonetic; // 美式英標
private List<String> explains; // 基本的な意味
public List<String> getExplains() {
return explains;
}
public void setExplains(List<String> explains) {
this.explains = explains;
}
public String getPhonetic() {
return phonetic;
}
public void setPhonetic(String phonetic) {
this.phonetic = phonetic;
}
public String getUkPhonetic() {
return ukPhonetic;
}
public void setUkPhonetic(String ukPhonetic) {
this.ukPhonetic = ukPhonetic;
}
public String getUsPhonetic() {
return usPhonetic;
}
public void setUsPhonetic(String usPhonetic) {
this.usPhonetic = usPhonetic;
}
public String getUkSpeech() {
return ukSpeech;
}
public void setUkSpeech(String ukSpeech) {
this.ukSpeech = ukSpeech;
}
public String getUsSpeech() {
return usSpeech;
}
public void setUsSpeech(String usSpeech) {
this.usSpeech = usSpeech;
}
}
public class WebEntity {
private String key;
private List<String> value;
public void setKey(String key) {
this.key = key;
}
public void setValue(List<String> value) {
this.value = value;
}
public String getKey() {
return key;
}
public List<String> getValue() {
return value;
}
}
public void setQuery(String query) {
this.query = query;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public void setTranslation(List<String> translation) {
this.translation = translation;
}
public void setWeb(List<WebEntity> web) {
this.web = web;
}
public String getQuery() {
return query;
}
public int getErrorCode() {
return errorCode;
}
public List<String> getTranslation() {
return translation;
}
public List<WebEntity> getWeb() {
return web;
}
public basicEntity getBasic() {
return basic;
}
public void setBasic(basicEntity basic) {
this.basic = basic;
}
public void settSpeakUrl(String tSpeakUrl){ this.tSpeakUrl = tSpeakUrl;}
public String gettSpeakUrl(){ return tSpeakUrl; }
}
ネットワークリクエストを説明するためのインターフェースを作成#
public interface networkApi {
@GET("api?")
Observable<TranslationBean> translateYouDao(
@Query("q") String q,
@Query("from") String from,
@Query("to") String to,
@Query("appKey") String appKey, // アプリID
@Query("salt") String salt, // UUID
@Query("sign") String sign, // アプリID+input+salt+curtime+アプリ秘密鍵 。 input= qの最初の10文字+qの長さ+qの最後の10文字(qの長さ>=20)またはinput = 文字列
@Query("signType") String signType, // 署名タイプ
@Query("curtime") String curtime // タイムスタンプ
);
}
Retrofit の作成#
public class netWork {
private static networkApi sContactsApi;
private static OkHttpClient okHttpClient = new OkHttpClient();
private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJava2CallAdapterFactory.create();
private static class ApiClientHolder {
public static final netWork INSTANCE = new netWork();
}
public static netWork getInstance() {
return ApiClientHolder.INSTANCE;
}
public networkApi getDataService() {
if (sContactsApi == null) {
Retrofit retrofit = new Retrofit.Builder()
.client(okHttpClient)
.baseUrl(Constants.BASE_URL)
.addConverterFactory(gsonConverterFactory)
.addCallAdapterFactory(rxJavaCallAdapterFactory)
.build();
sContactsApi = retrofit.create(networkApi.class);
}
return sContactsApi;
}
}
使用#
@SuppressLint("CheckResult")
public void netConnection(String q,String from,String to,String salt,String sign,String curtime){
netWork.getInstance().getDataService()
.translateYouDao(q,from,to,appID,salt,sign,signType,curtime)
.subscribeOn(Schedulers.newThread()) // 新しいスレッドで実行を開始
.observeOn(AndroidSchedulers.mainThread()) // 結果の実行をメインスレッドで
.subscribe(new Consumer<TranslationBean>() {
@Override
public void accept(TranslationBean translationBean) throws Exception {
// 受信したデータのクラスを処理
}
});
}