A Small Guide On NestJS Queues

Queues in a NestJS application help to deal with scaling and performance challenges.

When To Use Queues?:

An API request that involves a time-consuming operation, such as CPU-bound work, blocks the thread when it is handled synchronously. To avoid this, it is appropriate to move the CPU-bound operation into a separate background job.

In NestJS, one of the best solutions for these kinds of tasks is to implement queues.
The most recommended library for queueing in a NestJS application is '@nestjs/bull' (Bull is a Node.js queue library). Bull depends on Redis to store its data, such as jobs. With this technique we create two kinds of services: a 'Producer' and a 'Consumer'. The producer pushes our jobs into the Redis store. The consumer reads those jobs (e.g., CPU-bound operations) and processes them.

With queues, user requests are served quickly because the actual logic (for example, time-consuming CPU-bound work) is carried out later by the consumers as a background job.
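The producer/consumer split can be sketched without any library. The following is a toy in-memory version, not how Bull is implemented: the 'pending' array stands in for the Redis store, and the names 'produce' and 'consume' are made up for illustration.

```typescript
// A toy in-memory queue: it only illustrates the producer/consumer split.
// Bull keeps jobs in Redis instead of an array.
type Job = { name: string; data: { text: string } };

const pending: Job[] = [];      // stands in for the Redis store
const processed: string[] = []; // records what the consumer handled

// Producer: called from the request handler, it only records the job.
function produce(text: string): string {
  pending.push({ name: 'message-job', data: { text } });
  return 'accepted'; // the response goes out before any work is done
}

// Consumer: runs later and drains whatever is waiting.
function consume(): void {
  let job = pending.shift();
  while (job !== undefined) {
    processed.push(job.data.text.toUpperCase()); // placeholder for slow work
    job = pending.shift();
  }
}

const response = produce('hello');
console.log(response, processed.length); // request finished, nothing processed yet
consume();
console.log(processed); // the job has now been handled in the "background"
```

The point to notice is that 'produce' returns before 'consume' has done anything, which is why the request stays fast.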

Setup Redis Docker Image Container:

For this sample we will use Docker to run a Redis instance locally. If you don't have any prior knowledge of Docker, that is not a problem; just follow the steps below.
Note:
Skip this section if you already have a Redis instance, either installed directly or hosted on Azure or any other cloud provider.
Step1:
Download Docker to your local system from "https://docs.docker.com/desktop/". Docker is available for all major desktop operating systems.
Step2:
After downloading the Docker installer, install it. To run any Docker container (e.g., Redis, MongoDB, PostgreSQL), the Docker instance we just installed must be active (running).
Step3:
Now we need to pull the Redis image from Docker Hub "https://hub.docker.com/_/redis".
Command To Pull Redis Image:
docker pull redis
Step4:
The final step is to run the Redis image as a container and map a local system port to it. By default, Redis listens on port '6379' inside the Docker container, so to access Redis from our machine we need to map a port when starting the container.
Command To Start Redis Container:
docker run --name your_containerName -p your_PortNumber:6379 -d redis
The '--name your_containerName' flag specifies the Redis container name. The '-p your_PortNumber:6379' flag maps the Redis port '6379' to a port on our local machine; our application will use that local port to communicate with Redis. The '-d' flag runs the container in detached mode, which means it runs in the background. The final 'redis' argument specifies the image to run in our container.
Step5:
Once the Docker container is created, it is stored on our local machine, so to start it again at any time run the following command:
docker start your_container_name

Step6:(Optional Step)
Let's test our Redis instance.

Command To Use Redis CLI:
docker exec -it your_docker_container_name redis-cli
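Inside the Redis CLI, typing 'ping' should return 'PONG'. The same check can be done from Node.js without extra packages. The sketch below is only an illustration: 'encodeCommand' and 'pingRedis' are hypothetical helper names, and it speaks the Redis wire protocol directly over a TCP socket.

```typescript
import * as net from 'net';

// Encode a command in the Redis wire format (RESP): an array of bulk strings.
function encodeCommand(parts: string[]): string {
  const body = parts
    .map((p) => `$${Buffer.byteLength(p)}\r\n${p}\r\n`)
    .join('');
  return `*${parts.length}\r\n${body}`;
}

// Send PING and resolve with the raw reply (a healthy server answers "+PONG").
function pingRedis(host: string, port: number): Promise<string> {
  return new Promise((resolve, reject) => {
    const socket = net.createConnection({ host, port }, () => {
      socket.write(encodeCommand(['PING']));
    });
    socket.once('data', (chunk) => {
      socket.end();
      resolve(chunk.toString().trim());
    });
    socket.once('error', reject);
  });
}

// Usage (replace 5003 with the local port you mapped to the container):
// pingRedis('localhost', 5003).then(console.log).catch(console.error);
```

If the promise rejects with a connection error, the container is probably not running or the port mapping is different from the one used here.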

Create A Sample NestJS Application:

Let's begin our journey by creating a sample NestJS application.
Command To Install CLI:
npm i -g @nestjs/cli
Command To Create NestJS App:
nest new your_project_name

Install Bull Library Package:

npm install --save @nestjs/bull bull

npm install --save-dev @types/bull

Register BullModule And Setup Redis Connection:

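Configure the BullModule in 'app.module.ts'. The Bull library depends on the Redis store for data communication, so BullModule provides configuration options to register the Redis connection. The port used below ('5003') is the local machine port mapped to the Redis container; use whichever port you chose when starting the container.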
src/app.module.ts:
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 5003,
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
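Hardcoding the host and port is fine for a sample, but the values usually differ between machines. One possible approach is to read them from environment variables. In this sketch 'redisFromEnv' is a hypothetical helper, and 'REDIS_HOST' and 'REDIS_PORT' are assumed variable names, not something NestJS or Bull defines.

```typescript
// Same shape as the `redis` object passed to BullModule.forRoot above.
interface RedisConnection {
  host: string;
  port: number;
}

// Hypothetical helper: REDIS_HOST and REDIS_PORT are assumed variable names.
function redisFromEnv(env: Record<string, string | undefined>): RedisConnection {
  const port = Number(env.REDIS_PORT ?? '6379'); // fall back to the Redis default
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return { host: env.REDIS_HOST ?? 'localhost', port };
}

// In app.module.ts this could be used as:
// BullModule.forRoot({ redis: redisFromEnv(process.env) })
console.log(redisFromEnv({ REDIS_PORT: '5003' }));
```

Failing early on an invalid port makes a misconfigured environment show up at startup instead of as a connection error later.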

Register A Queue:

Use the 'registerQueue' method of 'BullModule' to configure a queue. A queue is identified by its name, so we have to name our queue when configuring it. Multiple queues can be registered as well.
src/app.module.ts:
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 5003,
      },
    }),
    BullModule.registerQueue({
      name:'message-queue'
    })
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
  • Here we configure a new queue named 'message-queue'.

Create A Producer To Push Jobs Into Queue:

Job producers add jobs to the queues. Producers are typically services. Now we will implement a producer to push messages into the queue.

Note: Pushing plain messages into a queue is not a realistic task, but it is an ideal choice for beginners to understand NestJS queues.

So let's create a new producer in 'message.producer.service.ts'.
src/message.producer.service.ts:
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class MessageProducerService {
  constructor(@InjectQueue('message-queue') private queue: Queue) {}

  async sendMessage(message:string){
    await this.queue.add('message-job',{
        text: message
    });
  }
}
  • To access the queue we have to use the '@InjectQueue' decorator along with the name of the queue, here 'message-queue'.
  • To push a job into the queue, we use the 'add' method of the 'Queue' instance. Here we can specify the job name as well; the consumer uses this job name to pick up the matching jobs.
  • The second argument of the 'add' method carries the data of our job.
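The 'add' method of Bull also accepts an optional third argument with job options such as 'attempts', 'backoff', and 'removeOnComplete'. The sketch below builds such an object; 'retryOptions' is a hypothetical helper, and the interface is a local copy of a few option fields so the example stands on its own.

```typescript
// Local mirror of a few fields from Bull's job options, so the sketch
// compiles without the library installed.
interface RetryJobOptions {
  attempts: number; // total tries before the job is marked as failed
  backoff: { type: 'fixed' | 'exponential'; delay: number }; // wait between tries, in ms
  removeOnComplete: boolean; // drop finished jobs from Redis
}

// Hypothetical helper that builds the third argument for queue.add().
function retryOptions(attempts: number, delayMs: number): RetryJobOptions {
  if (attempts < 1) {
    throw new Error('attempts must be at least 1');
  }
  return {
    attempts,
    backoff: { type: 'exponential', delay: delayMs },
    removeOnComplete: true,
  };
}

// Inside sendMessage this could be used as:
// await this.queue.add('message-job', { text: message }, retryOptions(3, 1000));
console.log(retryOptions(3, 1000));
```

Retries are worth considering for jobs that can fail for temporary reasons; the values 3 and 1000 are only placeholders.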
Register the 'MessageProducerService' in the providers array of our AppModule.
src/app.module.ts:
import { MessageProducerService } from './message.producer.service';
// code hidden for display purpose
@Module({
  providers: [MessageProducerService],
})
export class AppModule {}
Let's create a new endpoint to invoke our newly created producer service.
src/app.controller.ts:
import { Controller, Get, Query } from '@nestjs/common';
import { MessageProducerService } from './message.producer.service';

@Controller()
export class AppController {
  constructor(
    private readonly messageProducerService:MessageProducerService) {}
	
  @Get('invoke-msg')
  async getInvokeMsg(@Query('msg') msg:string){
    // wait until the job is stored in Redis; the consumer processes it later
    await this.messageProducerService.sendMessage(msg);
    return msg;
  }
}

Create A Consumer To Read Jobs:

A consumer is a class that defines the methods that process jobs added to the queue. To make a class a consumer, decorate it with '@Processor()' and pass the queue name. A consumer class must contain a handler method to process the jobs, and the handler method must be decorated with '@Process()'.
src/message.consumer.ts:
import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";

@Processor('message-queue')
export class MessageConsumer {

    @Process('message-job')
    readOperationJob(job:Job<unknown>){
        console.log(job.data);
    }
}
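Since the handler receives the job data as 'unknown', it can help to check its shape before using it. A minimal sketch with a type guard follows; 'MessageJobData' and 'isMessageJobData' are names made up for this example, matching the '{ text }' object our producer sends.

```typescript
// Shape the producer puts on the queue for 'message-job'.
interface MessageJobData {
  text: string;
}

// Type guard: narrows unknown job data to MessageJobData.
function isMessageJobData(data: unknown): data is MessageJobData {
  return (
    typeof data === 'object' &&
    data !== null &&
    typeof (data as { text?: unknown }).text === 'string'
  );
}

// In the handler this could be used as:
// if (isMessageJobData(job.data)) { console.log(job.data.text); }
console.log(isMessageJobData({ text: 'hi' })); // true
console.log(isMessageJobData({ text: 42 }));   // false
```

With the guard in place, TypeScript knows 'job.data.text' is a string inside the 'if' block, so no cast to 'any' is needed.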
Now register our 'MessageConsumer' class in the providers array of AppModule. Once it is registered, the NestJS application automatically activates the consumer to read and process jobs.
src/app.module.ts:
import { MessageConsumer } from './message.consumer';
// code hidden for display purpose
@Module({
  providers: [MessageProducerService, MessageConsumer],
})
export class AppModule {}
Now to test our queue, run the application, access the endpoint (for example 'http://localhost:3000/invoke-msg?msg=hello', assuming the default port 3000), and check the console log.

Create A New Queue For CPU Bound Operation:

Our previous queue handles a simple message-reading job, which is good for understanding the NestJS queue concept. Now we will create a new queue for a heavier operation.

Case Study:
Let's assume we need to create an endpoint that deletes a file from a physical folder path and from the database as well. Deleting files is a slow operation (strictly speaking it is I/O-bound rather than CPU-bound, but the queueing approach is the same), so doing it synchronously within the user request increases the time the endpoint takes to respond. It may also block the thread that serves other users.

Queue Solution:
For the above case study, the best solution is to implement a queue. The logic then looks like this: in the endpoint request, we delete the file record from the database, save the physical path of the file in Redis as job data, and return the response to the user immediately. At this point the file has not yet been deleted from its physical location. The consumer later picks up the pending jobs from the Redis store. On reading our file-deletion job and its physical path value, it executes the job handler method to delete the file as a background job.

Let's register a new queue in AppModule.
src/app.module.ts:
import { BullModule } from '@nestjs/bull';
// code hidden for display purpose

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 5003,
      },
    }),
    BullModule.registerQueue(
      {
        name: 'message-queue',
      },
      {
        name: 'file-operation-queue',
      },
    ),
  ]
})
export class AppModule {}
  • Our new queue name is 'file-operation-queue'.
Let's create a new producer in 'file.producer.service.ts'.
src/file.producer.service.ts:
import { InjectQueue } from "@nestjs/bull";
import { Injectable } from "@nestjs/common";
import { Queue } from "bull";

@Injectable()
export class FileProducerService{
    constructor(@InjectQueue('file-operation-queue') private queue:Queue){

    }

    async deleteFile(fileName:string){
        const filePath = `C:/Users/hchowdary/Desktop/testimages/${fileName}`;
        // implement logic to delete the file record from the database.
        await this.queue.add('delete-file',{
            filePath: filePath
        });
    }
}
  • Here we push the physical location of the file into the Redis store.
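Because the file name comes from a query string, it is worth making sure the resulting path stays inside the intended folder before queueing the job. The following is a sketch of one way to do that with the Node.js 'path' module; 'resolveInside' is a hypothetical helper, not part of NestJS or Bull.

```typescript
import * as path from 'path';

// Hypothetical helper: resolve fileName inside baseDir, rejecting anything
// (such as "../secret.txt") that would escape the directory.
function resolveInside(baseDir: string, fileName: string): string {
  const base = path.resolve(baseDir);
  const target = path.resolve(base, fileName);
  const relative = path.relative(base, target);
  const escapes =
    relative === '' || // fileName points at the folder itself
    relative === '..' ||
    relative.startsWith('..' + path.sep) ||
    path.isAbsolute(relative); // e.g. a different drive on Windows
  if (escapes) {
    throw new Error(`Invalid file name: ${fileName}`);
  }
  return target;
}

// In deleteFile this could replace the template string, for example:
// const filePath = resolveInside('C:/Users/hchowdary/Desktop/testimages', fileName);
console.log(resolveInside('/tmp/images', 'a.png'));
```

A request such as 'remove-file?file=../../other.txt' would then be rejected in the producer instead of reaching the consumer.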
Register our 'FileProducerService' in AppModule.
src/app.module.ts:
import { FileProducerService } from './file.producer.service';
// code hidden for display purpose
@Module({
  providers: [
    FileProducerService,
  ],
})
export class AppModule {}
Now create a new endpoint as below.
src/app.controller.ts:
import { FileProducerService } from './file.producer.service';
// code hidden for display purpose
@Controller()
export class AppController {
  constructor(
    private readonly fileProducerService:FileProducerService) {}

  @Get('remove-file')
  async deleteFile(@Query('file') file:string){
    await this.fileProducerService.deleteFile(file);
    return 'file deleted';
  }
}
Let's create a Consumer for our new queue.
src/file.consumer.ts:
import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";
import * as fs from 'fs';

@Processor('file-operation-queue')
export class FileConsumer{

    @Process('delete-file')
    async filedeletionJob(job:Job<{ filePath: string }>){
        // use the promise-based API so the event loop is not blocked
        await fs.promises.unlink(job.data.filePath);
    }
}
  • Here the consumer deletes the file from the specified location as a background job.
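If a job is retried, or the same file is queued twice, the file may already be gone when the handler runs. One possible way to handle that is to treat a missing file as "already deleted". In this sketch 'deleteIfExists' and 'demo' are hypothetical names, and the demo works on a temporary folder rather than on the folder used above.

```typescript
import { promises as fs } from 'fs';
import * as os from 'os';
import * as path from 'path';

// Delete a file without blocking the event loop. Returns false when the file
// was already gone, so a repeated job does not fail a second time.
async function deleteIfExists(filePath: string): Promise<boolean> {
  try {
    await fs.unlink(filePath);
    return true;
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code === 'ENOENT') {
      return false;
    }
    throw err; // other errors (e.g. permissions) should still fail the job
  }
}

// Small demonstration in a fresh temporary folder.
async function demo(): Promise<boolean[]> {
  const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'queue-demo-'));
  const file = path.join(dir, 'sample.txt');
  await fs.writeFile(file, 'temporary');
  const first = await deleteIfExists(file);  // the file existed
  const second = await deleteIfExists(file); // the file was already removed
  await fs.rmdir(dir);
  return [first, second];
}

demo().then((result) => console.log(result)); // [ true, false ]
```

Inside the consumer, the handler body could then call 'await deleteIfExists(job.data.filePath)' instead of deleting the file directly.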
Now register our consumer in the providers array of AppModule.
src/app.module.ts:
import { FileConsumer } from './file.consumer';
// code hidden for display purpose
@Module({
  providers: [
    FileConsumer,
  ],
})
export class AppModule {}
So that's all about the usage of queues in a NestJS application.


Wrapping Up:

Hopefully this article delivered some useful information on queues in NestJS. I would love to have your feedback, suggestions, and better techniques in the comment section below.


Comments

  1. great job on this blog! I'd love to see a NestJS + SQS blog... where you are not using all these things packaged with NestJS. Would be cool to see some custom implementation!

  2. great contribution thx so much, is required the implementation of redis with bull ?

  3. Putting request handling, producer and consumer in same service will not be an ideal solution. Isn't it? even though we have queues we are using resources from same nestjs service to process the background job.

    Replies
    1. Yes I agree to implement in seperate service, i have shown one of the option with a case study to do in the same service. Here i just explained one of the inbuild option like provided by NestJS.

    2. If we implement in separate service then how can they interact with each other? anything specific we need to do here?

  4. I want to know that the queues are running in a separate process or a thread? Or the queue uses the same main thread to process jobs?

    Replies
    1. Nest JS allows you to run it in a separate thread, there's an example in the docs on how to set it up.
