Reduce

apply a function to each item emitted by an Observable, sequentially, and emit the final value

The Reduce operator applies a function to the first item emitted by the source Observable, then feeds the result of that function back into the function along with the second item, and continues in this way for each subsequent item. When the source Observable emits its final item and completes, the Observable returned from Reduce emits the final value returned from the function.

This sort of operation is sometimes called “accumulate,” “aggregate,” “compress,” “fold,” or “inject” in other contexts.
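
The feedback loop described above can be sketched without any Rx library. The following JavaScript is a minimal illustration, not an Rx implementation: foldItems is a hypothetical helper, a plain array stands in for the items emitted by the source Observable, and each intermediate result is recorded so that the way it is fed back into the function is visible.

```javascript
// Hypothetical helper (not part of any Rx library): folds a finite list of
// "emitted" items and records every intermediate step.
function foldItems(items, accumulator) {
  if (items.length === 0) {
    throw new Error('sequence contains no items');
  }
  const steps = [];
  let result = items[0]; // the first item starts the reduction
  for (const item of items.slice(1)) {
    const next = accumulator(result, item);
    steps.push({ previous: result, item: item, next: next });
    result = next; // feed the result back in for the following item
  }
  return { steps: steps, finalValue: result };
}

const outcome = foldItems([1, 2, 3, 4, 5], (a, b) => a + b);
for (const s of outcome.steps) {
  console.log(`f(${s.previous}, ${s.item}) -> ${s.next}`);
}
console.log('final value: ' + outcome.finalValue); // final value: 15
```

Only the last line corresponds to what a reduced Observable would emit; the intermediate results stay internal.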

Language-Specific Information

reduce

The reduce operator returns an Observable that applies a function of your choosing to the first item emitted by a source Observable, then feeds the result of that function along with the second item emitted by the source Observable into the same function, then feeds the result of that function along with the third item into the same function, and so on until all items have been emitted by the source Observable. Then it emits the final result from the final call to your function as the sole output from the returned Observable.

Note that if the source Observable does not emit any items, reduce will fail with an IllegalArgumentException.

For example, the following code uses reduce to compute, and then emit as an Observable, the sum of the numbers emitted by the source Observable:

Sample Code

numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.reduce({ a, b -> a+b }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

15
Sequence complete

reduce does not by default operate on any particular Scheduler.

reduce

There is also a version of reduce to which you can pass a seed item in addition to an accumulator function. Passing a null seed is not the same as passing no seed: a null seed seeds your reduction with the item null. If you do pass in a seed and the source Observable emits no items, reduce will emit the seed and complete normally without error.
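
The difference between passing no seed, passing a null seed, and reducing an empty source can be modeled in plain JavaScript. This is an illustrative sketch, not the library's code: reduceItems and the NO_SEED sentinel are hypothetical names, an array stands in for the source Observable, and a thrown Error stands in for the error notification.

```javascript
// Sentinel that marks "no seed was passed", so that null remains a usable seed.
const NO_SEED = Symbol('no seed');

// Hypothetical model of reduce over a finite array of emitted items.
function reduceItems(items, accumulator, seed = NO_SEED) {
  let index = 0;
  let result;
  if (seed === NO_SEED) {
    if (items.length === 0) {
      // Unseeded and empty: there is nothing to emit, so signal an error.
      throw new Error('source emitted no items');
    }
    result = items[0]; // the first item acts as the starting value
    index = 1;
  } else {
    result = seed; // any seed, including null, is the starting value
  }
  for (; index < items.length; index++) {
    result = accumulator(result, items[index]);
  }
  return result;
}

const add = (a, b) => a + b;
console.log(reduceItems([1, 2, 3], add));     // 6
console.log(reduceItems([1, 2, 3], add, 10)); // 16
console.log(reduceItems([], add, 10));        // 10 (the seed itself)

// A null seed is a real seed: the accumulator receives null as its first argument.
const seen = [];
reduceItems([1, 2], (acc, x) => { seen.push(acc); return x; }, null);
console.log(seen); // [ null, 1 ]
```

In this model, calling reduceItems([], add) with no seed throws, which mirrors the unseeded failure case described earlier.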

reduce does not by default operate on any particular Scheduler.

It is a bad idea to use reduce to collect emitted items into a mutable data structure. Instead, use collect for that purpose.

collect

The collect operator is similar to reduce but is specialized for the purpose of collecting the whole set of items emitted by the source Observable into a single mutable data structure to be emitted by the resulting Observable. Pass it two parameters:

  1. a function that returns the mutable data structure
  2. a function that, when given the data structure and an item emitted by the source Observable, modifies the data structure appropriately

collect does not by default operate on any particular Scheduler.
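
The two parameters can be illustrated with a small JavaScript model. It is a sketch under assumptions, not the library's API: collectItems is a hypothetical helper and an array stands in for the source Observable. It also contrasts the factory approach with handing one mutable object to a reduction as its seed, which is one plausible reason the advice above steers mutable accumulation toward collect.

```javascript
// Hypothetical model of collect over a finite array of emitted items.
// stateFactory: returns a fresh mutable data structure for this run.
// collector: mutates that structure for each item; its return value is ignored.
function collectItems(items, stateFactory, collector) {
  const state = stateFactory();
  for (const item of items) {
    collector(state, item);
  }
  return state;
}

const first = collectItems([1, 2, 3], () => [], (list, x) => { list.push(x); });
const second = collectItems([4, 5], () => [], (list, x) => { list.push(x); });
console.log(first);  // [ 1, 2, 3 ]
console.log(second); // [ 4, 5 ]

// Contrast: a single mutable seed reused by two reductions mixes both runs.
const sharedSeed = [];
const pushInto = (list, x) => { list.push(x); return list; };
[1, 2, 3].reduce(pushInto, sharedSeed);
[4, 5].reduce(pushInto, sharedSeed);
console.log(sharedSeed); // [ 1, 2, 3, 4, 5 ]
```

Because the factory runs once per call, each run of collectItems gets its own list, whereas the shared seed keeps growing.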

reduce

RxJS implements the reduce operator. Pass it an accumulator function, and, optionally, a seed value to pass into the accumulator function with the first item emitted by the source Observable.

Sample Code

// Multiply the items 1, 2, 3 together, starting from a seed of 1
var source = Rx.Observable.range(1, 3)
    .reduce(function (acc, x) {
        return acc * x;
    }, 1);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 6
Completed

reduce is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.aggregates.js

reduce requires one of the following distributions:

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP implements this operator as reduce.

Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/reduce/reduce.php

//Without a seed
$source = \Rx\Observable::fromArray(range(1, 3));

$subscription = $source
    ->reduce(function ($acc, $x) {
        return $acc + $x;
    })
    ->subscribe($createStdoutObserver());

Next value: 6
Complete!

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/reduce/reduce-with-seed.php

//With a seed
$source = \Rx\Observable::fromArray(range(1, 3));

$subscription = $source
    ->reduce(function ($acc, $x) {
        return $acc * $x;
    }, 1)
    ->subscribe($createStdoutObserver());

Next value: 6
Complete!

© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/reduce.html