前言

虽然 RxJS 提供了非常多的 Operators. 但依然会有不够用的时候. 这时就可以自定义 Operator 了.

 

Operator Is Just a Function Observable => Observable

Operator 只是一个函数.

timer(1000).pipe(obs => obs).subscribe();

它接收一个 Observable 返回一个 Observable.

Operator 都是放在管道 (pipe) 内的, 所以从源头 Observable 一直传递下去.

每一个 Operator 接收 upstream(上游) 的 Observable 并且返回一个 Observable (通常是新的一个) 给 downstream(下游).

每一个 Operator 就是对发布值的加工处理.

 

Standard Operator

source 源头

首先我们有个源头

const source = timer(1000);

每一秒发布一次

Upstream / Downstream Observable

接着搞一个 pipe 和 operator.

const source = timer(1000);
source
  .pipe(upstreamObs => {
    console.log(upstreamObs === source); // true

    const downstreamObs = new Observable();
    return downstreamObs;
  })
  .subscribe(v => console.log(v));

可以看到 operator 函数接收的就是 upstream 的 Observable, 然后返回一个新的 Observable 到 downstream.

Connect Upstream and Downstream

downstream to upstream

当 downstream 被 subscribe, 我们 subscribe upstream

当 downstream 被 unsubscribe, 我们 unsubscribe upstream

const source = timer(1000);
source
  .pipe(upstreamObs => {
    const downstreamObs = new Observable(subscriber => {

      // 当 downstream Observable 被订阅以后, 我们才订阅 upstream Observable
      const upstreamSub = upstreamObs.subscribe();

      return () => {
        // 当 downstream 被退订以后, 我们也要退订 upstream Observable
        upstreamSub.unsubscribe();
      };
      
    });

    return downstreamObs;
  })
  .subscribe(v => console.log(v));

upstream to downstream

当接收到 upstream 发布时, 我们也发布到 downstream

const downstreamObs = new Observable(subscriber => {
  const upstreamSub = upstreamObs.subscribe(upstreamValue => {
    const downStreamValue = upstreamValue + 'downstream value'; // decorate value for downstream
    subscriber.next(downStreamValue); // 发布到 downstream
  });

  return () => {
    upstreamSub.unsubscribe();
  };
});

小结

从上面几招可以看出来, 主要就是 upstream 和 downstream 中间的关系.

如何订阅, 退订, 发布 value 等等. 上面只是给一个简单直观的例子. 真实场景中, upstream 和 downstream 发布时机, 往往是不一致的 (比如 upstream 发布 1 次后, downstream 可能发布多次)

Operator with Config

只要把 Operator 包装成工厂函数就可以支持 Config 了

function myOperator(config: string): OperatorFunction<number, string> {
  console.log(config);

  return upstreamObs => {
    const downstreamObs = new Observable<string>(subscriber => {
      const upstreamSub = upstreamObs.subscribe(upstreamValue => {
        const downStreamValue = upstreamValue + 'downstream value'; // decorate value for downstream
        subscriber.next(downStreamValue); // 发布到 downstream
      });

      return () => {
        upstreamSub.unsubscribe();
      };
    });

    return downstreamObs;
  };
}
const source = timer(1000);
source.pipe(myOperator('some config here...')).subscribe(v => console.log(v));

一个 Factory 函数, 通过配置生产 Operator.

OperatorFunction 定义了 Operator 接口, 它是加入了泛型的 Observable => Observable.

如果 upstream 和 downstream 的类型是一样的话可以用它的简化版本 MonoTypeOperatorFunction<T>

 

原文地址:http://www.cnblogs.com/keatkeat/p/16851630.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性