How to Implement Delayed Messages with RabbitMQ? Code Examples
16.8.2022
·
Обновлено
8.2.2024
Sometimes you need to implement scheduled or repeating actions into your app. For example, sending a push-notification in 10 minutes or clearing a temporary folder every day.
To do this, you can use cron-tasks, that run scripts on your server automatically, or node-schedule package (a task-planning library for Node.js).
But with both these solutions there’s a scaling problem:
There’re several servers so it might be unclear on which one to run the task
The selected server might crash
The node might get deleted for freed up resources
One of possible solutions here is RabbitMQ, a message broker. Check out the overall delayed messages implementation scheme in this example on GitHub. And here’s what it’s like in detail, step by step:
2. In each of the exchangers create queues with the same binding type but different names.
For HELLO_EXCHANGE:
queues: {
WORLD: {
name: 'hello.world', // subscribe to this queue
binding: 'hello.world',
options: {
durable: true,
},
},
},
For HELLO_DELAYED_EXCHANGE:
queues: {
WORLD: {
name: 'helloDelayed.world',
binding: 'hello.world',
options: {
durable: true,
queueMode: 'lazy', // set the message to remain in the hard memory
},
}
For the delayed-exchanger’s queue, set the x-dead-letter-exchange argument with the regular queue’s name. The argument tells the RabbitMQ broker to transfer the message to this exchanger if it’s not processed.
arguments: {
'x-dead-letter-exchange': HELLO_EXCHANGE.name, // set the queue to transfer the message to once it’s dead
}
3. Publish the message to the delayed-exchanger’s queue with the expiration period
// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
exchangeName: exchangeNameDelayed,
queue: WORLD_DELAYED,
expirationInMs: 30000, //set when the message dies (in 30s)
});
Once the delayed message expires, it will go to the regular exchanger’s queue.
Now you only have to set a consumer for the regular exchanger’s queue:
// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
createConsumer(
{
queueName: HELLO_EXCHANGE.queues.WORLD.name,
prefetch: 50,
log: true,
},
controller.consumeHelloWorld,
),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
const result = await world({ name: payload.name });
logger.info(result.message);
// await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again
};
Profit!
If you need to run the action periodically, publish the message to the delayed exchanger again at the end of the consumer section.
NOTE: RabbitMQ operates on FIFO (first in, first out) – it processes commands in the same order they were set. So if you publish a delayed message with 1 day expiration and a message with 1 minute expiration in the same queue, it will process the second message after the first one, and the target action for the second message will happen a minute after the first.
Eventually, this is what you get:
1. Create the exchangers and queues
// services/base-service/src/broker/const/exchanges.ts
export const HELLO_EXCHANGE = Object.freeze({
name: 'hello',
type: 'direct',
options: {
durable: true,
},
queues: {
WORLD: {
name: 'hello.world', // subscribe to this queue
binding: 'hello.world',
options: {
durable: true,
},
},
},
});
export const HELLO_DELAYED_EXCHANGE = Object.freeze({
name: 'helloDelayed',
type: 'direct',
options: {
durable: true,
queueMode: 'lazy', // specify that the hard memory must store this message
},
queues: {
WORLD: {
name: 'helloDelayed.world',
binding: 'hello.world',
options: {
durable: true,
queueMode: 'lazy', // specify that the hard memory must store this message arguments: {
'x-dead-letter-exchange': HELLO_EXCHANGE.name, // specify the queue to which the message must relocate after its death
},
},
},
},
});
2. Add the publisher that will send the message to the delayed queue
// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
exchangeName: exchangeNameDelayed,
queue: WORLD_DELAYED,
expirationInMs: 30000, // set when the message dies (in 30s)
});
3. Add the consumer for the regular exchanger’s queue
// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
createConsumer(
{
queueName: HELLO_EXCHANGE.queues.WORLD.name,
prefetch: 50,
log: true,
},
controller.consumeHelloWorld,
),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
const result = await world({ name: payload.name });
logger.info(result.message);
// await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again
};
Profit!
There’s also a plugin that does this work for you and makes the implementation easier. You only create one exchanger, one queue, one publisher, and one consumer.
When publishing, the plugin will process the delayed message and, once it’s expired, will transfer the message to the right queue. All on its own.
With this plugin the scheduled messages are processed in the order of the expiration time. That means, if you publish a message with a 1-day delay and then a message with 1-minute delay, the second one will be processed before the first.
Add publisher that sends the messages to the delayed queue:
export const publishHelloPluginDelayedWorld = createPublisher({
exchangeName: exchangeNamePluginDelayed,
queue: WORLD_PLUGIN_DELAYED,
delayInMs: 60000, // specify when the message should die (60s)
});
We regularly use RabbitMQ in our projects. For instance, check out its use case in Janson Media internet TV portfolio. It’s a movie renting service, but make it digital.
Here we used RabbitMQ delayed messages for the app’s 3 essential features: sending emails and SMS-messages to notify users that, for example, their lease period is almost over; sending messages about completed payments to the socket and sending a notification to the user; sending uploaded videos for further processing.
Hopefully, implementing delayed messages won’t be like falling down the rabbit hole for you anymore (if it ever was) 🙂
Cообщение не отправлено, что-то пошло не так при отправке формы. Попробуйте еще раз.
e-learning-software-development-how-to
Jayempire
9.10.2024
Cool
simulate-slow-network-connection-57
Samrat Rajput
27.7.2024
The Redmi 9 Power boasts a 6000mAh battery, an AI quad-camera setup with a 48MP primary sensor, and a 6.53-inch FHD+ display. It is powered by a Qualcomm Snapdragon 662 processor, offering a balance of performance and efficiency. The phone also features a modern design with a textured back and is available in multiple color options.
this is defenetely what i was looking for. thanks!
how-to-implement-screen-sharing-in-ios-1193
liza
25.1.2024
Can you please provide example for flutter as well . I'm having issue to screen share in IOS flutter.
guide-to-software-estimating-95
Nikolay Sapunov
10.1.2024
Thank you Joy! Glad to be helpful :)
guide-to-software-estimating-95
Joy Gomez
10.1.2024
I stumbled upon this guide from Fora Soft while looking for insights into making estimates for software development projects, and it didn't disappoint. The step-by-step breakdown and the inclusion of best practices make it a valuable resource. I'm already seeing positive changes in our estimation accuracy. Thanks for sharing your expertise!
free-axure-wireframe-kit-1095
Harvey
15.1.2024
Please, could you fix the Kit Download link?. Many Thanks in advance.
Fora Soft Team
15.1.2024
We fixed the link, now the library is available for download! Thanks for your comment
Comments