Criando canal de mensagens com RabbitMQ e Node.js

Olá pessoal! Hoje irei mostrar como criar um canal de mensagens usando RabbitMQ e Node.js. Primeiramente, você deve tê-los instalado na sua máquina. Você pode realizar o download do RabbitMQ em: https://www.rabbitmq.com/download.html e do NodeJS em: https://nodejs.org/pt-br/download/.

Vamos começar?

Primeiramente, temos que ter em mente o conceito de fila. Caso não conheça a estrutura de dado fila, dê uma olhada neste artigo: https://www.mundojs.com.br/2019/11/01/estrutura-de-dados-com-javascript-fila/ para podermos dar sequência à este artigo.

No RabbitMQ, temos um produtor de mensagens (P) e um consumidor (C). As mensagens ficam armazenadas dentro de uma fila. No diagrama abaixo, a caixa do meio é a fila. O produtor de mensagens envia uma mensagem para a fila, e o consumidor a recebe:

Enviando:

Crie um arquivo send.js, que será o arquivo do produtor de mensagens. No nosso send.js, precisamos realizar a requisição da biblioteca amqplib. Isso pode ser feito desta maneira:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
let amqp = require('amqplib/callback_api');
let amqp = require('amqplib/callback_api');
let amqp = require('amqplib/callback_api');

Em seguida, conectaremos ao servidor RabbitMQ:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0)throw error0;
});
amqp.connect('amqp://localhost', function(error0, connection) { if (error0)throw error0; });
amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0)throw error0;
});

Agora, criaremos um canal onde reside a API para a realização das tarefas, onde devemos declarar uma fila para o envio, e também publicar uma mensagem na fila:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) throw error0;
connection.createChannel(function(error1, channel) {
if (error1) throw error1;
let queue = 'hello';
let msg = 'Olá Mundo!';
channel.assertQueue(queue, {
durable: false
});
channel.sendToQueue(queue, Buffer.from(msg));
console.log(" [x] Enviando %s", msg);
});
});
amqp.connect('amqp://localhost', function(error0, connection) { if (error0) throw error0; connection.createChannel(function(error1, channel) { if (error1) throw error1; let queue = 'hello'; let msg = 'Olá Mundo!'; channel.assertQueue(queue, { durable: false }); channel.sendToQueue(queue, Buffer.from(msg)); console.log(" [x] Enviando %s", msg); }); });
amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) throw error0;
    connection.createChannel(function(error1, channel) {
        if (error1) throw error1;
        
        let queue = 'hello';
        let msg = 'Olá Mundo!';

        channel.assertQueue(queue, {
            durable: false
        });
        channel.sendToQueue(queue, Buffer.from(msg));

        console.log(" [x] Enviando %s", msg);
    });
});

Agora, precisamos fechar a conexão e sair:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
setTimeout(function() {
connection.close();
process.exit(0);
}, 500);
setTimeout(function() { connection.close(); process.exit(0); }, 500);
setTimeout(function() {
    connection.close();
    process.exit(0);
}, 500);

E o nosso código do produtor de mensagens seria mais ou menos assim:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
let amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) throw error1;
let queue = 'hello';
let msg = 'Olá Mundo!';
channel.assertQueue(queue, {
durable: false
});
channel.sendToQueue(queue, Buffer.from(msg));
console.log(" [x] Enviando %s", msg);
});
setTimeout(function() {
connection.close();
process.exit(0);
}, 500);
});
let amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) throw error1; let queue = 'hello'; let msg = 'Olá Mundo!'; channel.assertQueue(queue, { durable: false }); channel.sendToQueue(queue, Buffer.from(msg)); console.log(" [x] Enviando %s", msg); }); setTimeout(function() { connection.close(); process.exit(0); }, 500); });
let amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function(error1, channel) {
        if (error1) throw error1;
        
        let queue = 'hello';
        let msg = 'Olá Mundo!';

        channel.assertQueue(queue, {
            durable: false
        });
        channel.sendToQueue(queue, Buffer.from(msg));

        console.log(" [x] Enviando %s", msg);
    });
    setTimeout(function() {
        connection.close();
        process.exit(0);
    }, 500);
});

Recebendo:

