haredo

    Haredo version 3 introduces breaking changes. See 3.0 Changes

    RabbitMQ client for Node.js with a focus on simplicity and type safety.

    Working examples are available on GitHub.

    import { Haredo } from 'haredo';
    const haredo = Haredo({
        url: 'amqp://localhost:5672/'
    });

    example on GitHub

    haredo.queue('my-queue')
        // bindExchange can be omitted if you don't want to bind the queue to an exchange right now
        .bindExchange('testExchange', '#', 'topic', { durable: false })
        .subscribe(async (message) => {
            console.log(message.data);
        });
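The `'#'` pattern above binds the queue to every routing key on a topic exchange. As a rough illustration of how RabbitMQ topic patterns behave (`*` matches exactly one dot-separated word, `#` matches zero or more), here is a minimal standalone sketch. `topicMatches` is a hypothetical helper written for this illustration; it is not part of haredo, and the broker performs the real matching.

```typescript
// Hypothetical helper, not part of haredo: approximates AMQP topic matching.
function topicMatches(pattern: string, routingKey: string): boolean {
    const p = pattern.split('.');
    const k = routingKey === '' ? [] : routingKey.split('.');

    const match = (pi: number, ki: number): boolean => {
        if (pi === p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too
            return ki === k.length;
        }
        if (p[pi] === '#') {
            // '#' may consume zero or more words
            for (let next = ki; next <= k.length; next++) {
                if (match(pi + 1, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki === k.length) {
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[pi] === '*' || p[pi] === k[ki]) && match(pi + 1, ki + 1);
    };

    return match(0, 0);
}

console.log(topicMatches('#', 'item.created'));         // true
console.log(topicMatches('item.*', 'item.created'));    // true
console.log(topicMatches('item.*', 'item.created.v2')); // false
```

A narrower pattern such as `'item.*'` in place of `'#'` would therefore deliver `item.created` to the queue but not `item.created.v2`.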

    example on GitHub

    haredo.exchange('my-exchange').publish({ id: 5, status: 'active' }, 'item.created');
    

    example on GitHub

    haredo.queue('my-queue').publish({ id: 5, status: 'inactive' });
    
    haredo.queue('my-queue')
        .prefetch(5) // same as .concurrency(5)
        .subscribe(async (message) => {
            console.log(message);
        });

    Note: this requires the RabbitMQ Delayed Message Plugin to be installed and enabled on the server.

    example on GitHub

    interface Message {
        id: number;
    }
    const delayedExchange = Exchange<Message>('my-delayed-exchange', 'x-delayed-message').delayed('topic');
    await haredo.queue('my-queue')
        .bindExchange(delayedExchange, '#')
        .subscribe((data, { timestamp }) => {
            console.log(`Received message in ${ Date.now() - timestamp }ms id:${ data.id } `);
        });
    let id = 0;
    while (true) {
        id += 1;
        console.log('Publishing message', id);
        // `delayedMessage` and `delay` are not defined in this snippet
        const msg = delayedMessage.json({ id }).timestamp(Date.now());
        await haredo
            .exchange(delayedExchange)
            .delay(1000)
            .publish(msg);
        await delay(2000);
    }
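For context, the Delayed Message Plugin works with an exchange of type `x-delayed-message` whose `x-delayed-type` argument names the routing behaviour, and with an `x-delay` header (in milliseconds) on each published message. The sketch below only builds those plain objects. Both helper names are hypothetical, and the assumption that `.delayed('topic')` and `.delay(1000)` translate to exactly these values is mine, not something stated above.

```typescript
// Plain-object sketch of what the Delayed Message Plugin expects.
// These helpers are hypothetical and do not call haredo or RabbitMQ.
interface ExchangeDeclaration {
    name: string;
    type: 'x-delayed-message';
    arguments: { 'x-delayed-type': string };
}

// The plugin routes like `routingType` (e.g. 'topic') once the delay has elapsed.
function delayedExchangeDeclaration(name: string, routingType: string): ExchangeDeclaration {
    return {
        name,
        type: 'x-delayed-message',
        arguments: { 'x-delayed-type': routingType }
    };
}

// Returns a copy of the headers with the per-message delay added.
function withDelay(headers: Record<string, unknown>, delayMs: number): Record<string, unknown> {
    return { ...headers, 'x-delay': delayMs };
}

console.log(delayedExchangeDeclaration('my-delayed-exchange', 'topic'));
console.log(withDelay({}, 1000));
```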

    Note: requires RabbitMQ 3.8.0 or higher. See Quorum Queues Overview for more information.

    example on GitHub

    await haredo.queue('my-queue')
        .backoff(standardBackoff({
            failThreshold: 3,
            failSpan: 5000,
            failTimeout: 5000
        }))
        .subscribe(() => {
            throw new Error('Nack this message for me');
        });
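The option names suggest a sliding-window policy: tolerate up to `failThreshold` failures within `failSpan` milliseconds, then pause for `failTimeout` milliseconds. That reading is an assumption, and the sketch below is not haredo's implementation. `FailureWindow` is a hypothetical class that takes the current time as an argument so the behaviour is easy to follow.

```typescript
interface BackoffOptions {
    failThreshold: number; // failures tolerated within failSpan
    failSpan: number;      // window length in ms
    failTimeout: number;   // pause length in ms
}

// Hypothetical sliding-window tracker; not haredo's implementation.
class FailureWindow {
    private readonly options: BackoffOptions;
    private failures: number[] = [];
    private pausedUntil = 0;

    constructor(options: BackoffOptions) {
        this.options = options;
    }

    // Record a failed (nacked) message at time `now` (ms).
    recordFailure(now: number): void {
        const { failThreshold, failSpan, failTimeout } = this.options;
        // Drop failures that have fallen out of the window
        this.failures = this.failures.filter((t) => now - t < failSpan);
        this.failures.push(now);
        if (this.failures.length >= failThreshold) {
            this.pausedUntil = now + failTimeout;
            this.failures = [];
        }
    }

    // How long the consumer should wait before taking the next message.
    waitTime(now: number): number {
        return Math.max(0, this.pausedUntil - now);
    }
}

const tracker = new FailureWindow({ failThreshold: 3, failSpan: 5000, failTimeout: 5000 });
tracker.recordFailure(0);
tracker.recordFailure(1000);
console.log(tracker.waitTime(1000)); // 0
tracker.recordFailure(2000);
console.log(tracker.waitTime(2000)); // 5000
```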

    example on GitHub

    import { Middleware } from 'haredo';

    // Logs how long each message takes to handle
    const timeMessage: Middleware = async ({ queue }, next) => {
        const start = Date.now();
        await next();
        console.log(`Message took ${ Date.now() - start }ms`);
    };

    await haredo.queue('my-queue')
        .use(timeMessage)
        .subscribe(() => {
            throw new Error('Nack this message for me');
        });
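To make the role of `next` concrete, here is a minimal standalone sketch of a middleware chain in which each middleware wraps everything after it. `runChain` and `SketchMiddleware` are hypothetical names used for illustration only; haredo's own dispatch may differ, for example in what happens when a middleware never calls `next`.

```typescript
type Next = () => Promise<void>;
type SketchMiddleware<T> = (message: T, next: Next) => Promise<void> | void;

// Hypothetical composer: runs middleware in order, then the handler.
async function runChain<T>(
    middleware: SketchMiddleware<T>[],
    handler: (message: T) => Promise<void> | void,
    message: T
): Promise<void> {
    const dispatch = async (index: number): Promise<void> => {
        const current = middleware[index];
        if (current === undefined) {
            // No middleware left: call the subscriber callback
            await handler(message);
            return;
        }
        await current(message, () => dispatch(index + 1));
    };
    await dispatch(0);
}

const log: string[] = [];
const timing: SketchMiddleware<string> = async (_message, next) => {
    log.push('before');
    await next();
    log.push('after');
};

runChain([timing], (message) => { log.push(`handled ${message}`); }, 'hello')
    .then(() => console.log(log)); // [ 'before', 'handled hello', 'after' ]
```

In this sketch a middleware that skips `next()` also skips the handler, which is a design choice of the sketch rather than a statement about haredo.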

    Add a middleware that will be called for every message in every subscriber.

    example on GitHub

    declare module 'haredo/types' {
        interface HaredoMessage<T> {
            cid?: string;
        }
    }
    const haredo = Haredo({
        url: 'amqp://localhost:5672/',
        globalMiddleware: [
            (message) => {
                // Copy the correlation id header onto the message
                message.cid = message.headers?.['x-cid'] as string;
            }
        ]
    });

    Calling consumer.close() sends a cancel to the channel and waits for messages that are already being handled to finish before resolving the returned promise.

    Calling haredoInstance.close() gracefully closes all of its consumers.
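The "stop accepting, then drain" behaviour described above can be modelled in a few lines. This is a minimal in-memory sketch, not haredo's code: `SketchConsumer` is a hypothetical class, and setting a flag stands in for cancelling the consumer on the channel.

```typescript
// Hypothetical in-memory consumer; it only models "drain, then resolve".
class SketchConsumer {
    private inFlight = new Set<Promise<void>>();
    private closed = false;

    // Returns false when the consumer is closed and the message is not accepted.
    deliver(handler: () => Promise<void>): boolean {
        if (this.closed) {
            return false;
        }
        const task: Promise<void> = handler()
            .catch(() => { /* a failed handler still counts as finished */ })
            .finally(() => {
                this.inFlight.delete(task);
            });
        this.inFlight.add(task);
        return true;
    }

    // Stops accepting new messages, then waits for the in-flight ones.
    async close(): Promise<void> {
        this.closed = true; // stand-in for cancelling the consumer
        await Promise.all(Array.from(this.inFlight));
    }
}

const consumer = new SketchConsumer();
let handled = false;
consumer.deliver(async () => {
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
    handled = true;
});
consumer.close().then(() => console.log(handled)); // true
```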

    By default, Haredo asserts the queues and exchanges and binds them to each other every time publish or subscribe is called. This can be disabled by calling .skipSetup().

    await haredo.queue('my-queue')
        .skipSetup()
        .subscribe(() => {
            throw new Error('Nack this message for me');
        });

    // Only create the queue, don't bind it to any exchanges and don't create any exchanges
    await haredo.queue('my-queue')
        .bindExchange('testExchange', '#', 'topic', { durable: false })
        .skipSetup({ skipBoundExchanges: true, skipBindings: true, skipCreate: false });

    Extensions add new methods to the Haredo instance. They are only available for publish chains. An extension may modify the chain state and must return the modified state.

    example on GitHub


    interface Extension {
        queue: {
            /** Add a cid header to publishing */
            cid<T>(cid: string): QueueChain<T>;
        };
    }

    const haredo = Haredo<Extension>({
        url: 'amqp://localhost:5672/',
        extensions: [
            {
                name: 'cid',
                // Receives the current chain state and returns the new method
                queue: (state) => {
                    return (cid: string) => ({
                        ...state,
                        headers: {
                            ...state.headers,
                            'x-cid': cid
                        }
                    });
                }
            }
        ]
    });

    await haredo.queue('my-queue')
        .cid('123')
        .publish({ id: 5, status: 'inactive' });
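The requirement to return the modified state, rather than mutate it, can be seen in isolation with plain objects. The sketch below is standalone and makes assumptions: `ChainState` is a simplified, hypothetical stand-in for haredo's internal chain state, and `cidExtension` mirrors only the shape of the `cid` extension above.

```typescript
// Simplified, hypothetical stand-in for the chain state.
interface ChainState {
    queue: string;
    headers: Record<string, string>;
}

type SketchExtension = (state: ChainState) => (cid: string) => ChainState;

// Returns a new state with the extra header; the input state is left untouched.
const cidExtension: SketchExtension = (state) => (cid) => ({
    ...state,
    headers: { ...state.headers, 'x-cid': cid }
});

const initial: ChainState = { queue: 'my-queue', headers: {} };
const updated = cidExtension(initial)('123');

console.log(updated.headers); // { 'x-cid': '123' }
console.log(initial.headers); // {}
```

Because each call produces a fresh state object, a chain stored in a variable could be reused for several publishes without one call's headers leaking into the next, at least in this sketch.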