NestJS is a progressive Node.js framework for building backend applications. Its architecture is heavily inspired by Angular, and it is built with TypeScript. In this article, we'll see how Observables can be a useful alternative to Promises in specific use cases.
In JavaScript/TypeScript, Promises are the standard way to run asynchronous tasks such as database queries, file operations, or HTTP requests.
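// Example of a Promise in NestJS
import { Injectable } from '@nestjs/common';
@Injectable()
export class UserService {
  // userModel is assumed to be an injected Mongoose model (constructor omitted)
  async findById(id: string): Promise<User | null> {
    // exec() resolves to null when no document matches the id
    return this.userModel.findById(id).exec();
  }
}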
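The main features of Promises are that they start executing as soon as they are created, settle exactly once with a single value or an error, and cannot be cancelled once started.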
Observables come from the RxJS library and represent a stream of values over time. They offer powerful capabilities for handling asynchronous operations, such as transformation, combination, and cancellation.
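// Example of an Observable in NestJS
import { Injectable } from '@nestjs/common';
import { Observable, from } from 'rxjs';
@Injectable()
export class UserService {
  // userModel is assumed to be an injected Mongoose model, as in the previous example
  findAll(): Observable<User[]> {
    // from() wraps the Promise returned by exec() in an Observable
    return from(this.userModel.find().exec());
  }
}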
With Observables, you can write code that reacts to changes in data and events, allowing you to build more reactive applications.
The main features of Observables are composition and built-in handling of errors and completion.
One of the key benefits of using Observables is that they support composition. You can combine multiple Observables, apply operators to them, and create new Observables as a result. This makes it easier to build complex asynchronous workflows and handle data dependencies.
Observables also support error handling and completion. An Observable can emit an error notification when something goes wrong during data processing, allowing you to handle the error and recover from it, for example with a compensating action. It also emits a completion notification when the stream of data has ended.
In short, Observables and RxJS help you write code that is more reactive, declarative, and efficient when dealing with complex asynchronous scenarios.
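To make these features concrete, here is a minimal sketch that uses only RxJS. The user$, orders$, summary$ and recovered$ streams and the record helper are hypothetical names standing in for real data sources; the sketch only aims to show composition with forkJoin, recovery with catchError, and the completion notification.

```typescript
import { forkJoin, of, throwError, Observable } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

// Hypothetical data sources, standing in for database or HTTP calls
const user$ = of({ id: 1, name: 'Ada' });
const orders$ = of([{ orderId: 10 }, { orderId: 11 }]);

// Composition: wait for both sources, then derive a new value from them
const summary$: Observable<string> = forkJoin([user$, orders$]).pipe(
  map(([user, orders]) => `${user.name} has ${orders.length} orders`),
);

// Error handling: replace a failed stream with a fallback value
const recovered$: Observable<string> = throwError(() => new Error('service down')).pipe(
  catchError(() => of('fallback')),
);

// Record every notification so that next, error and complete are all visible
function record(source$: Observable<string>): string[] {
  const log: string[] = [];
  source$.subscribe({
    next: (value) => log.push(`next: ${value}`),
    error: (err) => log.push(`error: ${err.message}`),
    complete: () => log.push('complete'),
  });
  return log;
}
```

Because every source here emits synchronously, record(summary$) returns the computed sentence followed by a completion entry, and record(recovered$) never contains an error entry since catchError swaps in the fallback before the subscriber sees the failure.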
Using RxJS with a NestJS application can enhance its capabilities for handling asynchronous operations and creating reactive pipelines. Here are some common RxJS use cases in NestJS.
As in any application, you may need to perform time-consuming asynchronous operations. Observables can represent these as asynchronous data streams: the from creation function converts a Promise into an Observable (bindNodeCallback does the same for Node-style callback functions), and operators then handle the data processing.
import { Injectable } from '@nestjs/common';
import { Observable, from } from 'rxjs';
import axios, { AxiosResponse } from 'axios';
@Injectable()
export class DataService {
  fetchData(): Observable<AxiosResponse> {
    // from() converts the Promise returned below into an Observable
    return from(getExternalDataFromAPI());
  }
}
async function getExternalDataFromAPI(): Promise<AxiosResponse> {
  // Fetch data from an external API
  return axios.get('https://myapi.com/data');
}
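The conversion itself can be sketched without any NestJS code. The example below assumes a hypothetical Node-style function readSetting and a hypothetical Promise-returning function loadGreeting. It uses bindNodeCallback for the callback case and defer for the Promise case, where defer is chosen over from only to delay the work until something subscribes.

```typescript
import { bindNodeCallback, defer, Observable } from 'rxjs';

// Hypothetical Node-style API: the callback receives (error, result)
function readSetting(
  key: string,
  callback: (err: Error | null, value: string) => void,
): void {
  const settings: Record<string, string> = { region: 'eu-west' };
  if (key in settings) {
    callback(null, settings[key]);
  } else {
    callback(new Error(`Unknown setting: ${key}`), '');
  }
}

// bindNodeCallback wraps the function so that it returns an Observable instead
const readSetting$ = bindNodeCallback(readSetting);

// Hypothetical Promise-based API; calls counts how often it actually runs
let calls = 0;
function loadGreeting(): Promise<string> {
  calls += 1;
  return Promise.resolve('hello');
}

// from(loadGreeting()) would start the work immediately, because the Promise
// is created before from() is even called. defer postpones the call until
// subscription and repeats it for every new subscriber.
const greeting$: Observable<string> = defer(() => loadGreeting());
```

With this setup, readSetting$('region') emits the stored value and completes, readSetting$('missing') delivers the Error through the error channel, and loadGreeting is not invoked until greeting$ is subscribed to.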
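You can create reactive endpoints that respond to changes in data or events by returning an Observable from a controller method. For a regular route handler, NestJS subscribes to the Observable and sends the last emitted value once the stream completes, so a never-ending stream such as interval is better exposed through Server-Sent Events with the @Sse decorator.
import { Controller, MessageEvent, Sse } from '@nestjs/common';
import { Observable, interval } from 'rxjs';
import { map } from 'rxjs/operators';
@Controller('data')
export class DataReactiveController {
  // @Sse keeps the HTTP connection open and pushes every emission to the client
  @Sse('stream')
  streamData(): Observable<MessageEvent> {
    // Simulate streaming data with an interval
    return interval(1000).pipe(
      map(() => ({ data: { value: Math.random() } })) // Wrap each random number in an SSE message
    );
  }
}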
In a microservices architecture, NestJS applications may need to communicate with other services. RxJS can facilitate this communication by using Observables as a means of streaming data between services. You can use operators like switchMap or mergeMap to handle data dependencies and chain multiple service calls in a reactive manner.
import { Controller, Get } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { switchMap } from 'rxjs/operators';
// YourType is a placeholder for your own response type
@Controller('data')
export class DataController {
  private readonly client: ClientProxy;
  constructor() {
    this.client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: {
        host: 'localhost',
        port: 8888,
      },
    });
  }
  @Get()
  fetchData(): Observable<YourType> {
    // send() takes the message pattern first and the payload second
    return this.client.send<YourType>({ cmd: 'fetchData' }, {}).pipe(
      switchMap(response => {
        // Perform additional operations with the received data
        // For example, make another service call based on the initial response
        return this.client.send<YourType>({ cmd: 'processData' }, response);
      })
    );
  }
}
Sometimes, in complex communication flows, a graphical representation can be helpful. This is where marble diagrams come in.
In RxJS, marble diagrams are a visual representation used to illustrate the behavior of observable sequences, operators, and time-based events. In their text form, they consist of characters such as -, |, ^, and #, which represent the passage of time, completion, the subscription point, and an error respectively, while letters or digits stand for emitted values. You can use tools like ThinkRx to visualize your flows.
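Marble strings are also executable through the TestScheduler shipped in rxjs/testing. The sketch below is one possible way to compare switchMap and mergeMap in virtual time; the expectMarbles helper, the lookup function and the JSON-based equality check are assumptions made for this example (in a real test suite you would normally plug in your test framework's deep-equality assertion).

```typescript
import { TestScheduler } from 'rxjs/testing';
import { mergeMap, switchMap } from 'rxjs/operators';

// Assumption: a simple JSON comparison stands in for a test framework's deep equality
function assertDeepEqual(actual: unknown, expected: unknown): void {
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error('Marble diagrams do not match');
  }
}

// Runs the same source through the chosen operator and checks the output diagram
function expectMarbles(operator: 'switchMap' | 'mergeMap', expected: string): void {
  const scheduler = new TestScheduler(assertDeepEqual);
  scheduler.run(({ cold, expectObservable }) => {
    // Two requests, emitted at frames 0 and 2; the source completes at frame 10
    const source$ = cold('a-b-------|');
    // Hypothetical lookup: answers 3 frames after it is subscribed to
    const lookup = (id: string) => cold('---x|', { x: id.toUpperCase() });

    const result$ =
      operator === 'switchMap'
        ? source$.pipe(switchMap(lookup))
        : source$.pipe(mergeMap(lookup));

    expectObservable(result$).toBe(expected);
  });
}

// switchMap cancels the pending lookup for 'a' when 'b' arrives, so only B is emitted
expectMarbles('switchMap', '-----B----|');
// mergeMap keeps both lookups alive, so A and B are both emitted
expectMarbles('mergeMap', '---A-B----|');
```

Reading the two expected diagrams side by side shows the design trade-off: switchMap suits cases where only the latest request matters, while mergeMap suits cases where every response must be kept.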
NestJS provides middleware, pipes, and interceptors for intercepting and modifying incoming requests and outgoing responses. Interceptors are built directly on RxJS, and you can also use RxJS operators to handle asynchronous operations within middleware. For example, you can use the map operator to transform data or the catchError operator to handle errors.
import { Injectable, NestMiddleware } from '@nestjs/common';
import { Request, Response } from 'express';
import { Observable, of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
@Injectable()
export class LoggingMiddleware implements NestMiddleware {
  use(req: Request, res: Response, next: () => void) {
    console.log('Logging middleware executing...');
    // Simulate an asynchronous operation
    this.asyncOperation().pipe(
      map(data => {
        // Transform data if needed
        return data.toUpperCase();
      }),
      catchError(error => {
        // Handle errors if any
        console.error('Error occurred in logging middleware:', error);
        return of('Error occurred in logging middleware');
      })
    ).subscribe(
      transformedData => {
        console.log('Transformed data:', transformedData);
        next();
      }
    );
  }
  asyncOperation(): Observable<string> {
    return new Observable<string>(observer => {
      setTimeout(() => {
        observer.next('Data from async operation');
        observer.complete();
      }, 1000);
    });
  }
}
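Interceptors are another place where map and catchError fit naturally, because an interceptor receives the route handler's result as an Observable through next.handle(). The following is only a sketch of that shape: CallHandlerLike is a local stand-in for NestJS's CallHandler so that the snippet runs without @nestjs/common, and the Envelope type and EnvelopeInterceptor class are hypothetical names.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

// Local structural stand-in for NestJS's CallHandler (an assumption made so
// that this sketch is runnable on its own)
interface CallHandlerLike<T> {
  handle(): Observable<T>;
}

// Hypothetical response wrapper; the field names are illustrative
interface Envelope<T> {
  ok: boolean;
  data: T | null;
}

class EnvelopeInterceptor<T> {
  // In a real NestJS interceptor the first parameter is an ExecutionContext
  intercept(_context: unknown, next: CallHandlerLike<T>): Observable<Envelope<T>> {
    return next.handle().pipe(
      // Wrap the handler's result in the envelope
      map((data): Envelope<T> => ({ ok: true, data })),
      // Replace a failed handler with an error envelope instead of propagating
      catchError(() => of<Envelope<T>>({ ok: false, data: null })),
    );
  }
}

// Simulated handlers: one succeeds, one fails
const success: CallHandlerLike<number> = { handle: () => of(42) };
const failure: CallHandlerLike<number> = {
  handle: () => throwError(() => new Error('boom')),
};
```

Swallowing every error this way is a deliberate simplification; a production interceptor would more likely log the error and rethrow it, or map it to an HTTP exception.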
NestJS applications can benefit from event-driven programming, where components react to events and trigger actions accordingly. RxJS provides a rich set of operators to handle event streams. You can use subjects or event emitters as Observables to represent events, and use operators like filter or debounceTime to transform the event stream. Let's illustrate it with a real-time notification system:
import { Injectable } from '@nestjs/common';
import { Subject, Observable } from 'rxjs';
import { filter, debounceTime } from 'rxjs/operators';
@Injectable()
export class EventService {
  private eventSubject = new Subject<string>();
  emitEvent(event: string): void {
    this.eventSubject.next(event);
  }
  // Only events containing the keyword reach the subscriber
  getFilteredEvents(keyword: string): Observable<string> {
    return this.eventSubject.pipe(
      filter(event => event.includes(keyword))
    );
  }
  // Emits an event only after `time` milliseconds have passed without a newer one
  getDebouncedEvents(time: number): Observable<string> {
    return this.eventSubject.pipe(
      debounceTime(time)
    );
  }
}
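A short usage sketch may help here. It reproduces the filtering part of the service above as a plain class (NotificationBus is a hypothetical name, and the decorator is dropped so that the snippet runs without NestJS) to show that a Subject only delivers events emitted while a subscription is active.

```typescript
import { Observable, Subject } from 'rxjs';
import { filter } from 'rxjs/operators';

class NotificationBus {
  private readonly events = new Subject<string>();

  emit(event: string): void {
    this.events.next(event);
  }

  // Only events containing the keyword reach the subscriber
  matching(keyword: string): Observable<string> {
    return this.events.pipe(filter((event) => event.includes(keyword)));
  }
}

const bus = new NotificationBus();
const received: string[] = [];

bus.emit('order:created'); // No subscriber yet, so this event is lost
const subscription = bus.matching('order').subscribe((event) => received.push(event));
bus.emit('order:paid'); // Delivered: it matches the keyword
bus.emit('user:login'); // Filtered out
subscription.unsubscribe();
bus.emit('order:shipped'); // Not delivered: the subscription was cancelled
```

After this sequence, received contains only 'order:paid'. If late subscribers need earlier events, a ReplaySubject or BehaviorSubject is the usual alternative to a plain Subject.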
RxJS also provides utilities that help when writing tests for NestJS applications. You can use the toArray operator to collect every emitted value into a single array, and firstValueFrom or lastValueFrom (the replacements for the deprecated toPromise method) to convert an Observable into a Promise, then assert on the emitted values.
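As a rough illustration of that conversion, the sketch below collects a stream into a Promise that a test can await. The doubled$ stream and the collectAll and firstOnly helpers are hypothetical names used only for this example.

```typescript
import { firstValueFrom, lastValueFrom, of } from 'rxjs';
import { map, toArray } from 'rxjs/operators';

// Hypothetical stream under test
const doubled$ = of(1, 2, 3).pipe(map((n) => n * 2));

// toArray buffers every emission and emits one array on completion;
// lastValueFrom turns that single emission into a Promise
function collectAll(): Promise<number[]> {
  return lastValueFrom(doubled$.pipe(toArray()));
}

// firstValueFrom resolves with the first emission and then unsubscribes
function firstOnly(): Promise<number> {
  return firstValueFrom(doubled$);
}
```

In a Jest test this becomes expect(await collectAll()).toEqual([2, 4, 6]), which avoids the done callback entirely.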
Let's imagine a data service like this:
import { Injectable } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
// YourType is a placeholder for your own data shape
export interface YourType {
  id: number;
  name: string;
}
@Injectable()
export class DataService {
  constructor(private readonly http: HttpService) {}
  fetchData(): Observable<YourType[]> {
    return this.http.get<YourType[]>('https://myapi.com/data').pipe(
      // HttpService emits an AxiosResponse; the payload lives in response.data
      map(response => response.data.map(item => ({ id: item.id, name: item.name })))
    );
  }
}
Here is the NestJS test we could build thanks to RxJS (assuming Jest as the test runner):
import { Test, TestingModule } from '@nestjs/testing';
import { HttpService } from '@nestjs/axios';
import { of, throwError } from 'rxjs';
import { DataService } from './data.service';
describe('DataService', () => {
  let service: DataService;
  // Minimal stand-in for HttpService: only the method the service calls
  const httpService = { get: jest.fn() };
  beforeEach(async () => {
    const module: TestingModule = await Test.createTestingModule({
      providers: [
        DataService,
        { provide: HttpService, useValue: httpService },
      ],
    }).compile();
    service = module.get<DataService>(DataService);
  });
  afterEach(() => {
    jest.resetAllMocks();
  });
  it('should be defined', () => {
    expect(service).toBeDefined();
  });
  it('should fetch data from the API and transform it', (done) => {
    // The extra field shows that the map step keeps only id and name
    const testData = [{ id: 1, name: 'Item 1', extra: 'ignored' }, { id: 2, name: 'Item 2', extra: 'ignored' }];
    const transformedData = [{ id: 1, name: 'Item 1' }, { id: 2, name: 'Item 2' }];
    httpService.get.mockReturnValue(of({ data: testData }));
    service.fetchData().subscribe((data) => {
      expect(data).toEqual(transformedData);
      expect(httpService.get).toHaveBeenCalledWith('https://myapi.com/data');
      done();
    });
  });
  it('should handle errors', (done) => {
    const errorResponse = { status: 404, message: 'Not Found' };
    httpService.get.mockReturnValue(throwError(() => errorResponse));
    service.fetchData().subscribe({
      error: (error) => {
        expect(error).toEqual(errorResponse);
        done();
      },
    });
  });
});
We've created a DataService that fetches data from an external API using the HttpService from @nestjs/axios, which wraps Axios and returns Observables. The fetchData method transforms the data using the map operator before returning it as an Observable.
In the tests, we use Test.createTestingModule from @nestjs/testing to set up a testing module, and we replace HttpService with a Jest mock whose get method returns an Observable we control. We then test the behavior of the fetchData method by subscribing to the Observable and asserting the emitted values. We also test error handling by making the mock return an Observable that errors.
By combining RxJS creation functions like of and throwError with a mocked HttpService, we can easily write tests for NestJS applications that use Observables, ensuring our services behave as expected and handle errors gracefully.
When needed, moving from Promises to Observables and RxJS operators in NestJS opens up new possibilities for handling complex asynchronous workflows. Whether you're fetching data from external APIs in a pipe, handling real-time updates, or managing streams of events, RxJS offers great tools for sustainable asynchronous programming in NestJS.