Observable, observer, and subscription in action
import {Component, OnDestroy, OnInit} from '@angular/core';
import {Observable, Subscription} from "rxjs";
/**
 * Root component showing how to build an `Observable` by hand.
 *
 * Emission timeline for each subscriber: `101` and `1` synchronously,
 * `5` after 5 seconds, then `66` after 7 seconds immediately followed by
 * the completion notification.
 */
@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
  // `Observable` type which emits `number` data streams
  observable: Observable<number>;

  /** Creates the observable; nothing is emitted until something subscribes. */
  ngOnInit(): void {
    this.observable = new Observable<number>(subscriber => {
      // emitted synchronously, as soon as a subscriber attaches
      subscriber.next(101);

      // emits 5 after 5 seconds
      const fiveSecondTimer = setTimeout(() => subscriber.next(5), 5000);

      // also synchronous: delivered right after 101, long before the timers fire
      subscriber.next(1);

      // after 7 seconds: emits 66, then completes so no further values follow
      const sevenSecondTimer = setTimeout(() => {
        subscriber.next(66);
        subscriber.complete();
      }, 7000);

      // Teardown: runs on unsubscribe (and on completion) so that pending
      // timers do not outlive the subscription and keep the subscriber alive.
      return () => {
        clearTimeout(fiveSecondTimer);
        clearTimeout(sevenSecondTimer);
      };
    });
  }
}
Listening to the data streams emitted by an observable
Setting up an observer using subscription
// Excerpt: attaches an observer to the component's observable.
// The `...` lines stand for code elided from this snippet.
export class AppComponent implements OnInit {
...
// Holds the subscription returned by `subscribe()`.
// Keeping the reference lets us dispose of (unsubscribe from) the
// subscription whenever we need to.
subscription: Subscription;
ngOnInit(): void {
...
// NOTE(review): `this.observable` is presumably declared and initialized in
// the elided code above; confirm before reusing this snippet as-is.
this.subscription = this.observable.subscribe(value => {
// a bare callback passed to subscribe() is used as the `next` handler,
// so it receives every emitted value
console.log(value);
});
}
}
Reacting when the observable emits an error or a completion notification
/**
 * Root component showing how an observer reacts to `next`, `error`, and
 * `complete` notifications.
 *
 * Emission timeline for each subscriber: `101` and `1` synchronously, `5`
 * after 5 seconds, then an error after 6 seconds. The error terminates the
 * stream, so the `66` / completion scheduled for 7 seconds is never delivered.
 */
export class AppComponent implements OnInit {
  title = 'angular-basic';
  // `Observable` type which emits `number` data streams
  observable: Observable<number>;
  // Holds the observable subscription so that it can be disposed of
  // (unsubscribed) whenever we need to.
  subscription: Subscription;

  /** Creates the observable and subscribes a full observer to it. */
  ngOnInit(): void {
    this.observable = new Observable<number>(subscriber => {
      // emitted synchronously, as soon as a subscriber attaches
      subscriber.next(101);

      // emits 5 after 5 seconds
      const fiveSecondTimer = setTimeout(() => subscriber.next(5), 5000);

      // also synchronous: delivered right after 101, long before the timers fire
      subscriber.next(1);

      // Scheduled for 7 seconds: would emit 66 and then complete. It never
      // reaches the observer because the error below fires first (at 6
      // seconds) and an errored observable delivers nothing further.
      const sevenSecondTimer = setTimeout(() => {
        subscriber.next(66);
        subscriber.complete();
      }, 7000);

      // after 6 seconds: sends an error, so the observable never completes
      const errorTimer = setTimeout(() => {
        subscriber.error('an error occurred!!');
      }, 6000);

      // Teardown: runs on unsubscribe, error, or completion so that pending
      // timers do not outlive the subscription and keep the subscriber alive.
      return () => {
        clearTimeout(fiveSecondTimer);
        clearTimeout(sevenSecondTimer);
        clearTimeout(errorTimer);
      };
    });

    this.subscription = this.observable.subscribe({
      // called for every emitted value
      next(val) {
        console.log(`next val is ${val}`);
      },
      // called at most once; no `next` or `complete` follows an error
      error(err) {
        console.log(`something went wrong: ${err}`);
      },
      // called at most once, only if the stream ends without an error
      complete() {
        console.log('completed');
      }
    });
  }
}
Unsubscribing from the observable
/**
 * Root component showing how to release an observable subscription when
 * Angular destroys the component.
 */
export class AppComponent implements OnInit, OnDestroy {
  // `Observable` type which emits `number` data streams
  observable: Observable<number>;
  // Holds the observable subscription so that it can be disposed of
  // (unsubscribed) whenever we need to.
  subscription: Subscription;

  ngOnInit(): void {
    // NOTE(review): intentionally empty in this snippet; presumably this is
    // where the observable is created and `subscription` is assigned.
  }

  /** Stops listening to the observable when the component is destroyed. */
  ngOnDestroy(): void {
    // `subscription` stays undefined until something actually subscribes,
    // so guard the call to avoid a TypeError during component teardown.
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}