-
-
Notifications
You must be signed in to change notification settings - Fork 21
/
mergeHigherOrderArray.ts
79 lines (76 loc) · 2.5 KB
/
mergeHigherOrderArray.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/
/*tslint:disable:rxjs-no-nested-subscribe*/
import { Observable, OperatorFunction, Subscription } from "rxjs";
interface Source<T> {
completed: boolean;
observable: Observable<T>;
subscription?: Subscription;
}
export function mergeHigherOrderArray<T>(): OperatorFunction<
Observable<T>[],
T
> {
return (higherOrder) =>
new Observable<T>((observer) => {
let lasts: Source<T>[] = [];
let nexts: Source<T>[] = [];
let higherOrderCompleted = false;
const higherOrderSubscription = new Subscription();
higherOrderSubscription.add(
higherOrder.subscribe(
(observables) => {
const subscribes: (() => void)[] = [];
nexts = observables.map((observable) => {
const index = lasts.findIndex(
(last) => last.observable === observable
);
if (index !== -1) {
const next = lasts[index];
lasts.splice(index, 1);
return next;
}
const next: Source<T> = { completed: false, observable };
subscribes.push(() => {
if (higherOrderSubscription.closed) {
return;
}
next.subscription = next.observable.subscribe(
(value) => observer.next(value),
(error) => observer.error(error),
() => {
next.completed = true;
if (
higherOrderCompleted &&
nexts.every(({ completed }) => completed)
) {
observer.complete();
}
}
);
higherOrderSubscription.add(next.subscription);
});
return next;
});
lasts.forEach(({ subscription }) => {
if (subscription) {
subscription.unsubscribe();
}
});
lasts = nexts;
subscribes.forEach((subscribe) => subscribe());
},
(error) => observer.error(error),
() => {
if (lasts.every(({ completed }) => completed)) {
observer.complete();
}
higherOrderCompleted = true;
}
)
);
return higherOrderSubscription;
});
}