haredo

Haredo version 2 introduces breaking changes. See 2.0 Changes

Yet another RabbitMQ library

Motivation

xkcd 927: standards

For a long time I've been using tortoise as my go-to RabbitMQ client. I quite like its chaining API, but tortoise does have its downsides: it is not being maintained, accessing message metadata requires you to avoid arrow functions, typings are missing, and so on.

Features

  • TypeScript
  • Chaining based API
  • Graceful closing
  • RPC

Usage

Working examples are available on GitHub.

Initializing

import { haredo } from 'haredo';

const rabbit = haredo({
    connection: 'amqp://localhost:5672/'
});

Listening for messages

example on GitHub

rabbit.queue('my-queue')
    // bindExchange can be omitted if you don't want to bind the queue to an exchange right now
    .bindExchange('testExchange', '#', 'topic', { durable: false })
    .subscribe(async (message) => {
        console.log(message);
    });

Publishing to an exchange

example on GitHub

rabbit.exchange('my-exchange').publish({ id: 5, status: 'active' }, 'item.created');

Publishing to a queue

example on GitHub

rabbit.queue('my-queue').publish({ id: 5, status: 'inactive' });

Limit concurrency

rabbit.queue('my-queue')
    .prefetch(5) // same as .concurrency(5)
    .subscribe(async (message) => {
        console.log(message);
    });

RPC

example on GitHub

rabbit.queue('sum')
    // With autoReply on, the value returned from the callback is sent back as the reply.
    // The alternative is to call the reply method (one argument) on the message.
    .autoReply()
    .subscribe(({ data }) => data[0] + data[1]);

const response = await rabbit.queue('sum').rpc([30, 12]);
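The comment in the RPC example mentions replying through the message instead of using autoReply. A minimal sketch of what such a handler could look like follows. The RpcMessage interface is a simplified stand-in written for this example (an assumption, not haredo's actual message type), and sumHandler is a hypothetical name.

```typescript
// Simplified stand-in for the parts of the message this sketch touches.
// Assumption: the real haredo message type is richer than this.
interface RpcMessage<TData, TReply> {
    data: TData;
    reply(value: TReply): Promise<void> | void;
}

// Replies explicitly through the message instead of relying on .autoReply()
const sumHandler = async (message: RpcMessage<[number, number], number>): Promise<void> => {
    const [a, b] = message.data;
    await message.reply(a + b);
};

// Intended wiring (not executed here, it needs a running broker):
// rabbit.queue('sum').subscribe(sumHandler);
```

Replying explicitly can be useful when the reply should go out before the callback finishes other work, since autoReply only sends what the callback returns.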

Delayed messages

Note: this requires RabbitMQ Delayed Message Plugin to be installed and enabled on the server.

example on GitHub

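// Assumes e and preparedMessage are exported from the package root
import { e, preparedMessage } from 'haredo';

interface Message {
    id: number;
}

// Small helper used below to pause between publishes
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

const delayedExchange = e<Message>('my-delayed-exchange', 'x-delayed-message').delayed('topic');

await rabbit.queue('my-queue')
    .bindExchange(delayedExchange, '#')
    .subscribe(({ data, timestamp }) => {
        console.log(`Received message in ${ Date.now() - timestamp }ms id:${ data.id }`);
    });

// Reusable template: routing key 'item', delivered 2000 ms after publishing
const delayedMessage = preparedMessage().routingKey('item').delay(2000);
let id = 0;
while (true) {
    id += 1;
    console.log('Publishing message', id);
    const msg = delayedMessage.json({ id }).timestamp(Date.now());
    await rabbit
        .exchange(delayedExchange)
        .publish(msg);
    await delay(2000);
}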

Quorum queues with delivery limits

Note: this requires RabbitMQ 3.8.0 or higher. See Quorum Queues Overview for more information.

example on GitHub

Message throttling

example on GitHub

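// Assumes standardBackoff is exported from the package root
import { standardBackoff } from 'haredo';

await rabbit.queue('my-queue')
    .backoff(standardBackoff({
        failThreshold: 3,
        failSpan: 5000,
        failTimeout: 5000
    }))
    .subscribe(() => {
        throw new Error('Nack this message for me');
    });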
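The option names in the example above suggest a sliding-window failure counter. The sketch below is a hypothetical illustration of that idea and not haredo's implementation. It assumes failThreshold is the number of failures tolerated within failSpan milliseconds, and failTimeout is how long consumption pauses once that threshold is reached. createFailureWindow and its methods are names made up for this example, and timestamps are passed in explicitly so the behaviour is easy to follow.

```typescript
// Assumed meaning of the options; check the haredo documentation for the real semantics
interface BackoffOptions {
    failThreshold: number; // failures tolerated within failSpan
    failSpan: number;      // window length in ms
    failTimeout: number;   // pause length in ms once the threshold is reached
}

const createFailureWindow = ({ failThreshold, failSpan, failTimeout }: BackoffOptions) => {
    let failures: number[] = [];
    let pausedUntil = 0;
    return {
        // Record a failed (nacked) message at time `now` (ms)
        recordFailure(now: number): void {
            // Keep only the failures that are still inside the window
            failures = failures.filter((time) => now - time < failSpan);
            failures.push(now);
            if (failures.length >= failThreshold) {
                pausedUntil = now + failTimeout;
                failures = [];
            }
        },
        // Whether new messages should be held back at time `now` (ms)
        isPaused(now: number): boolean {
            return now < pausedUntil;
        }
    };
};
```

With the values from the example above, three failures within five seconds would pause consumption for five seconds, while failures spread further apart would never trigger a pause.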

Dead letter

View on GitHub

Middleware

example on GitHub

import { Middleware } from 'haredo';

// The middleware must be async because it awaits next()
const timeMessage: Middleware = async ({ queue }, next) => {
    const start = Date.now();
    await next();
    console.log(`Message took ${ Date.now() - start }ms`);
};

await rabbit.queue('my-queue')
    .use(timeMessage)
    .subscribe(() => {
        throw new Error('Nack this message for me');
    });

Graceful shutdown

Calling consumer.close() sends a cancel to the channel and waits for messages that are already being handled to finish before the returned promise resolves.

Calling haredoInstance.close() gracefully closes all of its consumers.
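A minimal sketch of how the two close calls could be combined in a shutdown routine follows. Closable and shutdown are hypothetical names introduced for this example. The sketch also assumes that the consumer is the value resolved by subscribe and that both close methods return promises.

```typescript
// Anything with an async close(), for example a consumer or the haredo instance
interface Closable {
    close(): Promise<void>;
}

// Stop the given consumers first so in-flight messages can finish,
// then close the instance (which closes any remaining consumers)
const shutdown = async (consumers: Closable[], instance: Closable): Promise<void> => {
    await Promise.all(consumers.map((consumer) => consumer.close()));
    await instance.close();
};

// Intended wiring (assumes `consumer` and `rabbit` from the examples above):
// process.once('SIGTERM', () => { void shutdown([consumer], rabbit); });
```

Since closing the instance already closes its consumers, passing an empty list of consumers is enough for a plain shutdown. Closing selected consumers first is only useful when some of them should stop taking messages earlier than the rest.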
