RxJS速成 (下)
Subject
Subject比较特殊, 它即是Observable又是Observer.
作为Observable, Subject是比较特殊的, 它可以对多个Observer进行广播, 而普通的Observable只能单播, 它有点像EventEmitters(事件发射器), 维护着多个注册的Listeners.
作为Observable, 你可以去订阅它, 提供一个Observer就会正常的收到推送的值. 从Observer的角度是无法分辨出这个Observable是单播的还是一个Subject.
从Subject内部来讲, subscribe动作并没有调用一个新的执行来传递值, 它只是把Observer注册到一个列表里, 就像其他库的AddListener一样.
作为Observer, 它是一个拥有next(), error(), complete()方法的对象, 调用next(value)就会为Subject提供一个新的值, 然后就会多播到注册到这个Subject的Observers.
例子 subject.ts:
import { Subject } from "rxjs/Subject";
const subject = new Subject();
const subscriber1 = subject.subscribe({
next: (v) => console.log(`observer1: ${v}`)
});
const subscriber2 = subject.subscribe({
next: (v) => console.log(`observer2: ${v}`)
});
subject.next(1);
subscriber2.unsubscribe();
subject.next(2);
const subscriber3 = subject.subscribe({
next: (v) => console.log(`observer3: ${v}`)
});
subject.next(3);
订阅者1,2从开始就订阅了subject. 然后subject推送值1的时候, 它们都收到了.
然后订阅者2, 取消了订阅, 随后subject推送值2, 只有订阅者1收到了.
后来订阅者3也订阅了subject, 然后subject推送了3, 订阅者1,3都收到了这个值.
下面是一个angular 5的例子:
app.component.html:
<h3>从Subject共享Observable到多个Subscribers</h3>
<input type="text" placeholder="start typing" (input)="mySubject.next($event)" (keyup)="mySubject.next($event)">
<br> Subscriber to input events got {{inputValue}}
<br>
<br> Subscriber to keyup events got {{keyValue}}
app.component.ts:
import { Component } from '@angular/core';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/map';
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent {
title = 'app';
keyValue: string;
inputValue: string;
mySubject: Subject<Event> = new Subject();
constructor() {
// subscriber 1
this.mySubject.filter(({ type }) => type === 'keyup')
.map(e => (<KeyboardEvent>e).key)
.subscribe(value => this.keyValue = value);
// subscriber 2
this.mySubject.filter(({ type }) => type === 'input')
.map(e => (<HTMLInputElement>e.target).value)
.subscribe(value => this.inputValue = value);
}
}
input和keyup动作都把event推送到mySubject, 然后mySubject把值推送给订阅者, 订阅者1通过过滤和映射它只处理keyup类型的事件, 而订阅者2只处理input事件.
效果:
BehaviorSubject
BehaviorSubject 是Subject的一个变种, 它有一个当前值的概念, 它会把它上一次发送给订阅者值保存起来, 一旦有新的Observer进行了订阅, 那这个Observer马上就会从BehaviorSubject收到这个当前值.
也可以这样理解BehaviorSubject的特点:
- 它代表一个随时间变化的值, 例如, 生日的流就是Subject, 而一个人的年龄流就是BehaviorSubject.
- 每个订阅者都会从BehaviorSubject那里得到它推送出来的初始值和最新的值.
- 用例: 共享app状态.
例子 behavior-subject.ts:
import { BehaviorSubject } from "rxjs/BehaviorSubject";
const subject = new BehaviorSubject(0);
subject.subscribe({
next: v => console.log(`Observer1: ${v}`)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: v => console.log(`Observer2: ${v}`)
});
subject.next(3);
效果:
常用Operators:
concat
concat: 按顺序合并observables. 只会在前一个observable结束之后才会订阅下一个observable.
它适合用于顺序处理, 例如http请求.
例子:
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/observable/concat';
let firstReq = Observable.timer(3000).mapTo('First Response');
let secondReq = Observable.timer(1000).mapTo('Second Response');
Observable.concat(firstReq, secondReq)
.subscribe(res => console.log(res));
效果:
merge
把多个输入的observable交错的混合成一个observable, 不按顺序.
merge实际上是订阅了每个输入的observable, 它只是把输入的observable的值不带任何转换的发送给输出的Observable. 只有当所有输入的observable都结束了, 输出的observable才会结束. 任何在输入observable传递来的错误都会立即发射到输出的observable, 也就是把整个流都杀死了 .
例子:
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/observable/merge';
let firstReq = Observable.timer(3000).mapTo('First Response');
let secondReq = Observable.timer(1000).mapTo('Second Response');
Observable.merge(firstReq, secondReq)
.subscribe(res => console.log(res));
效果:
mergeMap (原来叫flatMap)
mergeMap把每个输入的Observable的值映射成Observable, 然后把它们混合成一个Observable.
mergeMap可以把嵌套的observables拼合成非嵌套的observable.
它有这些好处:
- 不必编写嵌套的subscribe()
- 把每个observable发出来的值转换成另一个observable
- 自动订阅内部的observable并且把它们(可能)交错的合成一排.
这个还是通过例子来理解比较好:
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/mergeMap';
function getData() {
const students = Observable.from([
{ name: 'Dave', age: 17 },
{ name: 'Nick', age: 18 },
{ name: 'Lee', age: 15 }
]);
const teachers = Observable.from([
{ name: 'Miss Wan', age: 28 },
{ name: 'Mrs Wang', age: 31 },
]);
return Observable.create(
observer => {
observer.next(students);
observer.next(teachers);
}
);
}
getData()
.mergeMap(persons => persons)
.subscribe(
p => console.log(`Subscriber got ${p.name} - ${p.age}`)
);
效果:
switchMap
switchMap把每个值都映射成Observable, 然后使用switch把这些内部的Observables合并成一个.
switchMap有一部分很想mergeMap, 但也仅仅是一部分像而已.
因为它还具有取消的效果, 每次发射的时候, 前一个内部的observable会被取消, 下一个observable会被订阅. 可以把这个理解为切换到一个新的observable上了.
这个还是看marble图比较好理解:
例子:
// 立即发出值, 然后每5秒发出值
const source = Rx.Observable.timer(0, 5000);
// 当 source 发出值时切换到新的内部 observable,发出新的内部 observable 所发出的值
const example = source.switchMap(() => Rx.Observable.interval(500));
// 输出: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8
const subscribe = example.subscribe(val => console.log(val));
更好的例子是: 网速比较慢的时候, 客户端发送了多次重复的请求, 如果前一次请求在2秒内没有返回的话, 那么就取消前一次请求, 不再需要前一次请求的结果了, 这里就应该使用debounceTime配合switchMap.
mergeMap vs switchMap的例子
mergeMap:
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/switchMap';
const outer = Observable.interval(1000).take(2);
const combined = outer.mergeMap(x => {
return Observable.interval(400)
.take(3)
.map(y => `outer ${x}: inner ${y}`);
});
combined.subscribe(res => console.log(`result ${res}`));
效果:
switchMap:
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/switchMap';
const outer = Observable.interval(1000).take(2);
const combined = outer.switchMap(x => {
return Observable.interval(400)
.take(3)
.map(y => `outer ${x}: inner ${y}`);
});
combined.subscribe(res => console.log(`result ${res}`));
zip
zip操作符也会合并多个输入的observables成为一个observable. 多个输入的observable的值, 按顺序, 按索引进行合并, 如果某一个observable在该索引上的值还没有发射值, 那么会等它, 直到所有的输入observables在该索引位置上的值都发射出来, 输出的observable才会发射该索引的值.
例子:
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/zip';
let age$ = Observable.of<number>(27, 25, 29);
let name$ = Observable.of<string>('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of<boolean>(true, true, false);
Observable
.zip(age$,
name$,
isDev$,
(age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
.subscribe(x => console.log(x));
效果:
就不往下写了, 其实看文档就行, 最重要的还是上一部分.
- 10-移动端开发教程-移动端事件
- 08-移动端开发教程-移动端适配方案
- 09-移动端开发教程-Sass入门
- 开发者的如何优雅的使用OSX
- Solidity 智能合约开发语言·数据类型
- 以太坊·Rinkeby 测试网络
- TensorFlow强化学习入门(0)——Q-Learning的查找表实现和神经网络实现
- 【云端架构】网站运维之CDN链接鉴权示例入门(PHP)
- 以太坊·单机多实例演示
- TensorFlow强化学习入门(1)——双臂赌博机
- CTF逆向--.NET与Python篇
- CTF逆向--安卓篇
- hackme.inndy.tw的19道web题解(下)
- hackme.inndy.tw的19道web题解(中)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- gorm聚合查询group结合join和count
- 潘石屹用Python解决100个问题 | 集合
- Catalina 默认使用zsh了,你可习惯
- LeetCode 354 Russian Doll Envelopes (动态规划)
- 设计模式之原型模式
- 设计模式之工厂方法模式
- Python 随机数生成
- OWIN 初探
- Spark和Spring整合处理离线数据
- 宝塔面板API接口抓取教程-宝塔接口配置文件
- 红黑树的创建
- Spark SQL | 目前Spark社区最活跃的组件之一
- 关于 MySQL Repeatable Read Isolation 常见的三个误区
- Spring源码-循环依赖(附25张调试截图)
- 二叉查找树