mergeScan

function stable

Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable.

mergeScan<T, R>(accumulator: (acc: R, value: T, index: number) => ObservableInput<R>, seed: R, concurrent: number = Infinity): OperatorFunction<T, R>

Parameters

accumulator

The accumulator function called on each source value.

seed

The initial accumulation value.

concurrent

Optional. Default is Infinity.

Maximum number of input Observables being subscribed to concurrently.

Returns

OperatorFunction<T, R>: A function that returns an Observable of the accumulated values.

Description

It's like scan, but the Observables returned by the accumulator are merged into the outer Observable.

The first parameter of the mergeScan is an accumulator function which is being called every time the source Observable emits a value. mergeScan will subscribe to the value returned by the accumulator function and will emit values to the subscriber emitted by inner Observable.

The accumulator function is being called with three parameters passed to it: acc, value and index. The acc parameter is used as the state parameter whose value is initially set to the seed parameter (the second parameter passed to the mergeScan operator).

mergeScan internally keeps the value of the acc parameter: as long as the source Observable emits without inner Observable emitting, the acc will be set to seed. The next time the inner Observable emits a value, mergeScan will internally remember it and it will be passed to the accumulator function as acc parameter the next time source emits.

The value parameter of the accumulator function is the value emitted by the source Observable, while the index is a number which represent the order of the current emission by the source Observable. It starts with 0.

The last parameter to the mergeScan is the concurrent value which defaults to Infinity. It represent the maximum number of inner Observable subscriptions at a time.

Example

Count the number of click events

import { fromEvent, of } from 'rxjs';
import { mapTo, mergeScan } from 'rxjs/operators';

const click$ = fromEvent(document, 'click');
const one$ = click$.pipe(mapTo(1));
const seed = 0;
const count$ = one$.pipe(
  mergeScan((acc, one) => of(acc + one), seed),
);
count$.subscribe(x => console.log(x));

// Results:
// 1
// 2
// 3
// 4
// ...and so on for each click

See Also

© 2015–2021 Google, Inc., Netflix, Inc., Microsoft Corp. and contributors.
Code licensed under an Apache-2.0 License. Documentation licensed under CC BY 4.0.
https://rxjs.dev/api/operators/mergeScan