RxJava 基本使用
前言
RxJava 是 NetFlix 出品的 Java 框架,RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。
a library for composing asynchronous and event-based programs using observable sequences for the Java VM。另一种翻译”使用可观察序列组成的一个异步地、基于事件的响应式编程框架“RxJava 在 Android 中一个典型的使用示范如下:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String s = "123"; // 执行耗时任务 emitter.onNext(s); } }).map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.parseInt(s); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); RxJava 的核心是简化开发,因为它提高了围绕线程的抽象级别。也就是说,作为开发人员,您不必担心如何执行应该在不同线程上进行的操作的细节。这是特别有吸引力的,因为线程难以正确实现,如果执行不正确,可能会导致一些最困难的错误进行调试和修复。当然,这并不意味着 RxJava 在线程化方面是防弹的,了解幕后发生的事情仍然很重要。但是,RxJava 绝对可以使您的生活更轻松。
RxJava vs AsyncTask
假设我们要通过网络获取数据并更新 UI,AsyncTask 示例如下:
public class NetworkRequestTask extends AsyncTask<Void, Void, User> { private final int userId; public NetworkRequestTask(int userId) { this.userId = userId; } @Override protected User doInBackground(Void... params) { return networkService.getUser(userId); } @Override protected void onPostExecute(User user) { nameTextView.setText(user.getName()); // ...set other views }} private void onButtonClicked(Button button) { new NetworkRequestTask(123).execute()}相比之下,执行网络调用的RxJava方法可能看起来像这样:
private Subscription subscription; private void onButtonClicked(Button button) { subscription = networkService.getObservableUser(123) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<User>() { @Override public void call(User user) { nameTextView.setText(user.getName()); // ... set other views } });} @Override protected void onDestroy() { if (subscription != null && !subscription.isUnsubscribed()) { subscription.unsubscribe(); } super.onDestroy();}Android 创造的 AsyncTask ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。
Observable, Observer 和 Operator
在RxJava世界中,所有内容都可以建模为流。流随着时间的流逝发射项目,并且每次发射都可以消耗/观察
流的抽象是通过3个核心结构实现的,我将其称为“ 3 O”。即:可观察者,观察者和操作者。 Observable发出项目(流);观察者消耗这些物品。可观察对象的发射可以通过链接操作员调用来进一步修改,转换和操纵。
Observable
一个 Observable 是 RxJava 中的流抽象。它与 Iterator 相似,在给定序列的情况下,它会按顺序进行迭代并生成这些项。然后,消费者可以通过相同的界面消费这些物品,而不管其基础顺序如何。
在 ReactiveX 中,观察者订阅了一个 Observable。然后,该观察者对可观察对象发出的任何项目或项目序列做出反应。这种模式有助于并发操作,因为它在等待 Observable 发出对象时不需要阻塞,而是以观察者的形式创建了一个哨兵,随时准备在 Observable 以后的任何时间做出适当的反应。(例如用 RxJava 实现的 RxBus)
Observable 有很多种创建方式(Creating Observables), 常用创建方法:
// 1. Observable.create val observable1 = Observable.create<Int> { emitter -> emitter?.onNext(1) emitter?.onNext(2) emitter?.onNext(3) emitter?.onComplete() } // 2. Observable.just val observable2 = Observable.just(1, 2, 3) // 3. Observable.fromArray val observable3 = Observable.fromArray(1, 2, 3) // 4. create 的参数是 ObservableOnSubscribe,发射器是 ObservableEmitter,异常不会抛出 Observable.unsafeCreate(object : ObservableSource<String> { override fun subscribe(observer: Observer<in String>?) { observer?.onNext("1") observer?.onNext(null) observer?.onComplete() } }).subscribe({ Log.d(TAG, "------ onNext($it)") }, { Log.d(TAG, "------ onError() ${it?.message}") }) Observable 支持发送 Action、Callable、Future 等任务
//1. fromAction 没有返回值执行任务 Observable.fromAction<Int>(object : Action { override fun run() { Thread.sleep(1000) Log.d(TAG, "------ current thread=" + Thread.currentThread()) } }).subscribeOn(IoScheduler()) .observeOn(SingleScheduler()) .subscribe({ }, { Log.d(TAG, "------ onError() ${it?.message}") }, { Log.d(TAG, "------ onComplete() thread=" + Thread.currentThread()) })//2. fromCallable 有返回值执行任务 Observable.fromCallable(object : Callable<Int> { override fun call(): Int { Thread.sleep(1000) Log.d(TAG, "------ current thread=" + Thread.currentThread()) return 1 } }).subscribeOn(IoScheduler()) .observeOn(SingleScheduler()) .subscribe({ Log.d(TAG, "------ onNext($it)") }, { Log.d(TAG, "------ onError() ${it?.message}") }, { Log.d(TAG, "------ onComplete() current thread=" + Thread.currentThread()) })//3. fromFuture 用于观测一个 future val executor = Executors.newSingleThreadExecutor() val future = executor.submit(object : Runnable { override fun run() { Thread.sleep(1000) Log.d(TAG, "------ executor.submit current thread=" + Thread.currentThread()) } }, 1) Observable.fromFuture(future) .subscribeOn(IoScheduler()) .observeOn(SingleScheduler()) .subscribe({ Log.d(TAG, "------ onNext($it)") }, { Log.d(TAG, "------ onError() ${it?.message}") }, { Log.d(TAG, "------ onComplete() current thread=" + Thread.currentThread()) })Observer
Observable 流的下一个组件是已预订的 Observer(一个或多个 Observer)。每当流中发生“有趣”的事情时,便会通知观察者。通过以下事件通知观察者。
- Observer#onNext(T) - 从流中发出项目时调用
- Observable#onError(Throwable) -流中发生错误时调用
- Observable#onCompleted() - 当流完成发送项目时调用
要订阅流,只需调用 Observable#subscribe(...) 并传入一个 Observer 实例即可
// 订阅者 val observer = object : Observer<Int?> { override fun onSubscribe(d: @NonNull Disposable?) { } override fun onNext(integer: @NonNull Int?) { Log.d(TAG, "------ onNext($integer)") } override fun onError(e: @NonNull Throwable?) { Log.d(TAG, "------ onError() ${e?.message}") } override fun onComplete() { Log.d(TAG, "------ onComplete()") } } // 可观察者 val observable = Observable.just(1, 2, 3) observable.subscribe(observer)在 JVM 中,需要操作取消订阅避免内存泄漏。
// 1. 在 onSubscribe 获取 Disposable 对象 var disposable1: Disposable? = null val observer1 = object : Observer<Int?> { override fun onSubscribe(d: @NonNull Disposable?) { disposable1 = d } override fun onNext(integer: @NonNull Int?) { Log.d(TAG, "------ onNext($integer)") if (integer == 1) { disposable1?.dispose() } } override fun onError(e: @NonNull Throwable?) { Log.d(TAG, "------ onError() ${e?.message}") } override fun onComplete() { Log.d(TAG, "------ onComplete()") } } Observable.just(1, 2, 3).subscribe(observer1) // 2. subscribe 时返回了 disposable val disposable2 = Observable.just(1, 2, 3).subscribe({ Log.d(TAG, "------ onNext($it)") }, { Log.d(TAG, "------ onError() ${it?.message}") }) disposable2.dispose()// 2.1 也可以使用 CompositeDisposable 组合管理 disposable var compositeDisposable: CompositeDisposable = CompositeDisposable() compositeDisposable.add(disposable2) compositeDisposable.clear()// 3. DisposableObserver 对象,借助 CompositeDisposable val disposableObserver: DisposableObserver<Int> = object : DisposableObserver<Int>() { override fun onNext(integer: @NonNull Int) { Log.d(TAG, "------ onNext($integer)") } override fun onError(e: @NonNull Throwable?) {} override fun onComplete() {} } var compositeDisposable: CompositeDisposable = CompositeDisposable() compositeDisposable.add(disposableObserver) compositeDisposable.clear() Observable.just(1, 2, 3).subscribe(disposableObserver)Operator
Observable 发出的项目可以在通知订阅的 Observer 对象之前通过运算符进行转换,修改和过滤。在函数式编程中发现的一些最常见的操作(例如,map, filter, reduce,filter 等)也可以应用于可观察流。
// map Observable.just(1, 2, 3) .map(object : Function<Int, String> { override fun apply(t: Int?): String { return "map sfter " + t.toString() } }) .subscribe({ Log.d(TAG, "------ onNext($it)") })// flatMap Observable.just(1, 2, 3) .flatMap(object : Function<Int, ObservableSource<String>> { override fun apply(t: Int?): ObservableSource<String> { return object : ObservableSource<String> { override fun subscribe(observer: Observer<in String>?) { observer?.onNext("flatMap after " + t.toString()) observer?.onComplete() } } } }).subscribe({ Log.d(TAG, "------ onNext($it)") }, {}, { Log.d(TAG, "------ onComplete()") })// filter 返回 true 才能往下传递 Observable.just(1, 2, 3) .filter(object : Predicate<Int> { override fun test(t: Int): Boolean { return t % 2 == 0 } }).subscribe({ Log.d(TAG, "------ onNext($it)") }, {}, { Log.d(TAG, "------ onComplete()") })线程调度
Observable.fromCallable(object : Callable<Int> { override fun call(): Int { Thread.sleep(1000) Log.d(TAG, "------ call thread=" + Thread.currentThread().name) return 1 } }) .subscribeOn(NewThreadScheduler()) .map(object : Function<Int, String> { override fun apply(t: Int?): String { Log.d(TAG, "------ map thread=" + Thread.currentThread().name) return "map after " + t.toString() } }) .subscribeOn(IoScheduler()) .flatMap(object : Function<String, ObservableSource<String>> { override fun apply(t: String?): ObservableSource<String> { return object : ObservableSource<String> { override fun subscribe(observer: Observer<in String>?) { Log.d(TAG, "------ flatMap thread=" + Thread.currentThread().name) observer?.onNext("flatMap after " + t) observer?.onComplete() } } } }) .observeOn(IoScheduler()) .observeOn(SingleScheduler()) .subscribe({ Log.d(TAG, "------ onNext($it) thread=" + Thread.currentThread().name) }, { Log.d(TAG, "------ onError() ${it?.message}") }, { Log.d(TAG, "------ onComplete() thread=" + Thread.currentThread().name) })线程调度原理
参考 一张图看懂 Rxjava 的原理 这篇博客,从构建流、订阅流、观察者回调流三个方面讲述 RxJava 线程调度原理,深入浅出。学习技术原理,怎么读源码,这篇博客提供一个很好模版。