Agora, nosso consumidor precisa escutar as mensagens enviadas pelo arquivo send.js. O consumidor precisa ficar ativo para escutar as mensagens enviadas. Crie um arquivo chamado receive.js. Este arquivo terá a configuração parecida com o do arquivo send.js. Abrimos uma conexão, um canal e declaramos a fila que iremos consumir:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
let amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function (error0, connection) {
if (error0) throw error0;
connection.createChannel(function (error1, channel) {
if (error1) throw error1;
let queue = 'hello';
channel.assertQueue(queue, {
durable: false
});
});
});
let amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function (error0, connection) { if (error0) throw error0; connection.createChannel(function (error1, channel) { if (error1) throw error1; let queue = 'hello'; channel.assertQueue(queue, { durable: false }); }); });
let amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function (error0, connection) {
    if (error0) throw error0;
    connection.createChannel(function (error1, channel) {
        if (error1) throw error1;
        let queue = 'hello';
        channel.assertQueue(queue, {
            durable: false
        });
     });
});

Queremos garantir a existência de uma fila para que o consumidor não fique iniciando antes do emissor. É exatamente isso que o channel.consume faz:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
console.log(' [*] Esperando por mensagens em %s. Para sair pressione CTRL+C', queue);
channel.consume(queue, function (msg) {
console.log(' [x] Recebida: %s', msg.content.toString());
}, {
noAck: true
});
console.log(' [*] Esperando por mensagens em %s. Para sair pressione CTRL+C', queue); channel.consume(queue, function (msg) { console.log(' [x] Recebida: %s', msg.content.toString()); }, { noAck: true });
console.log(' [*] Esperando por mensagens em %s. Para sair pressione CTRL+C', queue);
channel.consume(queue, function (msg) {
    console.log(' [x] Recebida: %s', msg.content.toString());
    }, {
        noAck: true
    });

E o nosso código do consumidor ficaria assim:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
let amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function (error0, connection) {
if (error0) throw error0;
connection.createChannel(function (error1, channel) {
if (error1) throw error1;
let queue = 'hello';
channel.assertQueue(queue, {
durable: false
});
console.log(' [*] Esperando por mensagens em %s. Para sair pressione CTRL+C', queue);
channel.consume(queue, function (msg) {
console.log(' [x] Recebida: %s', msg.content.toString());
}, {
noAck: true
});
});
});
let amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function (error0, connection) { if (error0) throw error0; connection.createChannel(function (error1, channel) { if (error1) throw error1; let queue = 'hello'; channel.assertQueue(queue, { durable: false }); console.log(' [*] Esperando por mensagens em %s. Para sair pressione CTRL+C', queue); channel.consume(queue, function (msg) { console.log(' [x] Recebida: %s', msg.content.toString()); }, { noAck: true }); }); });
let amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function (error0, connection) {
    if (error0) throw error0;
    connection.createChannel(function (error1, channel) {
        if (error1) throw error1;
        let queue = 'hello';
        channel.assertQueue(queue, {
            durable: false
        });
        console.log(' [*] Esperando por mensagens em %s. Para sair pressione CTRL+C', queue);
        channel.consume(queue, function (msg) {
            console.log(' [x] Recebida: %s', msg.content.toString());
        }, {
            noAck: true
        });
    });
});

Executando:

Agora, executando no terminal cada um dos arquivos, temos as seguintes saídas:

Produtor de mensagens:

Consumidor:

E assim está feito o nosso canal de emissão de mensagens!

Gostou deste artigo? Comente abaixo!

RabbitMQ criando workers com Node.js

Introdução

Dando continuidade a minha serie de artigos sobre RabbitMQ, hoje irei apresentar uma de suas formas de implementação, os workers. Caso tenha interesse em saber um pouco mais sobre os Workers ou ler os primeiros artigos dessa série, eu recomendo a leitura dos artigos abaixo:

Para que você possa ter um melhor entendimento, eu irei criar um exemplo utilizando o Node.js e o RabbitMQ dentro de um container Docker.

O primeiro passo é ter o RabbitMQ instalado, como mencionado acima eu irei utilizar o RabbitMQ dentro de em um container Docker. Caso você ainda não tenha esse ambiente, eu recomendo a leitura do segundo link que eu passei acima, la eu demonstro como criar esse ambiente.

