Angular Observables & RxJS

What are Observables?

An Observable is an object that represents a stream of values over time. Unlike a regular function that returns a single value once, an Observable can deliver zero, one, or many values — either synchronously or asynchronously — and it can signal when the stream is complete or if an error occurs.

A practical analogy: a YouTube channel is like an Observable. The channel (producer) publishes videos over time. Subscribers receive each new video when it is published. Unsubscribing means the subscriber stops receiving notifications. Multiple subscribers can watch the same channel independently.

Angular uses Observables extensively — HTTP requests, form value changes, route parameter changes, and more all return Observables.

What is RxJS?

RxJS (Reactive Extensions for JavaScript) is the library that provides the Observable class along with dozens of operators for transforming, filtering, and combining Observable streams. Angular includes RxJS as a built-in dependency, so no additional installation is needed.

Observable vs Promise

Developers coming from JavaScript backgrounds are familiar with Promises. Observables are more powerful but work differently:


Feature         | Promise                         | Observable
----------------|----------------------------------|----------------------------
Values          | Resolves with one value          | Emits multiple values over time
Execution       | Executes immediately              | Lazy — only runs when subscribed
Cancellation    | Cannot be cancelled               | Can be unsubscribed
Operators       | .then(), .catch()                 | Dozens of RxJS operators
Use in Angular  | Rare                              | Used throughout the framework

Creating and Subscribing to Observables

Creating an Observable


import { Observable } from 'rxjs';

// An Observable that emits three values and completes
const numbers$ = new Observable<number>(subscriber => {
  subscriber.next(10);     // Emit value 10
  subscriber.next(20);     // Emit value 20
  subscriber.next(30);     // Emit value 30
  subscriber.complete();   // Signal completion
});

By convention, Observable variable names end with a dollar sign $. This is not a requirement, but a widely adopted Angular convention that makes Observables easy to identify at a glance.

Subscribing to an Observable

An Observable does nothing until something subscribes to it. The subscribe() method starts the execution and provides three optional callback functions:


numbers$.subscribe({
  next: (value) => console.log('Received:', value),  // Handles each emitted value
  error: (err) => console.error('Error:', err),       // Handles errors
  complete: () => console.log('Stream complete')       // Handles completion
});

// Output:
// Received: 10
// Received: 20
// Received: 30
// Stream complete

Common RxJS Creation Functions

RxJS provides helper functions for creating Observables from common sources:


import { of, from, interval, timer, fromEvent } from 'rxjs';

// of — creates an Observable from a list of values
const fruits$ = of('Apple', 'Banana', 'Cherry');

// from — creates an Observable from an array, promise, or iterable
const colors$ = from(['Red', 'Green', 'Blue']);

// interval — emits an incrementing number every X milliseconds
const ticker$ = interval(1000);  // Emits 0, 1, 2, 3... every second

// timer — emits once after a delay
const delayed$ = timer(3000);  // Emits after 3 seconds

// fromEvent — creates an Observable from DOM events
const clicks$ = fromEvent(document, 'click');

RxJS Operators

Operators are functions that transform the data emitted by an Observable. They are applied using the pipe() method, which chains operators together. The original Observable is not modified — each operator returns a new Observable with the transformation applied.

map — Transform Each Value


import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const prices$ = of(100, 200, 300);

prices$.pipe(
  map(price => price * 0.9)   // Apply 10% discount
).subscribe(discounted => console.log(discounted));

// Output: 90, 180, 270

filter — Keep Only Matching Values


import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

const scores$ = of(45, 72, 88, 30, 95, 60);

scores$.pipe(
  filter(score => score >= 70)   // Keep only passing scores
).subscribe(score => console.log(score));

// Output: 72, 88, 95

tap — Side Effects Without Transforming


import { tap } from 'rxjs/operators';

this.http.get('/api/users').pipe(
  tap(data => console.log('Raw data received:', data)),  // Log without changing
  map(users => users.filter(u => u.active))
).subscribe(activeUsers => this.users = activeUsers);

switchMap — Cancels Previous and Starts New

