股票场内基金交易,没时间盯盘?
什么是 RxJS
在介绍 RxJS 的基本概念和使用之前,先来了解一下什么是 RxJS。
RxJS 是 ReactiveX 家族中的一员。关于 ReactiveX ,以下是官网介绍原文。
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
ReactiveX 是一个库,使用可观察序列来构建异步和基于事件的应程序。
It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
它扩展了观察者模式,支持数据和/或事件序列。添加一系列操作符,允许你声明式地将序列构建在一起,不再需要担忧诸如低等级线程、同步、线程安全、并发数据结构和非阻塞 I/O 等此类事情。
看上去似乎除了一大堆术语以外,没有任何有助于理解的内容。实际上,ReactiveX 这个名称本身就已经足够说明它究竟是什么东西了。
ReactiveX,或者称作 Reactive Extension,主要目的是构建响应式程序,使用一种抽象的方式来处理数据和事件。那么什么是响应式程序?下面我们用一个简化(可能并没有实际意义)的例子来进行说明。
假如我们有两个变量 x 和 y,我们希望通过 x 和 y 计算出另一个新的变量 z,我们可以写:
z = x + y;
然后我们使用 z 的值去做一些其他的事情。在程序进行一段时间后,x 和/或 y 的值发生了变化,这时我们希望可以不重新计算 z 的值(即重复写一行上面的代码),就可以使 z 的值跟随着 x 和/或 y 的变化而自动发生变化,这就是所谓的响应式编程。
现在,我们有了 ReactiveX 就可以轻松实现这样的想法。ReactiveX 利用观察者模式,将 x 和/或 y 作为观察的对象,监听它们的变化,一旦发现它们的值发生变化,就调用某个函数来重新计算并更新 z 的值,而不需要劳烦程序员再去处理这一问题了。ReactiveX 就是基于这种思想基础发展出来的一套编程模式。
现在我们该回过头来看看 RxJS 了。RxJS 中的 Rx 前缀代表的就是 ReactiveX,因此它就是响应式编程、观察者模式……等等这一系列思想的具体实现,JS 表示 ReactiveX 在 JavaScript 语言的实现。当然,你可以想到对应还会有 RxJava、RxSwift、Rx.NET……
另外,在这里先补充一点,ReactiveX 在实际编程中常常与异步机制联系在一起,但不代表它不可以处理同步事件,这一点在后面会有进一步的说明。
好了,在了解了 RxJS 的真实面目之后,可以开始深入一步,看看 RxJS 究竟包含哪些内容了。
RxJS 的五大抽象概念
想要理解 RxJS 的原理和使用,就必须要理解它使用的五大抽象概念(这么叫是为了便于说明和记忆),分别如下所示:
- 可观察对象(Observable)
- 观察/订阅者(Observer)
- 订阅(Subscription)
- Subject(没有适合的中文翻译,就直接用英文名称好了,本文暂时不做介绍)
- 操作符(operators)(本篇文章赞不介绍,另外 Schedulers 也不在本文内说明)
限于篇幅的限制,对于Subject 和操作符的介绍可能放在以后的文章内容中,本文不做介绍。
下面就依次介绍这几大抽象概念,逐渐熟悉 RxJS 的核心基础。
Observable
可观察对象(Observable)是 ReactiveX 中最为重要的一个概念,Observable 作为数据的生产者,可以向外提供特定类型的数据,它被设计成数据之源,可以向外界发送数据,这是响应式编程的核心。我们可以想象出一条管道,数据就像是管道中流动的水,那么 Observable 就是出水口,向管道中注入数据,使数据不断更新并流动到需要它们的地方去。
我们可以像如下这样的方式来创建一个 Observable。
1 2 3 4 5 6 |
var observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); setTimeout(() => { observer.next(3) }, 100); }); |
该方法是通过一个函数来创建 Observable 对象,函数需要传入一个 observer 对象。没错,这里的 observer 可以认为就是下一小结要介绍的 Observer 对象,这里暂时不去管它,只要知道有这么回事就行了。(注意:创建 Observable 的方法不止上述这一种。)
每当调用一次 next 方法,表示 Observable 向外发射一次数据。了解 ES5 的人可能会注意到,这里就是 Observable 和 Promise (有机会可能会写文章介绍一下 Promise)的差别所在,Promise 只返回一次值便结束了,而 Observable 可以返回任意次值,这也是其有可能成为响应式编程的原因之一。
就像这样,Observable 不断通过内部调用消费者的 next 方法向外发送数据,直到内部出现错误或者主动结束数据的发送。
正因为 Observable 控制着数据的输出,它可以以任意形式决定发出数据的时机,就像例子中,1 和 2 都将是立即发送的同步操作,而 3 被放到了 setTimeout 函数中,形成了一次数据的异步发送。所以说,RxJS 可以完成对数据的同步和异步操作,这取决于 Observable 数据源的特征。
现在我们有了数据源 Observable,那么该如何获得其中的数据流呢?在 Observable 对象创建的时候,不论是同步发送数据,还是异步发送数据,都不会立即执行,只有对 Observable 采取监听操作时,数据才会开始发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
console.log('before'); observable.subscribe( (data) => { console.log(data); }, ); console.log('after'); // 结果: // before // 1 // 2 // after // 3 |
只有给 Observable 注册上 Observer 后才能接收到数据,调用 subscribe 方法就像是调用函数一样,Observable 会立即产生一个返回值给 Observer 处理。这样的处理方式,实现了响应式编程的功能。
另外,一个 Observable 可以被多个 Observer 订阅(subscribe),而且每一个订阅都是独立执行的,每个 Observer 都能监听到 Observable 所有的数据输出。也就是说,订阅是一个单播(一对一)模式,每一个订阅是独立且唯一的,互相之间并不会产生干扰。
Observer
终于该介绍观察者(Observer)了。如果说 Observable 是数据流的生产者,那么 Observer 就是数据流的消费者。Observer 订阅(subscribe)Observable 并从中获得数据,每获得一次数据都会执行一次 Observer 的回调函数来处理返回的数据值。
如前面内容所示,我们可以大概得知 Observer 实际上就是一个包含处理数据的回调函数的对象,这个方法有一个固定的名称,就是 next ,此外,Observer 还可以定义 error 回调和 complete 回调。
1 2 3 4 5 6 7 8 |
var observer = { next: (data) => { console.log(data); }, error: (error) => { console.log(error); }, complete: () => { console.log('complete'); }, }; observable.subscribe(observer); |
将 Observer 作为参数传递给 subscribe 方法,就大功告成了,其余的事情就可以放心地交给 RxJS 处理了。
当然,subscribe 的参数还可以有另一种写法,只传递回调函数,subscribe 内部将利用传递进去的回调函数构造出 Observer 对象。
1 2 3 4 5 6 |
observable.subscribe( (data) => { console.log(data); }, (error) => { console.log(error); }, () => { console.log('complete'); }, ); |
总的来说,Observer 的理解和使用相比 Observable 简单许多,主要注册正确的处理数据的方法,就完全不需要考虑 Observable 内部是如何运转的了。
Subscription
订阅(Subscription)是一个比较好理解的概念,官网上给出 Subscription 具有释放资源和取消 Observable 执行的功能,有一个 unsubscribe 方法。当调用 subscribe 方法时,就返回一个 Subscription 对象,当使用完 Observable 或者想要取消对 Observable 的监听,只要调用 Subscription.unsubscribe() 就可以了,取消注册以避免资源和内存的占用。
1 2 3 4 |
var subscription = observable.subscribe(observer); // ... doing things subscription.unsubscribe(); |
就是这么简单。
还记得我们前面举的 z = x + y 的例子吗?现在我们可以使用已经介绍的 RxJS 中的概念来实现响应式编程了。如果我们希望在 x 的值变化后自动更新 z 的值,那么我们需要对 x 的变化进行监听。将 x 定义为一个 Observable,并注册一个 Observer,在 x 的值变化后,Observable 将通知 Observer 并执行回调函数,这样就可以完成 z 值的更新了。
1 2 3 4 5 6 7 8 9 |
var z; var observable = Observable(...); // 产生 x 的值的可观察对象 observable.subscribe( (x) => { z = x + y; // doing something } ); |
到此,RxJS 中涉及到的基本概念和使用就介绍完毕,而对 RxJS 的认识才刚刚入门,本文的内容也许只是学习 RxJS 以及 ReactiveX 的冰山一角,但理解了这些基本概念,对以后深入的学习,希望会起到一定的帮助作用。
后记
花了一天的时间过了一遍 RxJS 的文档,也只是侧重其中的某些部分粗略的看了一遍,关于 RxJS 应用的其他方面,还有大量的参考文章可以去学习,这些应该都是时间上的功夫了。本文大致上是对文档中重要概念的学习笔记,包括一些自己的理解,会有不准确或不恰当之处,也欢迎指正和讨论。
第一次写作技术类文章,看起来容易,写起来就会遇到很多困难,写作的速度也非常慢,有诸多不妥,还请见谅。
想获得去掉 5 元限制的证券账户吗?

如果您想去掉最低交易佣金 5 元限制,使用微信扫描左边小程序二维码,访问微信小程序「优财助手」,点击底部菜单「福利」,阅读文章「通过优财开证券账户无最低交易佣金 5 元限制」,按照文章步骤操作即可获得免 5 元证券账户,股票基金交易手续费率万 2.5。
请注意,一定要按照文章描述严格操作,如错误开户是无法获得免 5 元证券账户的。