RxJS - Utility Operators

Welcome back, guys! Today I'll show you some utility operators. Let's start :)

tap

Used to perform side effects for notifications from the source observable.

import { of } from 'rxjs'
import { tap } from 'rxjs/operators'


of('a', 'b', 'c')
  .pipe(
      tap({
          next: x => console.log(`tap: ${x}`),
          complete: () => console.log('tap: complete'),
          error: err => console.log(`tap: error: ${err}`)
      })
  )
  .subscribe()

Output:

tap: a
tap: b
tap: c
tap: complete

This operator lets us observe what happens inside an observable while it runs. It accepts the same kind of observer as subscribe, with three optional methods: next, complete, and error. What can you do in these three methods? Exactly what you want :) You can run a side effect or log the values. I usually prefer to use this operator only for logging and not for other side effects, because side effects can be difficult to test.
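As a rough sketch of the logging use described above, here is tap placed on both sides of a map. The numbers and the seen and received arrays are just illustrative names, not part of the original example; the messages are pushed into an array instead of the console so they are easy to check.

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// Illustrative containers: what tap observed and what the subscriber got.
const seen: string[] = [];
const received: number[] = [];

of(1, 2, 3)
  .pipe(
    tap(x => seen.push(`before map: ${x}`)),
    map(x => x * 10),
    // Whatever the tap callback returns is ignored; the value passes through unchanged.
    tap(x => seen.push(`after map: ${x}`)),
  )
  .subscribe(x => received.push(x));

console.log(seen);
// [ 'before map: 1', 'after map: 10', 'before map: 2',
//   'after map: 20', 'before map: 3', 'after map: 30' ]
console.log(received);
// [ 10, 20, 30 ]
```

The subscriber only receives the values produced by map, so tap can be added or removed without changing the result of the pipeline.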

delay

Delays the emission of items from the source Observable by a given timeout or until a given Date.

import { of } from 'rxjs'
import { delay, tap } from 'rxjs/operators'


of('a', 'b', 'c')
  .pipe(
      tap(x => console.log(`${new Date().toLocaleTimeString()} tap before delay: ${x}`)),
      delay(1000),
      tap(x => console.log(`${new Date().toLocaleTimeString()} tap after delay: ${x}`)),
  )
  .subscribe()

Output:

17:08:26 tap before delay: a
17:08:26 tap before delay: b
17:08:26 tap before delay: c
17:08:27 tap after delay: a
17:08:27 tap after delay: b
17:08:27 tap after delay: c

This operator waits for a given amount of time before emitting each value. Every value is shifted by the same delay, so in the example above all three values arrive together one second later.
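If you want to see the delay without actually waiting, one option is to pass a scheduler as the second argument of delay. The sketch below assumes RxJS 7 and uses VirtualTimeScheduler, which runs on virtual time instead of the real clock; the log array and the emittedBeforeFlush variable are illustrative names.

```typescript
import { of, VirtualTimeScheduler } from 'rxjs';
import { delay } from 'rxjs/operators';

// Virtual clock: time only moves forward when flush() is called.
const scheduler = new VirtualTimeScheduler();
const log: string[] = [];

of('a', 'b', 'c')
  .pipe(delay(1000, scheduler))
  .subscribe({
    next: x => log.push(`${scheduler.now()}ms: ${x}`),
    complete: () => log.push(`${scheduler.now()}ms: complete`),
  });

// Nothing has been emitted yet: the values are waiting for the delay to elapse.
const emittedBeforeFlush = log.length;

// Run all the scheduled work, jumping the virtual clock forward.
scheduler.flush();

console.log(emittedBeforeFlush);
// 0
console.log(log);
// [ '1000ms: a', '1000ms: b', '1000ms: c', '1000ms: complete' ]
```

This is only one way to make the timing visible; in a real application you would normally leave the scheduler argument out and let delay use the default one.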

timestamp

Attaches a timestamp to each item emitted by an observable, indicating when it was emitted.

import { of } from 'rxjs'
import { timestamp } from 'rxjs/operators'

of('a', 'b', 'c')
  .pipe(
      timestamp()
  )
  .subscribe(console.log)

Output:

{ value: 'a', timestamp: 1629385746523 }
{ value: 'b', timestamp: 1629385746528 }
{ value: 'c', timestamp: 1629385746528 }

This operator wraps each emitted item in an object holding the value and the time at which it was emitted. It can be helpful during debugging, or whenever we need to know the emission time of a value.
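One thing you can do with these timestamps is measure the gap between two consecutive values. Here is a minimal sketch, assuming RxJS 7, where timestamp accepts an optional object with a now() method. The fakeClock below is a hypothetical clock that advances 250ms on every call, used only to make the result predictable; with the default clock you would get real times.

```typescript
import { of } from 'rxjs';
import { map, pairwise, timestamp } from 'rxjs/operators';

// Hypothetical fake clock: each call to now() moves the time forward by 250ms.
let fakeNow = 1000;
const fakeClock = { now: () => (fakeNow += 250) };

const gaps: string[] = [];

of('a', 'b', 'c')
  .pipe(
    // Wrap each value as { value, timestamp } using the fake clock.
    timestamp(fakeClock),
    // Pair every item with the previous one.
    pairwise(),
    map(([prev, curr]) => `${prev.value} -> ${curr.value}: ${curr.timestamp - prev.timestamp}ms`),
  )
  .subscribe(gap => gaps.push(gap));

console.log(gaps);
// [ 'a -> b: 250ms', 'b -> c: 250ms' ]
```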

timeout

Errors if the Observable does not emit a value within a given time span.

import { Observable, throwError } from 'rxjs';
import { timeout } from 'rxjs/operators';

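const source$ = new Observable<number>(subscriber => {
    // Handle of the pending timer, kept so it can be cleared on unsubscribe
    let id: ReturnType<typeof setTimeout> | undefined;

    // Emit 0..10, waiting 1s before each value (2s roughly one time in ten)
    (function next(count = 0) {
        if (count > 10) {
            subscriber.complete();
            return;
        }
        id = setTimeout(() => {
            subscriber.next(count);
            next(count + 1);
        }, Math.random() > 0.9 ? 2000 : 1000);
    })();

    // Teardown: cancel the pending timer when the subscriber unsubscribes
    return () => {
        if (id) {
            clearTimeout(id);
        }
    };
});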

source$
  .pipe(
      timeout({ each: 1500, with: info => throwError(() => new Error(`Timeout ${1500}ms: info: ${JSON.stringify(info)}`)) }),
  )
  .subscribe({
      next: console.log,
      error: error => {
          console.error(`Something Wrong!`)
          console.error(error.message)
      }
  })

Output (the delays are random, so each run may stop at a different value):

0
1
2
3
4
5
6
7
8
Something Wrong!
Timeout 1500ms: info: {"meta":null,"lastValue":null,"seen":9}

This operator watches the time between the emissions of the observable: if a value is not emitted within the configured timeout, the operator errors. In the example above, the with option replaces the default TimeoutError with a custom error.
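The with option does not have to return an error: it can return any observable to switch to when the timeout fires. Below is a small sketch of a fallback value, assuming RxJS 7. The stalled$ source and the received array are illustrative names, and a VirtualTimeScheduler is passed through the scheduler option only so the example runs without waiting.

```typescript
import { concat, NEVER, of, VirtualTimeScheduler } from 'rxjs';
import { timeout } from 'rxjs/operators';

// Virtual clock, so the 1500ms timeout elapses as soon as flush() is called.
const scheduler = new VirtualTimeScheduler();
const received: string[] = [];

// Illustrative source: emits 'a' and 'b' immediately, then stays silent forever.
const stalled$ = concat(of('a', 'b'), NEVER);

stalled$
  .pipe(
    timeout({
      each: 1500,
      // Instead of erroring, switch to a fallback observable.
      with: () => of('fallback'),
      scheduler,
    }),
  )
  .subscribe({
    next: value => received.push(value),
    complete: () => received.push('complete'),
  });

// Let the virtual time pass so the timeout can fire.
scheduler.flush();

console.log(received);
// [ 'a', 'b', 'fallback', 'complete' ]
```

When the timeout fires, the operator unsubscribes from the source and the subscriber continues with the fallback observable, so no error reaches it.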

toArray

Collects all source emissions and emits them as an array when the source completes.

import { of } from 'rxjs'
import { toArray } from 'rxjs/operators'


of('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h')
  .pipe(
      toArray()
  )
  .subscribe(console.log)

Output:

[
  'a', 'b', 'c',
  'd', 'e', 'f',
  'g', 'h'
]

This operator converts a sequence of values into a single array, whose items are the emitted values in emission order. The array is emitted only when the source completes, so a source that never completes never produces it.
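A typical case where this helps is when you need all the values at once, for example to sort them. The sketch below is only an illustration with made-up numbers; the emissions array is there to show that the subscriber receives a single array.

```typescript
import { of } from 'rxjs';
import { map, toArray } from 'rxjs/operators';

// Every emission received by the subscriber ends up here.
const emissions: number[][] = [];

of(3, 1, 2)
  .pipe(
    toArray(),
    // Sorting needs every value, so it runs on the single array emitted at completion.
    map(values => [...values].sort((a, b) => a - b)),
  )
  .subscribe(sorted => emissions.push(sorted));

console.log(emissions);
// [ [ 1, 2, 3 ] ]
```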

OK guys, that's all for the utility operators. Here you can find all the code of this article.

See you soon, Bye Bye!