To create a high-performance Node.js application that can handle multiple concurrent requests and utilize all the available CPU cores on the server, we can use clustering and load balancing. In this article, we will go through the code example and explain each step in detail.
Importing Necessary Modules
Let’s start by importing the necessary Node.js modules and third-party packages: the cluster module to create a cluster of worker processes that share incoming requests, os to get information about the server’s CPUs, express to create a web server and define API routes, helmet to improve security, compression to compress HTTP responses, morgan to log incoming requests, body-parser to parse JSON request bodies, and ioredis to cache responses in Redis. We also use promisify from Node’s built-in util module to convert callback-style Redis methods into Promise-returning ones.
const cluster = require('cluster');
const os = require('os');
const express = require('express');
const helmet = require('helmet');
const compression = require('compression');
const morgan = require('morgan');
const bodyParser = require('body-parser');
const Redis = require('ioredis');
const { promisify } = require('util');
Creating Redis Client
Next, we create a Redis client using ioredis, and we create promisified versions of its get and set methods using util’s promisify. These methods will be used to cache responses to incoming requests in Redis. (Strictly speaking, ioredis commands already return Promises when called without a callback, so this step is optional; it is shown here to illustrate the callback-to-Promise pattern.)
const redisClient = new Redis();
const getAsync = promisify(redisClient.get).bind(redisClient);
const setAsync = promisify(redisClient.set).bind(redisClient);
Defining API Routes
We then define the API routes for our app. When a GET request is made to /api/example, we first check if the response has already been cached in Redis by calling getAsync with the cache key “example”. If a cached response is found, we parse it as JSON and send it as the response to the client. If a cached response is not found, we generate a new response object, cache it in Redis using setAsync, and send it as the response to the client.
When a POST request is made to /api/example, we first read the name parameter from the parsed request body. We then generate a new response object with a personalized greeting based on name, cache it in Redis using setAsync, and send it as the response to the client. Note that both routes share the cache key “example”, so a POST will overwrite the cached GET response until the 10-second TTL expires.
const app = express();
app.use(helmet());
app.use(compression());
app.use(morgan('combined'));
app.use(bodyParser.json());
app.get('/api/example', async (req, res) => {
  const cachedResult = await getAsync('example');
  if (cachedResult) {
    res.json(JSON.parse(cachedResult));
  } else {
    const result = { message: 'Hello, World!' };
    await setAsync('example', JSON.stringify(result), 'EX', 10); // Cache for 10 seconds
    res.json(result);
  }
});
app.post('/api/example', async (req, res) => {
  const { name } = req.body;
  const result = { message: `Hello, ${name}!` };
  await setAsync('example', JSON.stringify(result), 'EX', 10); // Cache for 10 seconds
  res.json(result);
});
Clustering Worker Processes
Finally, we use the cluster module to fork multiple worker processes, each of which will listen for incoming requests on the same port. We use os.cpus().length to get the number of CPUs on the server and spawn a worker process for each CPU.
In the `cluster.on('exit')` event listener, we log a message indicating which worker process died, with its process ID, exit code, and exit signal. We then fork a new worker process to replace the one that died.
This ensures that we always have the correct number of worker processes to handle incoming requests, even if a worker process dies unexpectedly.
// Start server
if (cluster.isPrimary) {
// Create a worker for each CPU core
const numWorkers = os.cpus().length;
console.log(`Primary process is running with PID ${process.pid}`);
console.log(`Spawning ${numWorkers} workers...`);
for (let i = 0; i < numWorkers; i++) {
cluster.fork();
}
// Log worker events
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died with code ${code} (${signal})`);
cluster.fork();
});
} else {
// Worker process
const server = app.listen(3000, () => {
  console.log(`Worker ${process.pid} is listening on port 3000`);
});
// Implement graceful shutdown
const shutdown = () => {
console.log(`Worker ${process.pid} is shutting down...`);
server.close(() => {
process.exit(0);
});
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
Clustering is a technique that allows you to run multiple instances of your application (called worker processes) in parallel, each of which can handle incoming requests. This can improve the performance and reliability of your application by distributing the workload across multiple cores and allowing for graceful handling of worker process failures.
Here is a step-by-step explanation of what this code does:
- The `cluster.isPrimary` condition checks if the current process is the primary process. The primary process is responsible for creating and managing the worker processes.
- `os.cpus().length` returns the number of CPU cores on the current machine.
- The for loop creates a worker process for each CPU core by calling `cluster.fork()`.
- The `cluster.on('exit', …)` event listener fires when a worker process dies (i.e. exits). When this happens, it logs a message indicating which worker process died and with what exit code and signal. It then creates a new worker process to replace the dead one by calling `cluster.fork()`.
- If the current process is not the primary process (i.e. it is a worker process), then it creates a server and listens for incoming requests on port 3000.
- The `shutdown()` function is defined to gracefully shut down the worker process. When called, it logs a message indicating that the worker process is shutting down, closes the server, and exits the process with a status code of 0.
- The `process.on('SIGTERM', …)` and `process.on('SIGINT', …)` event listeners listen for termination signals (SIGTERM and SIGINT, respectively) sent to the process. When a signal is received, the `shutdown()` function is called to gracefully shut down the worker process.
Here is the complete code
const cluster = require('cluster');
const os = require('os');
const express = require('express');
const helmet = require('helmet');
const compression = require('compression');
const morgan = require('morgan');
const bodyParser = require('body-parser');
const Redis = require('ioredis');
const { promisify } = require('util');
// Define Redis client and promisified Redis methods
const redisClient = new Redis();
const getAsync = promisify(redisClient.get).bind(redisClient);
const setAsync = promisify(redisClient.set).bind(redisClient);
// Initialize app
const app = express();
// Set up middleware
app.use(helmet());
app.use(compression());
app.use(morgan('combined'));
app.use(bodyParser.json());
// Define API routes
app.get('/api/example', async (req, res) => {
  const cachedResult = await getAsync('example');
  if (cachedResult) {
    res.json(JSON.parse(cachedResult));
  } else {
    const result = { message: 'Hello, World!' };
    await setAsync('example', JSON.stringify(result), 'EX', 10); // Cache for 10 seconds
    res.json(result);
  }
});
app.post('/api/example', async (req, res) => {
  const { name } = req.body;
  const result = { message: `Hello, ${name}!` };
  await setAsync('example', JSON.stringify(result), 'EX', 10); // Cache for 10 seconds
  res.json(result);
});
// Start server
if (cluster.isPrimary) {
// Create a worker for each CPU core
const numWorkers = os.cpus().length;
console.log(`Primary process is running with PID ${process.pid}`);
console.log(`Spawning ${numWorkers} workers...`);
for (let i = 0; i < numWorkers; i++) {
cluster.fork();
}
// Log worker events
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died with code ${code} (${signal})`);
cluster.fork();
});
} else {
// Worker process
const server = app.listen(3000, () => {
console.log(`Worker ${process.pid} is listening on port 3000`);
});
// Implement graceful shutdown
const shutdown = () => {
console.log(`Worker ${process.pid} is shutting down...`);
server.close(() => {
process.exit(0);
});
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
Hope you find this information helpful. Watch this space for more useful content.