Yet another RabbitMQ library
For a long time I've been using tortoise as my go-to RabbitMQ client. I quite like its chaining API, but tortoise does have its downsides (it's not being maintained, accessing message metadata requires you to avoid arrow functions, typings are missing, etc.).
Working examples are available on GitHub.
import { haredo } from 'haredo';
const rabbit = haredo({
    connection: 'amqp://localhost:5672/'
});
rabbit.queue('my-queue')
    // Can be omitted if you don't want to bind the queue to an exchange right now
    .bindExchange('testExchange', '#', 'topic', { durable: false })
    .subscribe(async (message) => {
        console.log(message);
    });
rabbit.exchange('my-exchange').publish({ id: 5, status: 'active' }, 'item.created');
rabbit.queue('my-queue').publish({ id: 5, status: 'inactive' });
rabbit.queue('my-queue')
    .prefetch(5) // same as .concurrency(5)
    .subscribe(async (message) => {
        console.log(message);
    });
rabbit.queue('sum')
    // With autoReply on, returned value from callback is automatically replied
    // Alternative is to use the reply/1 method on the message
    .autoReply()
    .subscribe(({ data }) => data[0] + data[1]);
const response = await rabbit.queue('sum').rpc([30, 12]);
Note: this requires RabbitMQ Delayed Message Plugin to be installed and enabled on the server.
interface Message {
    id: number;
}
const delayedExchange = e<Message>('my-delayed-exchange', 'x-delayed-message').delayed('topic');
await rabbit.queue('my-queue')
    .bindExchange(delayedExchange, '#')
    .subscribe(({ data, timestamp }) => {
        console.log(`Received message in ${ Date.now() - timestamp }ms id:${ data.id } `);
    });
const delayedMessage = preparedMessage().routingKey('item').delay(2000);
let id = 0;
while (true) {
    id += 1;
    console.log('Publishing message', id);
    const msg = delayedMessage.json({ id }).timestamp(Date.now());
    await rabbit
        .exchange(delayedExchange)
        .publish(msg);
    await delay(2000);
}
Note: this requires RabbitMQ 3.8.0 or higher; see Quorum Queues Overview for more information.
await rabbit.queue('my-queue')
    .backoff(standardBackoff({
        failThreshold: 3,
        failSpan: 5000,
        failTimeout: 5000
    }))
    .subscribe(() => {
        throw new Error('Nack this message for me');
    });
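The option names above suggest a simple rule: once failThreshold failures happen within failSpan milliseconds, consuming pauses for failTimeout milliseconds. The sketch below illustrates that rule under exactly this assumption. It is not haredo's implementation; `SketchBackoff`, `fail`, `waitTime` and the injectable clock are hypothetical names made up for illustration.

```typescript
// Hypothetical sketch of a fail-window backoff; NOT haredo's implementation.
// Assumed meaning of the options (inferred from their names):
interface BackoffOptions {
    failThreshold: number; // failures inside the window that trigger a pause
    failSpan: number;      // length of the sliding window, in ms
    failTimeout: number;   // how long to pause once triggered, in ms
}

class SketchBackoff {
    private failures: number[] = [];
    private pausedUntil = 0;
    private readonly options: BackoffOptions;
    private readonly now: () => number;

    // The clock is injectable so the behaviour can be checked without real waiting.
    constructor(options: BackoffOptions, now: () => number = Date.now) {
        this.options = options;
        this.now = now;
    }

    // Record one failed (nacked) message.
    fail(): void {
        const t = this.now();
        const { failThreshold, failSpan, failTimeout } = this.options;
        // Keep only failures that are still inside the sliding window.
        this.failures = this.failures.filter((at) => t - at < failSpan);
        this.failures.push(t);
        if (this.failures.length >= failThreshold) {
            this.pausedUntil = t + failTimeout;
            this.failures = [];
        }
    }

    // Milliseconds the consumer should wait before taking the next message.
    waitTime(): number {
        return Math.max(0, this.pausedUntil - this.now());
    }
}
```

With the values from the example (3, 5000, 5000), three failures inside five seconds would make `waitTime()` report a five second pause, while failures spread further apart would never trigger one.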
import { Middleware } from 'haredo';
// The middleware must be async because it awaits next()
const timeMessage: Middleware = async ({ queue }, next) => {
    const start = Date.now();
    await next();
    console.log(`Message took ${ Date.now() - start }ms`);
};
await rabbit.queue('my-queue')
    .use(timeMessage)
    .subscribe(() => {
        throw new Error('Nack this message for me');
    });
Calling consumer.close() sends a cancel to the channel and waits for messages that are already being handled to finish before resolving the returned promise.
Calling haredoInstance.close() gracefully closes all of its consumers.
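The shutdown behaviour described here (stop accepting new messages, then wait for in-flight handlers) can be illustrated with a small in-memory sketch. It is not haredo's code and does not talk to RabbitMQ; `SketchConsumer`, `deliver` and `closeAll` are hypothetical names that only mirror the idea.

```typescript
// Hypothetical in-memory illustration of graceful shutdown; NOT haredo's implementation.
type Handler<T> = (message: T) => Promise<void>;

class SketchConsumer<T> {
    private cancelled = false;
    private readonly inFlight = new Set<Promise<void>>();
    private readonly handler: Handler<T>;

    constructor(handler: Handler<T>) {
        this.handler = handler;
    }

    // Hands a message to the handler. Returns false once the consumer is cancelled.
    deliver(message: T): boolean {
        if (this.cancelled) {
            return false;
        }
        const task: Promise<void> = this.handler(message)
            .catch(() => undefined) // a failing handler must not block shutdown
            .finally(() => {
                this.inFlight.delete(task);
            });
        this.inFlight.add(task);
        return true;
    }

    // Stops accepting messages, then resolves once every in-flight handler has settled.
    async close(): Promise<void> {
        this.cancelled = true;
        await Promise.all(this.inFlight);
    }
}

// Closing a whole client amounts to closing every consumer it owns.
async function closeAll(consumers: Array<{ close(): Promise<void> }>): Promise<void> {
    await Promise.all(consumers.map((consumer) => consumer.close()));
}
```

In a real service, the equivalent calls on the library (consumer.close() or haredoInstance.close()) would typically be awaited from a SIGTERM handler so the process exits only after in-flight messages are done.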