Observable

class stable

A representation of any set of values over any amount of time. This is the most basic building block of RxJS.

class Observable<T> implements Subscribable {
  static create: (...args: any[]) => any
  constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic)
  source: Observable<any> | undefined
  operator: Operator<any, T> | undefined
  lift<R>(operator?: Operator<T, R>): Observable<R>
  subscribe(observerOrNext?: Partial<Observer<T>> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Subscription
  forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise<void>
  pipe(...operations: OperatorFunction<any, any>[]): Observable<any>
  toPromise(promiseCtor?: PromiseConstructorLike): Promise<T | undefined>
}

Subclasses

  • ConnectableObservable
  • Subject
    • BehaviorSubject
    • ReplaySubject
    • AsyncSubject

Static Properties

create

Type: (...args: any[]) => any.

Creates a new Observable by calling the Observable constructor.

Constructor

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic)

Parameters

subscribe

Optional. Default is undefined.

The function that is called when the Observable is initially subscribed to. This function is given a Subscriber, on which next can be called to emit new values, error can be called to raise an error, or complete can be called to signal successful completion.
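As an illustration of the constructor parameter described above, here is a minimal sketch. The names numbers$ and log are hypothetical and exist only for this example. The subscribe function emits two values, completes, and returns a teardown function.

```typescript
import { Observable } from 'rxjs';

// Hypothetical log used only to record what happens, in order.
const log: string[] = [];

const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();

  // Teardown logic: runs when the subscription ends
  // (on completion, on error, or on unsubscribe).
  return () => {
    log.push('teardown');
  };
});

// Nothing has run yet; the function above is invoked once per subscribe call.
numbers$.subscribe({
  next: (value) => {
    log.push(`next ${value}`);
  },
  complete: () => {
    log.push('complete');
  },
});

console.log(log);
// [ 'next 1', 'next 2', 'complete', 'teardown' ]
```

Because this sketch emits synchronously, the whole sequence has already happened by the time subscribe returns.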

Properties

source

Type: Observable<any> | undefined.

operator

Type: Operator<any, T> | undefined.

Methods

lift<R>(operator?: Operator<T, R>): Observable<R>

Creates a new Observable, with this Observable instance as the source, and the passed operator defined as the new observable's operator.

Deprecation Notes

Internal implementation detail, do not use directly. Will be made internal in v8. If you have implemented an operator using lift, it is recommended that you create an operator by simply returning new Observable() directly. See "Creating new operators from scratch" section here: https://rxjs.dev/guide/operators

Parameters

operator

Optional. Default is undefined.

The operator defining the operation to apply to the observable.

Returns

Observable<R>: a new observable with the Operator applied
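The deprecation note above recommends writing operators that return new Observable() directly rather than calling lift. A minimal sketch of that approach follows. The operator multiplyBy is a hypothetical name for illustration and is not part of RxJS.

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical operator (not part of RxJS): multiplies each number by `factor`.
function multiplyBy(factor: number) {
  return (source: Observable<number>): Observable<number> =>
    new Observable<number>((subscriber) => {
      // Subscribe to the source and forward every notification.
      const subscription = source.subscribe({
        next: (value) => subscriber.next(value * factor),
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });

      // Returning the inner subscription means unsubscribing from the
      // result also unsubscribes from the source.
      return subscription;
    });
}

const scaled: number[] = [];
of(1, 2, 3)
  .pipe(multiplyBy(10))
  .subscribe((value) => {
    scaled.push(value);
  });

console.log(scaled);
// [ 10, 20, 30 ]
```

The returned function has the shape of an operator function, so it can be passed to pipe like any built-in operator.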

subscribe(observer?: Partial<Observer<T>>): Subscription

Invokes an execution of an Observable and registers Observer handlers for notifications it will emit.

Parameters

observer

Optional. Default is undefined.

Type: Partial<Observer<T>>.

Returns

Subscription

subscribe(next: (value: T) => void): Subscription

Parameters

next

Type: (value: T) => void.

Returns

Subscription

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription

Deprecation Notes

Instead of passing separate callback arguments, use an observer argument. Signatures taking separate callback arguments will be removed in v8. Details: https://rxjs.dev/deprecations/subscribe-arguments

Parameters

next

Optional. Default is undefined.

Type: (value: T) => void.

error

Optional. Default is undefined.

Type: (error: any) => void.

complete

Optional. Default is undefined.

Type: () => void.

Returns

Subscription
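To illustrate the migration suggested in the deprecation note above, the sketch below passes a single observer object instead of separate next, error, and complete callbacks. The events array is a hypothetical helper used only to record notifications.

```typescript
import { of } from 'rxjs';

// Hypothetical array that records every notification received.
const events: string[] = [];

// One observer argument replaces the three positional callbacks.
of(1, 2).subscribe({
  next: (value) => {
    events.push(`next ${value}`);
  },
  error: (err) => {
    events.push(`error ${err}`);
  },
  complete: () => {
    events.push('complete');
  },
});

console.log(events);
// [ 'next 1', 'next 2', 'complete' ]
```

Any handler that is not needed can simply be left out of the object.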

Use it when you have all these Observables, but still nothing is happening.

subscribe is not a regular operator, but a method that calls the Observable's internal subscribe function. This might be, for example, a function that you passed to the Observable's constructor, but most of the time it is a library implementation, which defines what will be emitted by an Observable, and when it will be emitted. This means that calling subscribe is actually the moment when an Observable starts its work, not when it is created, as is often thought.

Apart from starting the execution of an Observable, this method allows you to listen for values that an Observable emits, as well as for when it completes or errors. You can achieve this in either of the following two ways.

The first way is to create an object that implements the Observer interface. It should have the methods defined by that interface, but note that it should be just a regular JavaScript object, which you can create yourself in any way you want (ES6 class, classic function constructor, object literal, etc.). In particular, do not attempt to use any RxJS implementation details to create Observers; you don't need them. Remember also that your object does not have to implement all methods. If you find yourself creating a method that doesn't do anything, you can simply omit it. Note, however, that if the error method is not provided and an error happens, it will be thrown asynchronously. Errors thrown asynchronously cannot be caught using try/catch. Instead, use the onUnhandledError configuration option or a runtime handler (like window.onerror or process.on('error')) to be notified of unhandled errors. Because of this, it's recommended that you provide an error method to avoid missing thrown errors.

The second way is to give up on the Observer object altogether and simply provide callback functions in place of its methods. This means you can provide three functions as arguments to subscribe, where the first function is the equivalent of a next method, the second of an error method, and the third of a complete method. Just as with an Observer, if you do not need to listen for something, you can omit a function by passing undefined or null, since subscribe recognizes these functions by their position in the function call. As with an Observer, if the error function is not provided, errors emitted by an Observable will be thrown asynchronously.

You can, however, subscribe with no parameters at all. This may be the case when you're not interested in terminal events and you have already handled emissions internally using operators (e.g. tap).
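The no-argument form described above might look like the following sketch, where emissions are handled inside the pipeline with tap and subscribe only starts the execution. The seen array is a hypothetical stand-in for a real side effect.

```typescript
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical array standing in for a side effect such as logging.
const seen: number[] = [];

of(1, 2, 3)
  .pipe(
    // Handle each emission inside the pipeline.
    tap((value) => {
      seen.push(value);
    }),
  )
  // No observer and no callbacks: this call only triggers the execution.
  .subscribe();

console.log(seen);
// [ 1, 2, 3 ]
```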

Whichever style of calling subscribe you use, it returns a Subscription object. This object allows you to call unsubscribe on it, which in turn stops the work that an Observable does and cleans up all resources that the Observable used. Note that cancelling a subscription will not call the complete callback provided to subscribe, which is reserved for a regular completion signal that comes from an Observable.

Remember that callbacks provided to subscribe are not guaranteed to be called asynchronously. It is the Observable itself that decides when these functions will be called. For example, the of creation function emits all its values synchronously by default. Always check the documentation for how a given Observable behaves when subscribed to, and whether its default behavior can be modified with a scheduler.
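The timing point above can be made concrete with a small sketch. The first half shows the default synchronous delivery from of. The second half assumes observeOn with asyncScheduler as one possible way to defer delivery. The array names are hypothetical.

```typescript
import { asyncScheduler, of } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// Default behavior: `of` delivers its values during the subscribe call.
const syncOrder: string[] = [];
syncOrder.push('before subscribe');
of('a', 'b').subscribe((value) => {
  syncOrder.push(value);
});
syncOrder.push('after subscribe');
console.log(syncOrder);
// [ 'before subscribe', 'a', 'b', 'after subscribe' ]

// With observeOn(asyncScheduler), delivery is deferred to a later task,
// so nothing has arrived yet when subscribe returns.
const asyncOrder: string[] = [];
of('a', 'b')
  .pipe(observeOn(asyncScheduler))
  .subscribe((value) => {
    asyncOrder.push(value);
  });
asyncOrder.push('after subscribe');
console.log(asyncOrder);
// [ 'after subscribe' ]
```

In the second case the values 'a' and 'b' are appended only after the current synchronous code has finished.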

Example

Subscribe with an Observer

import { of } from 'rxjs';

const sumObserver = {
  sum: 0,
  next(value) {
    console.log('Adding: ' + value);
    this.sum = this.sum + value;
  },
  error() {
    // We actually could just remove this method,
    // since we do not really care about errors right now.
  },
  complete() {
    console.log('Sum equals: ' + this.sum);
  }
};

of(1, 2, 3) // Synchronously emits 1, 2, 3 and then completes.
  .subscribe(sumObserver);

// Logs:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"

Subscribe with functions

import { of } from 'rxjs';

let sum = 0;

of(1, 2, 3).subscribe(
  value => {
    console.log('Adding: ' + value);
    sum = sum + value;
  },
  undefined,
  () => console.log('Sum equals: ' + sum)
);

// Logs:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"

Cancel a subscription

import { interval } from 'rxjs';

const subscription = interval(1000).subscribe({
  next(num) {
    console.log(num);
  },
  complete() {
    // Will not be called, even when cancelling subscription.
    console.log('completed!');
  }
});

setTimeout(() => {
  subscription.unsubscribe();
  console.log('unsubscribed!');
}, 2500);

// Logs:
// 0 after 1s
// 1 after 2s
// "unsubscribed!" after 2.5s

forEach(next: (value: T) => void): Promise<void>

Used as a NON-CANCELLABLE means of subscribing to an observable, for use with APIs that expect promises, like async/await. You cannot unsubscribe from this.

Parameters

next

a handler for each value emitted by the observable

Returns

Promise<void>: a promise that either resolves on observable completion or rejects with the handled error

forEach(next: (value: T) => void, promiseCtor: PromiseConstructorLike): Promise<void>

Deprecation Notes

Passing a Promise constructor will no longer be available in upcoming versions of RxJS. This is because it adds weight to the library, for very little benefit. If you need this functionality, it is recommended that you either polyfill Promise, or you create an adapter to convert the returned native promise to whatever promise implementation you wanted. Will be removed in v8.

Parameters

next

a handler for each value emitted by the observable

promiseCtor

a constructor function used to instantiate the Promise

Returns

Promise<void>: a promise that either resolves on observable completion or rejects with the handled error

WARNING: Only use this with observables you know will complete. If the source observable does not complete, you will end up with a promise that is hung up, and potentially all of the state of an async function hanging out in memory. To avoid this situation, look into adding something like timeout, take, takeWhile, or takeUntil amongst others.

Example:

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const source$ = interval(1000).pipe(take(4));

async function getTotal() {
   let total = 0;

   await source$.forEach(value => {
     total += value;
     console.log('observable ->', value);
   });

   return total;
}

getTotal().then(
   total => console.log('Total:', total)
)

// Expected:
// "observable -> 0"
// "observable -> 1"
// "observable -> 2"
// "observable -> 3"
// "Total: 6"

pipe(): Observable<T>

Used to stitch together functional operators into a chain.

Parameters

There are no parameters.

Returns

Observable<T>

pipe<A>(op1: OperatorFunction<T, A>): Observable<A>

Parameters

op1

Type: OperatorFunction<T, A>.

Returns

Observable<A>

pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>

Parameters

op1

Type: OperatorFunction<T, A>.

op2

Type: OperatorFunction<A, B>.

Returns

Observable<B>

pipe<A, B, C>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>

Parameters

op1

Type: OperatorFunction<T, A>.

op2

Type: OperatorFunction<A, B>.

op3

Type: OperatorFunction<B, C>.

Returns

Observable<C>

pipe<A, B, C, D>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>): Observable<D>

Parameters

op1

Type: OperatorFunction<T, A>.

op2

Type: OperatorFunction<A, B>.

op3

Type: OperatorFunction<B, C>.

op4

Type: OperatorFunction<C, D>.

Returns

Observable<D>

pipe<A, B, C, D, E>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>): Observable<E>

Parameters

op1

Type: OperatorFunction<T, A>.

op2

Type: OperatorFunction<A, B>.

op3

Type: OperatorFunction<B, C>.

op4

Type: OperatorFunction<C, D>.

op5

Type: OperatorFunction<D, E>.

Returns

Observable<E>

pipe<A, B, C, D, E, F>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>): Observable<F>

Parameters

op1

Type: OperatorFunction<T, A>.

op2

Type: OperatorFunction<A, B>.

op3

Type: OperatorFunction<B, C>.

op4

Type: OperatorFunction<C, D>.

op5

Type: OperatorFunction<D, E>.

op6

Type: OperatorFunction<E, F>.

Returns

Observable<F>

pipe<A, B, C, D, E, F, G>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>): Observable<G>

Parameters

op1

Type: OperatorFunction<T, A>.

op2

Type: OperatorFunction<A, B>.

op3

Type: OperatorFunction<B, C>.

op4

Type: OperatorFunction<C, D>.

op5

Type: OperatorFunction<D, E>.

op6

Type: OperatorFunction<E, F>.

op7

Type: OperatorFunction<F, G>.

Returns

Observable<G>

pipe<A, B, C, D, E, F, G, H>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>): Observable<H>

Parameters

op1

Type: OperatorFunction<T, A>.

op2

Type: OperatorFunction<A, B>.

op3

Type: OperatorFunction<B, C>.

op4

Type: OperatorFunction<C, D>.

op5

Type: OperatorFunction<D, E>.

op6

Type: OperatorFunction<E, F>.

op7

Type: OperatorFunction<F, G>.

op8

Type: OperatorFunction<G, H>.

Returns

Observable<H>

pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>): Observable<I>

Parameters

op1

Type: OperatorFunction<T, A>.

op2

Type: OperatorFunction<A, B>.

op3

Type: OperatorFunction<B, C>.

op4

Type: OperatorFunction<C, D>.

op5

Type: OperatorFunction<D, E>.

op6

Type: OperatorFunction<E, F>.

op7

Type: OperatorFunction<F, G>.

op8

Type: OperatorFunction<G, H>.

op9

Type: OperatorFunction<H, I>.

Returns

Observable<I>

pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>, ...operations: OperatorFunction<any, any>[]): Observable<unknown>

Parameters

op1

Type: OperatorFunction<T, A>.

op2

Type: OperatorFunction<A, B>.

op3

Type: OperatorFunction<B, C>.

op4

Type: OperatorFunction<C, D>.

op5

Type: OperatorFunction<D, E>.

op6

Type: OperatorFunction<E, F>.

op7

Type: OperatorFunction<F, G>.

op8

Type: OperatorFunction<G, H>.

op9

Type: OperatorFunction<H, I>.

operations

Type: OperatorFunction<any, any>[].

Returns

Observable<unknown>
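As a brief illustration of the pipe overloads listed above, the sketch below chains two operators. Each operator's output type becomes the next operator's input type, which is what the generic parameters A, B, and so on express. The results array is a hypothetical helper for collecting output.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical array used to collect the values that reach the subscriber.
const results: number[] = [];

of(1, 2, 3, 4, 5)
  .pipe(
    filter((n) => n % 2 === 1), // keep odd numbers: 1, 3, 5
    map((n) => n * n), // square them: 1, 9, 25
  )
  .subscribe((n) => {
    results.push(n);
  });

console.log(results);
// [ 1, 9, 25 ]
```

The operators are applied in the order they are passed, so the filter runs before the map.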

toPromise(): Promise<T | undefined>

Subscribe to this Observable and get a Promise resolving on complete with the last emission (if any).

Deprecation Notes

Replaced with firstValueFrom and lastValueFrom. Will be removed in v8. Details: https://rxjs.dev/deprecations/to-promise

Parameters

There are no parameters.

Returns

Promise<T | undefined>

toPromise(PromiseCtor: any): Promise<T | undefined>

Deprecation Notes

Replaced with firstValueFrom and lastValueFrom. Will be removed in v8. Details: https://rxjs.dev/deprecations/to-promise

Parameters

PromiseCtor

Type: any.

Returns

Promise<T | undefined>

toPromise(PromiseCtor: PromiseConstructorLike): Promise<T | undefined>

Deprecation Notes

Replaced with firstValueFrom and lastValueFrom. Will be removed in v8. Details: https://rxjs.dev/deprecations/to-promise

Parameters

PromiseCtor

Type: PromiseConstructorLike.

Returns

Promise<T | undefined>

WARNING: Only use this with observables you know will complete. If the source observable does not complete, you will end up with a promise that is hung up, and potentially all of the state of an async function hanging out in memory. To avoid this situation, look into adding something like timeout, take, takeWhile, or takeUntil amongst others.
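The deprecation notes above name firstValueFrom and lastValueFrom as the replacements for toPromise. A minimal sketch of both, using a source that is known to complete, might look like this. The variable names are hypothetical.

```typescript
import { firstValueFrom, lastValueFrom, of } from 'rxjs';

// A source that emits three values and then completes.
const source$ = of(1, 2, 3);

// Resolves with the first value emitted.
const firstPromise = firstValueFrom(source$);

// Resolves with the last value once the source completes,
// which is the closest match to the deprecated toPromise().
const lastPromise = lastValueFrom(source$);

Promise.all([firstPromise, lastPromise]).then(([first, last]) => {
  console.log(first, last);
  // 1 3
});
```

The warning above still applies to lastValueFrom: its promise settles only when the source completes or errors.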

© 2015–2021 Google, Inc., Netflix, Inc., Microsoft Corp. and contributors.
Code licensed under an Apache-2.0 License. Documentation licensed under CC BY 4.0.
https://rxjs.dev/api/index/class/Observable