在RxJS中,一个数据流的完整流向至少需要包含Observable和Observer。Observable是被观察者,Observer是观察者,Observer订阅Observable,Observable向Observer推送数据以完成整个过程。
成都创新互联公司是专业的榆中网站建设公司,榆中接单;提供成都网站设计、网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行榆中网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!可以说一个完整的RxJS数据流就是Observable和Observer之间的互动游戏。
Observable实现了下面两种设计模式:
观察者模式
迭代器模式
由于不是写设计模式的文章,所以一笔带过,感兴趣的可以自己看下相关的书,需要一提的是,任何一种设计模式,指的都是解决某个特定类型问题的方法。问题复杂多变,往往不是靠单独一种设计模式能够解决的,更需要多种设计模式的组合,而RxJS的Observable就是观察者模式和迭代器模式的组合。
Observale与Observer的互动
订阅
首先我们先创建一个Observable,作为数据源
const { Observable } = rxjs; const sourceObservable$ = new Observable(observer => {
observer.next('hello');
observer.next('rxjs');
});
然后再设置一个observer,作为订阅者
const Observer = { next: res => console.log(res)
};
最后Observer订阅Observable,数据开始流动
sourceObservable$.subscribe(Observer); // hello
// rxjs
一个非常简单的数据流就完成了。
状态流转
在上面的例子中,只有next这一种状态,不断地往下游传递数据,但实际上,数据流在流动过程中有三种状态:
next,正常流转到下一个状态
error,捕获到异常,流动中止
complete,数据流已经完成,流动终止
接下来改一下上面的代码,补上另外两种状态
const sourceObservable$ = new Observable(observer => {
observer.next('hello');
observer.error('the data is wrong!');
observer.next('rxjs');
observer.complete();
}); const Observer = { next: res => console.log(res), error: err => console.log(err), complete: () => console.log('complete!')
}
sourceObservable$.subscribe(Observer); // hello
// the data is wrong!
现在再看Observer是不是有种似曾相识的感觉?没错,Observer其实就是一个迭代器了。
至此,不难发现,Observable与Observer的互动过程其实就是,Observer通过观察者模式向Observable注入了一个迭代器,通过next游标控制数据源Observable的数据流动,并根据实际流动过程中可能发生的情况将状态流转到error或者complete。
取消订阅
现在Observable与Observer已经通过subscribe建立了联系,但有时候我们需要把这种联系断开,比如组件销毁的时候。这时候就需要取消订阅了,看下面的例子
const sourceObservable$ = new Observable(observer => { let number = 1;
setInterval(() => {
observer.next(number++);
}, 1000);
}); const Observer = { next: res => console.log(res), error: err => console.log(err), complete: () => console.log('complete!')
} const subscription = sourceObservable$.subscribe(Observer);
setTimeout(() => { // 取消订阅
subscription.unsubscribe();
}, 4000); // 1
// 2
// 3
// 4
需要注意的是,在本例子中,虽然取消订阅了,但是作为数据源的sourceObservable$并没有终结,因为始终没有调用complete方法,只是Observer不再接收推送的数据了
为了便于观察其中的差异,我们将sourceObservable$改一下
const sourceObservable$ = new Observable(observer => { let number = 1;
setInterval(() => { console.log('subscribe:' + number);
observer.next(number++);
}, 1000);
}); // subscribe: 1
// 1
// subscribe: 2
// 2
// subscribe: 3
// 3
// subscribe: 4
// 4
// subscribe: 5
// subscribe: 6
// ...
Hot Observable和Cold Observable
假设我们有这样一个场景,一个Observable被两个ObserverA、ObserverB先后相隔N秒订阅了,那么ObserverB是否需要接收订阅之前的数据呢?
其实没有固定的答案,是否接收需要根据实际的业务场景来定,正是因为如此,所以便有了Hot Observable以及Cold Observable。
Hot Observable:热被观察对象,类似于直播,看到的内容是从你打开直播的那一刻开始的,之前的内容已经错过。只能接收订阅那一刻开始的数据。
Cold Observable:冷被观察对象,类似于录播,看到的内容是你打开的视频的第一秒种开始的。每次订阅都会从头开始接收数据
先看下Hot Observable,每次订阅只推送当前的数据,所以不会在每次订阅时重置数据推送,代码如下:
// 先产生数据
let number = 1; const sourceObservale$ = new Observable(observer => { let num = number;
setInterval(() => {
observer.next(num++);
number = num;
}, 1000);
}); const ObserverA = ObserverB = { next: item => console.log(item), error: err => console.log(err), complete: () => console.log('complete!')
}
sourceObservale$.subscribe(ObserverA);
setTimeout(() => {
sourceObservale$.subscribe(ObserverB);
}, 2000); // 1 => A
// 2 => A
// 3 => A
// 3 => B
// 4 => A
// 4 => B
// ..
而对于Cold Observable,每次订阅都是重头开始推送,所以每一次订阅都会重置数据推送,代码如下:
const sourceObservale$ = new Observable(observer => { // 订阅时产生数据
let number = 1;
setInterval(() => {
observer.next(number++);
}, 1000);
}); // 中间不变
... // 1 => A
// 2 => A
// 3 => A
// 1 => B
// 4 => A
// 2 => B
// ..
从这里也可以看出,Observable是具有惰性求值的,只有在被订阅的时候才会执行内部逻辑,而Cold Observable则更进一步,在没有被订阅的时候,连数据都不会产生。
睿江云官网链接:http://www.eflycloud.com/#register?salesID=6DGNUTUAV
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
网页名称:rxjs系列--Observale与Observer-创新互联
分享地址:http://scgulin.cn/article/hpchs.html