Skip to content
Angular Essentials
GitHub

Observable, observer, and subscription in action

import {Component, OnInit} from '@angular/core';
import {Observable, Subscription} from "rxjs";

/**
 * Root component demonstrating how to build an `Observable` by hand.
 *
 * The observable is created in `ngOnInit`; nothing is emitted until something
 * subscribes to it, because the subscriber function passed to the constructor
 * only runs once per subscription.
 */
@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
  /** Stream of numbers: 101 and 1 immediately, 5 after 5 s, 66 after 7 s, then complete. */
  observable: Observable<number>;

  /** Creates the observable; the producer below runs once for every subscriber. */
  ngOnInit(): void {
    this.observable = new Observable<number>(subscriber => {
      // Synchronous emission, delivered as soon as someone subscribes.
      subscriber.next(101);

      // Scheduled emission: value 5 after 5 seconds.
      const fiveSecondTimer = setTimeout(() => subscriber.next(5), 5000);

      // Still synchronous: this is delivered before the delayed 5.
      subscriber.next(1);

      // After 7 seconds emit the last value and signal that the stream is done.
      const sevenSecondTimer = setTimeout(() => {
        subscriber.next(66);
        subscriber.complete();
      }, 7000);

      // Teardown: runs on unsubscribe/complete/error so that an early
      // unsubscribe does not leave timers pending with a reference to the
      // subscriber. Clearing a timer that has already fired is a no-op.
      return () => {
        clearTimeout(fiveSecondTimer);
        clearTimeout(sevenSecondTimer);
      };
    });
  }
}

Listening to data streams emitted by observable

Setting up an observer using subscription

// Excerpt: attaches an observer to `this.observable` and keeps the resulting
// subscription on the component. The `...` lines mark code omitted from this
// excerpt (presumably the `observable` field and its setup — confirm against
// the full component).
export class AppComponent implements OnInit {

  ...
  
  // Holds the Subscription returned by `subscribe()` below.
  // Keeping the handle is what makes it possible to dispose of the
  // subscription later, whenever it is no longer needed.
  subscription: Subscription;

  ngOnInit(): void {

    ...
      
    // Subscribing is what starts the observable; the returned handle is stored.
    this.subscription = this.observable.subscribe(value => {
      // A single callback passed to subscribe() is used as the `next` handler,
      // so this runs once for every emitted value.
      console.log(value);
    });
  }
}

Reacting when observable emits an error or complete notification

/**
 * Demonstrates the three observer notifications: `next`, `error` and `complete`.
 *
 * The observable errors after 6 seconds, which terminates the stream, so the
 * emission and completion scheduled for 7 seconds are never delivered.
 */
export class AppComponent implements OnInit {
  title = 'angular-basic';

  /** Stream of numbers: 101 and 1 immediately, 5 after 5 s, then an error after 6 s. */
  observable: Observable<number>;

  /** Handle returned by `subscribe()`; needed to dispose of the subscription later. */
  subscription: Subscription;

  /** Creates the observable and subscribes with a full observer object. */
  ngOnInit(): void {
    this.observable = new Observable<number>(subscriber => {
      // Synchronous emission, delivered as soon as someone subscribes.
      subscriber.next(101);

      // Scheduled emission: value 5 after 5 seconds.
      const fiveSecondTimer = setTimeout(() => subscriber.next(5), 5000);

      // Still synchronous: this is delivered before the delayed 5.
      subscriber.next(1);

      // Scheduled for 7 seconds: emit 66 and complete. Because the error below
      // fires first (at 6 seconds), the observer never sees either of these.
      const sevenSecondTimer = setTimeout(() => {
        subscriber.next(66);
        subscriber.complete();
      }, 7000);

      // After 6 seconds send an error notification. An error terminates the
      // stream, meaning the observable never gets to complete.
      const errorTimer = setTimeout(() => {
        subscriber.error('an error occurred!!');
      }, 6000);

      // Teardown: runs on unsubscribe/error/complete so no timer is left
      // pending with a reference to a finished subscriber. Clearing a timer
      // that has already fired is a no-op.
      return () => {
        clearTimeout(fiveSecondTimer);
        clearTimeout(sevenSecondTimer);
        clearTimeout(errorTimer);
      };
    });

    // Observer object: one handler per notification type.
    this.subscription = this.observable.subscribe({
      next(val) {
        console.log(`next val is ${val}`);
      },
      error(err) {
        console.log(`something went wrong: ${err}`);
      },
      complete() {
        console.log('completed');
      }
    });
  }
}

Unsubscribing from observable

/**
 * Demonstrates releasing an observable subscription when the component is
 * destroyed, so the observer stops receiving notifications.
 */
export class AppComponent implements OnInit, OnDestroy {

  /** `Observable` type which emits `number` data streams. */
  observable: Observable<number>;

  /** Handle returned by `subscribe()`; needed to dispose of the subscription. */
  subscription: Subscription;

  /** Left empty in this excerpt: no observable or subscription is created here. */
  ngOnInit(): void {
  }

  /** Disposes of the subscription, if one was created, when the component is destroyed. */
  ngOnDestroy(): void {
    // `subscription` is only declared, never assigned, in this class, so it can
    // still be undefined here; guard the call instead of throwing a TypeError.
    this.subscription?.unsubscribe();
  }
}