RxJS – Custom Operator

前言

虽然 RxJS 提供了非常多的 Operators. 但依然会有不够用的时候. 这时就可以自定义 Operator 了.

 

Operator Is Just a Function Observable => Observable

Operator 只是一个函数.

timer(1000).pipe(obs => obs).subscribe();

它接收一个 Observable 返回一个 Observable.

Operator 都是放在管道 (pipe) 内的, 所以从源头 Observable 一直传递下去.

每一个 Operator 接收 upstream(上游) 的 Observable 并且返回一个 Observable (通常是新的一个) 给 downstream(下游).

每一个 Operator 就是对发布值的加工处理.

 

Standard Operator

source 源头

首先我们有个源头

const source = timer(1000);

每一秒发布一次

Upstream / Downstream Observable

接着搞一个 pipe 和 operator.

const source = timer(1000);
source
  .pipe(upstreamObs => {
    console.log(upstreamObs === source); // true

    const downstreamObs = new Observable();
    return downstreamObs;
  })
  .subscribe(v => console.log(v));

可以看到 operator 函数接收的就是 upstream 的 Observable, 然后返回一个新的 Observable 到 downstream.

Connect Upstream and Downstream

downstream to upstream

当 downstream 被 subscribe, 我们 subscribe upstream

当 downstream 被 unsubscribe, 我们 unsubscribe upstream

const source = timer(1000);
source
  .pipe(upstreamObs => {
    const downstreamObs = new Observable(subscriber => {

      // 当 downstream Observable 被订阅以后, 我们才订阅 upstream Observable
      const upstreamSub = upstreamObs.subscribe();

      return () => {
        // 当 downstream 被退订以后, 我们也要退订 upstream Observable
        upstreamSub.unsubscribe();
      };
      
    });

    return downstreamObs;
  })
  .subscribe(v => console.log(v));

upstream to downstream

当接收到 upstream 发布时, 我们也发布到 downstream

const downstreamObs = new Observable(subscriber => {
  const upstreamSub = upstreamObs.subscribe(upstreamValue => {
    const downStreamValue = upstreamValue + 'downstream value'; // decorate value for downstream
    subscriber.next(downStreamValue); // 发布到 downstream
  });

  return () => {
    upstreamSub.unsubscribe();
  };
});

小结

从上面几招可以看出来, 主要就是 upstream 和 downstream 中间的关系.

如何订阅, 退订, 发布 value 等等. 上面只是给一个简单直观的例子. 真实场景中, upstream 和 downstream 发布时机, 往往是不一致的 (比如 upstream 发布 1 次后, downstream 可能发布多次)

Operator with Config

只要把 Operator 包装成工厂函数就可以支持 Config 了

function myOperator(config: string): OperatorFunction<number, string> {
  console.log(config);

  return upstreamObs => {
    const downstreamObs = new Observable<string>(subscriber => {
      const upstreamSub = upstreamObs.subscribe(upstreamValue => {
        const downStreamValue = upstreamValue + 'downstream value'; // decorate value for downstream
        subscriber.next(downStreamValue); // 发布到 downstream
      });

      return () => {
        upstreamSub.unsubscribe();
      };
    });

    return downstreamObs;
  };
}
const source = timer(1000);
source.pipe(myOperator('some config here...')).subscribe(v => console.log(v));

一个 Factory 函数, 通过配置生产 Operator.

OperatorFunction 定义了 Operator 接口, 它是加入了泛型的 Observable => Observable.

如果 upstream 和 downstream 的类型是一样的话可以用它的简化版本 MonoTypeOperatorFunction<T>

 

原文地址:http://www.cnblogs.com/keatkeat/p/16851630.html

发表评论

您的电子邮箱地址不会被公开。