Angular Observables & RxJS
What are Observables?
An Observable is an object that represents a stream of values over time. Unlike a regular function that returns a single value once, an Observable can deliver zero, one, or many values — either synchronously or asynchronously — and it can signal when the stream is complete or if an error occurs.
A practical analogy: a YouTube channel is like an Observable. The channel (producer) publishes videos over time. Subscribers receive each new video when it is published. Unsubscribing means the subscriber stops receiving notifications. Multiple subscribers can watch the same channel independently.
Angular uses Observables extensively — HTTP requests, form value changes, route parameter changes, and more all return Observables.
What is RxJS?
RxJS (Reactive Extensions for JavaScript) is the library that provides the Observable class along with dozens of operators for transforming, filtering, and combining Observable streams. Angular includes RxJS as a built-in dependency, so no additional installation is needed.
Observable vs Promise
Developers coming from JavaScript backgrounds are familiar with Promises. Observables are more powerful but work differently:
Feature | Promise | Observable
----------------|----------------------------------|----------------------------
Values | Resolves with one value | Emits multiple values over time
Execution | Executes immediately | Lazy — only runs when subscribed
Cancellation | Cannot be cancelled | Can be unsubscribed
Operators | .then(), .catch() | Dozens of RxJS operators
Use in Angular | Rare | Used throughout the framework
Creating and Subscribing to Observables
Creating an Observable
import { Observable } from 'rxjs';
// An Observable that emits three values and completes
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(10); // Emit value 10
subscriber.next(20); // Emit value 20
subscriber.next(30); // Emit value 30
subscriber.complete(); // Signal completion
});
By convention, Observable variable names end with a dollar sign $. This is not a requirement, but a widely adopted Angular convention that makes Observables easy to identify at a glance.
Subscribing to an Observable
An Observable does nothing until something subscribes to it. The subscribe() method starts the execution and provides three optional callback functions:
numbers$.subscribe({
next: (value) => console.log('Received:', value), // Handles each emitted value
error: (err) => console.error('Error:', err), // Handles errors
complete: () => console.log('Stream complete') // Handles completion
});
// Output:
// Received: 10
// Received: 20
// Received: 30
// Stream complete
Common RxJS Creation Functions
RxJS provides helper functions for creating Observables from common sources:
import { of, from, interval, timer, fromEvent } from 'rxjs';
// of — creates an Observable from a list of values
const fruits$ = of('Apple', 'Banana', 'Cherry');
// from — creates an Observable from an array, promise, or iterable
const colors$ = from(['Red', 'Green', 'Blue']);
// interval — emits an incrementing number every X milliseconds
const ticker$ = interval(1000); // Emits 0, 1, 2, 3... every second
// timer — emits once after a delay
const delayed$ = timer(3000); // Emits after 3 seconds
// fromEvent — creates an Observable from DOM events
const clicks$ = fromEvent(document, 'click');
RxJS Operators
Operators are functions that transform the data emitted by an Observable. They are applied using the pipe() method, which chains operators together. The original Observable is not modified — each operator returns a new Observable with the transformation applied.
map — Transform Each Value
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const prices$ = of(100, 200, 300);
prices$.pipe(
map(price => price * 0.9) // Apply 10% discount
).subscribe(discounted => console.log(discounted));
// Output: 90, 180, 270
filter — Keep Only Matching Values
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
const scores$ = of(45, 72, 88, 30, 95, 60);
scores$.pipe(
filter(score => score >= 70) // Keep only passing scores
).subscribe(score => console.log(score));
// Output: 72, 88, 95
tap — Side Effects Without Transforming
import { tap } from 'rxjs/operators';
this.http.get('/api/users').pipe(
tap(data => console.log('Raw data received:', data)), // Log without changing
map(users => users.filter(u => u.active))
).subscribe(activeUsers => this.users = activeUsers);
switchMap — Cancels Previous and Starts New
switchMap is one of the most important operators in Angular. When a new value arrives, it cancels any in-progress inner Observable and starts a new one. This is ideal for search functionality where each keystroke should cancel the previous HTTP request:
import { switchMap, debounceTime, distinctUntilChanged } from 'rxjs/operators';
import { fromEvent } from 'rxjs';
// Example: search as user types
searchControl.valueChanges.pipe(
debounceTime(300), // Wait 300ms after the user stops typing
distinctUntilChanged(), // Only proceed if the value actually changed
switchMap(query => this.searchService.search(query)) // Cancel previous, start new
).subscribe(results => this.searchResults = results);
mergeMap — Runs All Concurrently
import { mergeMap } from 'rxjs/operators';
// Download all files simultaneously
fileIds$.pipe(
mergeMap(id => this.fileService.downloadFile(id)) // All requests run at once
).subscribe(file => this.downloadedFiles.push(file));
catchError — Handle Errors
import { catchError, of } from 'rxjs';
this.http.get('/api/data').pipe(
catchError(error => {
console.error('Request failed:', error);
return of([]); // Return empty array as fallback
})
).subscribe(data => this.data = data);
takeUntil — Auto-Unsubscribe
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
export class MyComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
this.dataService.getData().pipe(
takeUntil(this.destroy$) // Unsubscribe when destroy$ emits
).subscribe(data => this.data = data);
}
ngOnDestroy() {
this.destroy$.next(); // Emit to trigger unsubscription
this.destroy$.complete();
}
}
combineLatest — Combine Multiple Streams
import { combineLatest } from 'rxjs';
// Combine user and settings data — emits when either updates
combineLatest([this.userService.getUser(), this.settingsService.getSettings()])
.subscribe(([user, settings]) => {
this.currentUser = user;
this.currentSettings = settings;
});
Subject — Observable and Observer Combined
A Subject is a special type of Observable that can both emit values (like an observer) and be subscribed to (like an Observable). It is used to create a shared data stream that components can push values into and subscribe to.
import { Injectable } from '@angular/core';
import { Subject, BehaviorSubject } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class NotificationService {
// BehaviorSubject remembers the last emitted value
// New subscribers immediately receive the most recent value
private messageSubject = new BehaviorSubject<string>('No notifications');
message$ = this.messageSubject.asObservable(); // Expose as Observable only
sendNotification(message: string) {
this.messageSubject.next(message); // Push new value to all subscribers
}
}
// Sending a notification from any component
this.notificationService.sendNotification('Your order has been shipped!');
// Reading the notification in any other component
this.notificationService.message$.subscribe(msg => this.notification = msg);
Practical Example — Typeahead Search
This example combines multiple operators to create a performant search that only sends requests when the user pauses typing:
// search.component.ts
import { Component, OnInit } from '@angular/core';
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap, filter } from 'rxjs/operators';
@Component({ selector: 'app-search', templateUrl: './search.component.html' })
export class SearchComponent implements OnInit {
searchControl = new FormControl('');
results: string[] = [];
constructor(private searchService: SearchService) {}
ngOnInit() {
this.searchControl.valueChanges.pipe(
filter(value => (value?.length ?? 0) > 2), // Only search if 3+ characters
debounceTime(400), // Wait 400ms after typing stops
distinctUntilChanged(), // Skip if value didn't change
switchMap(value => this.searchService.search(value || '')) // Cancel old, start new
).subscribe(results => this.results = results);
}
}
Summary
Observables are lazy streams of values that can emit multiple items over time. RxJS is the library that provides the Observable class and operators for working with them. Observables only execute when subscribed to. The pipe() method chains RxJS operators to transform data before it reaches the subscriber. Common operators include map (transform), filter (select), switchMap (replace inner stream), debounceTime (delay), catchError (handle errors), and takeUntil (auto-unsubscribe on component destroy). Subject and BehaviorSubject act as both emitter and Observable, useful for cross-component communication through services.