Criação do projeto

Abra um terminal no seu computador, em seguida escolha um local para criação do seu projeto. Navegue até ele via terminal e execute o comando abaixo:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
npm init -y
npm init -y
npm init -y

Esse comando irá inicializar o seu projeto criando um arquivo chamado package.json. Agora vamos baixar o pacote do RabbitMQ. Para isso, execute o comando abaixo no seu terminal:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
npm install amqplib --save
npm install amqplib --save
npm install amqplib --save

Com o projeto criado e a biblioteca do amqplib importada, vamos criar dois novos arquivos no nosso projeto.

O primeiro será o arquivo da nossa aplicação, para esse artigo eu irei chamar ele de app.js. Crie ele na raiz do seu projeto, em seguida atualize ele com o trecho de código abaixo:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost:5672', function (err, conn) {
conn.createChannel(function (err, ch) {
var q = 'hello';
var msg = 'Hello World 123!';
ch.assertQueue(q, { durable: false });
ch.sendToQueue(q, new Buffer(msg));
console.log(" [x] Sent %s", msg);
});
setTimeout(function () { conn.close(); process.exit(0) }, 500);
});
var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost:5672', function (err, conn) { conn.createChannel(function (err, ch) { var q = 'hello'; var msg = 'Hello World 123!'; ch.assertQueue(q, { durable: false }); ch.sendToQueue(q, new Buffer(msg)); console.log(" [x] Sent %s", msg); }); setTimeout(function () { conn.close(); process.exit(0) }, 500); });
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost:5672', function (err, conn) {
    conn.createChannel(function (err, ch) {
        var q = 'hello';
        var msg = 'Hello World 123!';
        ch.assertQueue(q, { durable: false });     
        ch.sendToQueue(q, new Buffer(msg));
        console.log(" [x] Sent %s", msg);
    });
    setTimeout(function () { conn.close(); process.exit(0) }, 500);
});

Analisando o código acima você tem:

Agora vamos criar os nossos Workers. Para isso, crie um novo arquivo chamado worker.js na raiz do seu projeto e atualize ele com o seguinte trecho de código:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost:5672', function (err, conn) {
conn.createChannel(function (err, ch) {
var q = 'hello';
ch.assertQueue(q, { durable: false });
ch.prefetch(1);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function (msg) {
console.log(" [x] Received %s", msg.content.toString());
}, { noAck: true });
});
});
var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost:5672', function (err, conn) { conn.createChannel(function (err, ch) { var q = 'hello'; ch.assertQueue(q, { durable: false }); ch.prefetch(1); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); ch.consume(q, function (msg) { console.log(" [x] Received %s", msg.content.toString()); }, { noAck: true }); }); });
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost:5672', function (err, conn) {
    conn.createChannel(function (err, ch) {
        var q = 'hello';

        ch.assertQueue(q, { durable: false });
        ch.prefetch(1);
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        ch.consume(q, function (msg) {
            console.log(" [x] Received %s", msg.content.toString());
        }, { noAck: true });
    });
});

Analisando esse código você tem:

Testando o código

Agora para testar o nosso código, abra 3 terminais no seu computador, navegue até o seu projeto e siga os passos abaixo:

Terminal 01

Execute o comando abaixo para criar o seu primeiro worker:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
node worker.js
node worker.js
node worker.js

Terminal 02

Execute o comando abaixo para criar o seu segundo worker:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
node worker.js
node worker.js
node worker.js

Terminal 03

Execute o comando abaixo para criar a sua aplicação e enviar a primeira mensagem para sua fila.

Obs.: O RabbitMQ trabalha com o conceito de Round Robin, logo o worker 1 pode não ser o primeiro a consumir a sua fila

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
node app.js
node app.js
node app.js

Abaixo você tem um vídeo demonstrando esse passo:

O intuito desse artigo foi demonstrar como trabalhar com os Workers do RabbitMQ utilizando o Node.js. Caso tenha interesse em baixar o código desenvolvido nesse artigo, segue o seu link no meu GitHub: Node-RabbitMQ-Workers.

Espero que tenham gostado e até um próximo artigo pessoal.