Cursor blinking

RxJava 基本使用

Android 基础|字数 1,084|阅读时长≈ 3 分钟

前言

RxJava 是 NetFlix 出品的 Java 框架,RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

Code
a library for composing asynchronous and event-based programs using observable sequences for the Java VM。另一种翻译”使用可观察序列组成的一个异步地、基于事件的响应式编程框架“

RxJava 在 Android 中一个典型的使用示范如下:

Code
Observable.create(new ObservableOnSubscribe<String>() {            @Override            public void subscribe(ObservableEmitter<String> emitter) throws Exception {                String s = "123";                // 执行耗时任务                emitter.onNext(s);            }        }).map(new Function<String, Integer>() {            @Override            public Integer apply(String s) throws Exception {                return Integer.parseInt(s);            }        }).subscribeOn(Schedulers.io())          .observeOn(AndroidSchedulers.mainThread())          .subscribe(); 

RxJava 的核心是简化开发,因为它提高了围绕线程的抽象级别。也就是说,作为开发人员,您不必担心如何执行应该在不同线程上进行的操作的细节。这是特别有吸引力的,因为线程难以正确实现,如果执行不正确,可能会导致一些最困难的错误进行调试和修复。当然,这并不意味着 RxJava 在线程化方面是防弹的,了解幕后发生的事情仍然很重要。但是,RxJava 绝对可以使您的生活更轻松。

RxJava vs AsyncTask

假设我们要通过网络获取数据并更新 UI,AsyncTask 示例如下:

Code
public class NetworkRequestTask extends AsyncTask<Void, Void, User> {     private final int userId;     public NetworkRequestTask(int userId) {        this.userId = userId;    }     @Override protected User doInBackground(Void... params) {        return networkService.getUser(userId);    }     @Override protected void onPostExecute(User user) {        nameTextView.setText(user.getName());        // ...set other views    }} private void onButtonClicked(Button button) {   new NetworkRequestTask(123).execute()}

相比之下,执行网络调用的RxJava方法可能看起来像这样:

Code
private Subscription subscription; private void onButtonClicked(Button button) {   subscription = networkService.getObservableUser(123)                      .subscribeOn(Schedulers.io())                      .observeOn(AndroidSchedulers.mainThread())                      .subscribe(new Action1<User>() {                          @Override public void call(User user) {                              nameTextView.setText(user.getName());                              // ... set other views                          }                      });} @Override protected void onDestroy() {   if (subscription != null && !subscription.isUnsubscribed()) {       subscription.unsubscribe();   }   super.onDestroy();}

Android 创造的 AsyncTask ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

Observable, Observer 和 Operator

在RxJava世界中,所有内容都可以建模为流。流随着时间的流逝发射项目,并且每次发射都可以消耗/观察

流的抽象是通过3个核心结构实现的,我将其称为“ 3 O”。即:可观察者,观察者和操作者。 Observable发出项目(流);观察者消耗这些物品。可观察对象的发射可以通过链接操作员调用来进一步修改,转换和操纵。

Observable

一个 Observable 是 RxJava 中的流抽象。它与 Iterator 相似,在给定序列的情况下,它会按顺序进行迭代并生成这些项。然后,消费者可以通过相同的界面消费这些物品,而不管其基础顺序如何。

在 ReactiveX 中,观察者订阅了一个 Observable。然后,该观察者对可观察对象发出的任何项目或项目序列做出反应。这种模式有助于并发操作,因为它在等待 Observable 发出对象时不需要阻塞,而是以观察者的形式创建了一个哨兵,随时准备在 Observable 以后的任何时间做出适当的反应。(例如用 RxJava 实现的 RxBus)

Observable 有很多种创建方式(Creating Observables), 常用创建方法:

Code
 // 1. Observable.create        val observable1 = Observable.create<Int> { emitter ->            emitter?.onNext(1)            emitter?.onNext(2)            emitter?.onNext(3)            emitter?.onComplete()        } // 2. Observable.just        val observable2 = Observable.just(1, 2, 3) // 3. Observable.fromArray        val observable3 = Observable.fromArray(1, 2, 3) // 4. create 的参数是 ObservableOnSubscribe,发射器是 ObservableEmitter,异常不会抛出 Observable.unsafeCreate(object : ObservableSource<String> {            override fun subscribe(observer: Observer<in String>?) {                observer?.onNext("1")                observer?.onNext(null)                observer?.onComplete()            }        }).subscribe({            Log.d(TAG, "------ onNext($it)")        }, {            Log.d(TAG, "------ onError() ${it?.message}")        }) 

Observable 支持发送 Action、Callable、Future 等任务

Code
//1. fromAction 没有返回值执行任务        Observable.fromAction<Int>(object : Action {            override fun run() {                Thread.sleep(1000)                Log.d(TAG, "------ current thread=" + Thread.currentThread())            }        }).subscribeOn(IoScheduler())            .observeOn(SingleScheduler())            .subscribe({            }, {                Log.d(TAG, "------ onError() ${it?.message}")            }, {                Log.d(TAG, "------ onComplete() thread=" + Thread.currentThread())            })
Code
//2. fromCallable 有返回值执行任务        Observable.fromCallable(object : Callable<Int> {            override fun call(): Int {                Thread.sleep(1000)                Log.d(TAG, "------ current thread=" + Thread.currentThread())                return 1            }        }).subscribeOn(IoScheduler())            .observeOn(SingleScheduler())            .subscribe({                Log.d(TAG, "------ onNext($it)")            }, {                Log.d(TAG, "------ onError() ${it?.message}")            }, {                Log.d(TAG, "------ onComplete() current thread=" + Thread.currentThread())            })

Code
//3. fromFuture 用于观测一个 future        val executor = Executors.newSingleThreadExecutor()        val future = executor.submit(object : Runnable {            override fun run() {                Thread.sleep(1000)                Log.d(TAG, "------ executor.submit current thread=" + Thread.currentThread())            }        }, 1)         Observable.fromFuture(future)            .subscribeOn(IoScheduler())            .observeOn(SingleScheduler())            .subscribe({                Log.d(TAG, "------ onNext($it)")            }, {                Log.d(TAG, "------ onError() ${it?.message}")            }, {                Log.d(TAG, "------ onComplete() current thread=" + Thread.currentThread())            })

Observer

Observable 流的下一个组件是已预订的 Observer(一个或多个 Observer)。每当流中发生“有趣”的事情时,便会通知观察者。通过以下事件通知观察者。

  • Observer#onNext(T) - 从流中发出项目时调用
  • Observable#onError(Throwable) -流中发生错误时调用
  • Observable#onCompleted() - 当流完成发送项目时调用

要订阅流,只需调用 Observable#subscribe(...) 并传入一个 Observer 实例即可

Code
// 订阅者        val observer = object : Observer<Int?> {            override fun onSubscribe(d: @NonNull Disposable?) {            }             override fun onNext(integer: @NonNull Int?) {                Log.d(TAG, "------ onNext($integer)")            }             override fun onError(e: @NonNull Throwable?) {                Log.d(TAG, "------ onError() ${e?.message}")            }             override fun onComplete() {                Log.d(TAG, "------ onComplete()")            }        } // 可观察者         val observable = Observable.just(1, 2, 3)         observable.subscribe(observer)

在 JVM 中,需要操作取消订阅避免内存泄漏。

Code
 // 1. 在 onSubscribe 获取 Disposable 对象        var disposable1: Disposable? = null        val observer1 = object : Observer<Int?> {            override fun onSubscribe(d: @NonNull Disposable?) {                disposable1 = d            }             override fun onNext(integer: @NonNull Int?) {                Log.d(TAG, "------ onNext($integer)")                if (integer == 1) {                    disposable1?.dispose()                }            }             override fun onError(e: @NonNull Throwable?) {                Log.d(TAG, "------ onError() ${e?.message}")            }             override fun onComplete() {                Log.d(TAG, "------ onComplete()")            }        }        Observable.just(1, 2, 3).subscribe(observer1)
Code
 // 2. subscribe 时返回了 disposable        val disposable2 = Observable.just(1, 2, 3).subscribe({            Log.d(TAG, "------ onNext($it)")        }, {            Log.d(TAG, "------ onError() ${it?.message}")        })        disposable2.dispose()// 2.1 也可以使用 CompositeDisposable 组合管理 disposable        var compositeDisposable: CompositeDisposable = CompositeDisposable()        compositeDisposable.add(disposable2)        compositeDisposable.clear()
Code
// 3. DisposableObserver 对象,借助 CompositeDisposable        val disposableObserver: DisposableObserver<Int> = object : DisposableObserver<Int>() {            override fun onNext(integer: @NonNull Int) {                Log.d(TAG, "------ onNext($integer)")            }             override fun onError(e: @NonNull Throwable?) {}            override fun onComplete() {}        }         var compositeDisposable: CompositeDisposable = CompositeDisposable()        compositeDisposable.add(disposableObserver)        compositeDisposable.clear()         Observable.just(1, 2, 3).subscribe(disposableObserver)

Operator

Observable 发出的项目可以在通知订阅的 Observer 对象之前通过运算符进行转换,修改和过滤。在函数式编程中发现的一些最常见的操作(例如,map, filter, reduce,filter 等)也可以应用于可观察流。

Code
// map        Observable.just(1, 2, 3)            .map(object : Function<Int, String> {                override fun apply(t: Int?): String {                    return "map sfter " + t.toString()                }            })            .subscribe({                Log.d(TAG, "------ onNext($it)")            })
Code
// flatMap        Observable.just(1, 2, 3)            .flatMap(object : Function<Int, ObservableSource<String>> {                override fun apply(t: Int?): ObservableSource<String> {                    return object : ObservableSource<String> {                        override fun subscribe(observer: Observer<in String>?) {                            observer?.onNext("flatMap after " + t.toString())                            observer?.onComplete()                        }                    }                }            }).subscribe({                Log.d(TAG, "------ onNext($it)")            }, {}, {                Log.d(TAG, "------ onComplete()")            })

Code
// filter 返回 true 才能往下传递        Observable.just(1, 2, 3)            .filter(object : Predicate<Int> {                override fun test(t: Int): Boolean {                    return t % 2 == 0                }            }).subscribe({                Log.d(TAG, "------ onNext($it)")            }, {}, {                Log.d(TAG, "------ onComplete()")            })

线程调度

Code
Observable.fromCallable(object : Callable<Int> {            override fun call(): Int {                Thread.sleep(1000)                Log.d(TAG, "------ call thread=" + Thread.currentThread().name)                return 1            }        })            .subscribeOn(NewThreadScheduler())            .map(object : Function<Int, String> {                override fun apply(t: Int?): String {                    Log.d(TAG, "------ map thread=" + Thread.currentThread().name)                    return "map after " + t.toString()                }            })            .subscribeOn(IoScheduler())            .flatMap(object : Function<String, ObservableSource<String>> {                override fun apply(t: String?): ObservableSource<String> {                    return object : ObservableSource<String> {                        override fun subscribe(observer: Observer<in String>?) {                            Log.d(TAG, "------ flatMap thread=" + Thread.currentThread().name)                            observer?.onNext("flatMap after " + t)                            observer?.onComplete()                        }                    }                }            })            .observeOn(IoScheduler())            .observeOn(SingleScheduler())            .subscribe({                Log.d(TAG, "------ onNext($it) thread=" + Thread.currentThread().name)            }, {                Log.d(TAG, "------ onError() ${it?.message}")            }, {                Log.d(TAG, "------ onComplete() thread=" + Thread.currentThread().name)            })

线程调度原理

参考 一张图看懂 Rxjava 的原理 这篇博客,从构建流、订阅流、观察者回调流三个方面讲述 RxJava 线程调度原理,深入浅出。学习技术原理,怎么读源码,这篇博客提供一个很好模版。

参考文档