-->
尽管 RxJS 的基础是 Observable(可观察对象),但其最有用地方在于它的操作符。操作符是让复杂的异步代码易于以声明式组合起来必不可少的部分。
操作符是函数,有两种操作符:
管道操作符是一种能用语句 ObservableInstance.pipe(operator())
把 Observables 管道化的操作符。管道操作符包括 filter(…), 和 mergeMap(…) 等。当我们调用时,操作符不会改变已经存在的 Observable 实例。相反,它们会返回一个新的 Observable,其订阅逻辑是基于第一个 Observable。
管道操作符是一种将某个 Observable 作为输入,并返回另一个 Observable 的函数。这是一个纯操作:作为输入的 Observable 保持不变。
一个管道操作符本质上是一个纯函数,它将某个 Observable 作为输入,并生成另一个 Observable 作为输出。订阅输出的 Observable 也将会订阅到输入的 Observable。
创建操作符是一种被作为独立函数调用的操作符,用来创建一个新的 Observable。例如: of(1, 2, 3)
创建了一个将会依次发出 1,2,3 的 Observable。创建操作符在稍后的部分会做更详细地讨论。
例如,被调用的操作符 map 类似于同名的数组方法。就像[1, 2, 3].map(x => x * x)
将会生成[1, 4, 9]
,Observable 将会被创建如下:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));
// Logs:
// value: 1
// value: 4
// value: 9
将会发出 1
, 4
, 9
。另一个有用的操作符是 first:
import { of } from 'rxjs';
import { first } from 'rxjs/operators';
first()(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));
// Logs:
// value: 1
注意 map
逻辑上必须动态构造,因为必须为其提供映射功能。相比之下,first
可能是一个常量,但它仍然是动态构造的。通常,无论操作符是否需要传入参数,它们都能被构造出来。
管道操作符是函数,因此可以像普通函数那样使用:op()(obs)
— 但实际中,往往许多操作符会嵌套在一起使用,立马就变得不可读了:op4()(op3()(op2()(op1()(obs))))
。因此,Observables 有一个叫做.pipe()
的方法 ,可以做到相同的事情 ,但更具有可读性:
obs.pipe(
op1(),
op2(),
op3(),
op3(),
)
从风格上讲,即使只有一个操作符,人们也不会用 op()(obs)
;obs.pipe(op())
是普遍地首选写法。
**什么是创建操作符?**区别于管道操作符,创建操作符是用来创建具有某些常见预定义行为的 Observable, 或是通过组合其它 Observables 创建出新 Observables 的函数。
创建操作符的一个典型示例是 interval
函数。它以一个数字(不是 Observable)作为输入参数,产生一个 Observable 作为输出:
import { interval } from 'rxjs';
const observable = interval(1000 /* number of milliseconds */);
通常情况下, Observables 会发出字符串和数字之类的普通值,但我们也经常需要处理 Observables 的 Observables,即所谓的高阶 Observables。例如,假设你有一个发出字符串的 Observables,这些字符串是你要查看的文件链接。代码如下:
const fileObservable = urlObservable.pipe(
map(url => http.get(url)),
);
http.get()
为每个独立的链接返回一个 Observable(可能是字符串或字符串数组)。这就是 Observable 的 Observable,一个高阶 Observable。
但是如何处理高阶 Observable 呢?通常情况下,是通过扁平化:通过(以某种方式)将高阶 Observable 转换为普通 Observable。例如:
const fileObservable = urlObservable.pipe(
map(url => http.get(url)),
concatAll(),
);
concatAll() 操作符订阅从“外部”Observable (上例中 urlObservable)发出的每一个“内部” Observable(上例中 http.get() 返回的 Observable) ,并复制这个“内部”Observable 发出的所有值 ,直到这个“内部”Observable 完成,再复制下一个“内部”Observable。所有“内部”Observable 的值都以这种方式被串联,并返回到输出 Observable 中。其它有用的扁平化操作符(称为组合操作符)有:
就像很多数组库结合 map() 和 flat() (或者 flatten()
) 成单一的 flatMap() ,所有 RxJS 的扁平操作符也都有等价的 map 方法 concatMap(), mergeMap(), switchMap(), 和 exhaustMap()。
要解释操作符的工作方式,文字描述通常是不够用的。很多操作符都与时间有关,例如,他们可能以不同的方式延时,采样,节流,或者防抖发出的值。图表通常是一种更好的工具。弹珠图在视觉上表达了操作符如何工作,图上包括了输入的 Observable(s),操作符及其参数,以及输出的 Observable。
在弹珠图中,时间向右流动,图像描述了随着 Observable 的执行,值(“弹珠”)是怎样被发出的。
弹珠图分析如下:
RxJS 官方文档网站中,大量使用了弹珠图来说明操作符怎样工作。其他情况下也可能会用到弹珠图,例如在一个白板上,或者甚至是在我们的单元测试中(像 ASCII 码图)。
由于不同的用途,操作符可以被分类为:创建,转换,过滤,组合,多播,错误处理,工具等。在下面的列表中,你将找到按类别组织好的所有操作符。
完整的概述,见参考页。
这些是 Observable 创建操作符,它们也具有组合功能 — 发出多个源 Observable 的值。
另见上面的组合创建操作符部分。
如果在你的代码中有个常用的操作符序列,请使用 pipe()
函数提取该序列到新的操作符中。即使这一序列不常见,将其变成单个操作符也会提高可读性。
例如,你可以制作一个将奇数值丢弃而将偶数值加倍的函数,如下所示:
import { pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';
function discardOddDoubleEven() {
return pipe(
filter(v => ! (v % 2)),
map(v => v + v),
);
}
该 pipe()
函数与 Observable 的.pipe()
方法类似,但不相同。
它更复杂,但是如果你必须写一个不能由现有操作符组合出来的操作符(这种情况很少发生),则可以使用 Observable 构造函数从头开始编写操作符,如下所示:
import { Observable } from 'rxjs';
function delay(delayInMillis) {
return (observable) => new Observable(observer => {
// this function will called each time this
// Observable is subscribed to.
const allTimerIDs = new Set();
const subscription = observable.subscribe({
next(value) {
const timerID = setTimeout(() => {
observer.next(value);
allTimerIDs.delete(timerID);
}, delayInMillis);
allTimerIDs.add(timerID);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
}
});
// the return value is the teardown function,
// which will be invoked when the new
// Observable is unsubscribed from.
return () => {
subscription.unsubscribe();
allTimerIDs.forEach(timerID => {
clearTimeout(timerID);
});
}
});
}
你必须注意:
1.当订阅输入 Observable 的时候,三种观察者函数都要实现:next()
, error()
,和 complete()
。
2.当 Observable 完成的时候,要实现一个有清除功能的“清场”函数(在此例中,通过取消订阅和清除所有未完成的定时器)。
3.从接收 Observable 构造函数的函数中返回该清场函数。
当然,这仅是示例;delay()
操作符已经存在了。