switchMap is one of the most important operators in Angular. When a new value arrives, it cancels any in-progress inner Observable and starts a new one. This is ideal for search functionality where each keystroke should cancel the previous HTTP request:


import { switchMap, debounceTime, distinctUntilChanged } from 'rxjs/operators';
import { fromEvent } from 'rxjs';

// Example: search as user types
searchControl.valueChanges.pipe(
  debounceTime(300),         // Wait 300ms after the user stops typing
  distinctUntilChanged(),    // Only proceed if the value actually changed
  switchMap(query => this.searchService.search(query))  // Cancel previous, start new
).subscribe(results => this.searchResults = results);

mergeMap — Runs All Concurrently


import { mergeMap } from 'rxjs/operators';

// Download all files simultaneously
fileIds$.pipe(
  mergeMap(id => this.fileService.downloadFile(id))  // All requests run at once
).subscribe(file => this.downloadedFiles.push(file));

catchError — Handle Errors


import { catchError, of } from 'rxjs';

this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Request failed:', error);
    return of([]);   // Return empty array as fallback
  })
).subscribe(data => this.data = data);

takeUntil — Auto-Unsubscribe


import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

export class MyComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    this.dataService.getData().pipe(
      takeUntil(this.destroy$)   // Unsubscribe when destroy$ emits
    ).subscribe(data => this.data = data);
  }

  ngOnDestroy() {
    this.destroy$.next();    // Emit to trigger unsubscription
    this.destroy$.complete();
  }
}

combineLatest — Combine Multiple Streams


import { combineLatest } from 'rxjs';

// Combine user and settings data — emits when either updates
combineLatest([this.userService.getUser(), this.settingsService.getSettings()])
  .subscribe(([user, settings]) => {
    this.currentUser = user;
    this.currentSettings = settings;
  });

Subject — Observable and Observer Combined

A Subject is a special type of Observable that can both emit values (like an observer) and be subscribed to (like an Observable). It is used to create a shared data stream that components can push values into and subscribe to.


import { Injectable } from '@angular/core';
import { Subject, BehaviorSubject } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class NotificationService {

  // BehaviorSubject remembers the last emitted value
  // New subscribers immediately receive the most recent value
  private messageSubject = new BehaviorSubject<string>('No notifications');

  message$ = this.messageSubject.asObservable();  // Expose as Observable only

  sendNotification(message: string) {
    this.messageSubject.next(message);   // Push new value to all subscribers
  }
}

// Sending a notification from any component
this.notificationService.sendNotification('Your order has been shipped!');

// Reading the notification in any other component
this.notificationService.message$.subscribe(msg => this.notification = msg);

Practical Example — Typeahead Search

This example combines multiple operators to create a performant search that only sends requests when the user pauses typing:


// search.component.ts
import { Component, OnInit } from '@angular/core';
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap, filter } from 'rxjs/operators';

@Component({ selector: 'app-search', templateUrl: './search.component.html' })
export class SearchComponent implements OnInit {

  searchControl = new FormControl('');
  results: string[] = [];

  constructor(private searchService: SearchService) {}

  ngOnInit() {
    this.searchControl.valueChanges.pipe(
      filter(value => (value?.length ?? 0) > 2),  // Only search if 3+ characters
      debounceTime(400),                            // Wait 400ms after typing stops
      distinctUntilChanged(),                       // Skip if value didn't change
      switchMap(value => this.searchService.search(value || ''))  // Cancel old, start new
    ).subscribe(results => this.results = results);
  }
}

Summary

Observables are lazy streams of values that can emit multiple items over time. RxJS is the library that provides the Observable class and operators for working with them. Observables only execute when subscribed to. The pipe() method chains RxJS operators to transform data before it reaches the subscriber. Common operators include map (transform), filter (select), switchMap (replace inner stream), debounceTime (delay), catchError (handle errors), and takeUntil (auto-unsubscribe on component destroy). Subject and BehaviorSubject act as both emitter and Observable, useful for cross-component communication through services.

Leave a Comment

Your email address will not be published. Required fields are marked *