RabbitMQ是部署最广泛的开源消息代理。它用于很多场景,比如分布式任务。它非常轻巧,易于在本地和云中部署。它支持多种消息传递协议。 RabbitMQ可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。

本文将描述在跨不同语言下使用RabbitMQ的方式,以及注意事项。

在跨不同语言,会使用不同的语言客户端库,这些库中都有可能包含默认设置的参数,因此,在跨语言使用RabbitMQ客户端注意参数设置的问题,下面一个Python与Node.js的示例,两个客户端库使用的默认参数不一致就导致错误

python生产者

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='tasks')

channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Node.js消费者

const { connect } = require('amqplib')

var q = 'tasks';

(async () => {
    
    try {
        let connection = await connect('amqp://localhost')
        let channel = await connection.createChannel()
        let ok = await channel.assertQueue(q,{durable: false})
        if (ok) {
            channel.consume(q,(msg) => {
                if (msg !== null) {
                    console.log(msg.content.toString());
                    channel.ack(msg);
                }
            })
        }

    } catch (error) {
        console.info(error)
    }

})()

Python发送端抛出的错误

Traceback (most recent call last):
  File "send.py", line 8, in <module>
    channel.queue_declare(queue='tasks')
  File "/home/workspace/huangyanxiong/myfreax/admin/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 2507, in queue_declare
    self._flush_output(declare_ok_result.is_ready)
  File "/home/workspace/huangyanxiong/myfreax/admin/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 1340, in _flush_output
    raise self._closing_reason  # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'tasks' in vhost '/': received 'false' but current is 'true'")

Node.js接收端抛出的错误

events.js:167
      throw er; // Unhandled 'error' event
      ^

Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'tasks' in vhost '/': received 'false' but current is 'true'"
    at Channel.C.accept (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/channel.js:422:17)
    at Connection.mainAccept [as accept] (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/connection.js:64:33)
    at Socket.go (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/connection.js:478:48)
    at Socket.emit (events.js:182:13)
    at emitReadable_ (_stream_readable.js:534:12)
    at process._tickCallback (internal/process/next_tick.js:63:19)
Emitted 'error' event at:
    at Connection.emit (events.js:182:13)
    at Connection.C.onSocketError (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/connection.js:353:10)
    at Connection.emit (events.js:182:13)
    at Socket.go (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/connection.js:481:12)
    at Socket.emit (events.js:182:13)
    at emitReadable_ (_stream_readable.js:534:12)
    at process._tickCallback (internal/process/next_tick.js:63:19)

什么是RabbitMQ Durable

Durable (the queue will survive a broker restart)

队列具有定义其行为的属性。有一组强制属性和一个可选属性的映射,Durable属性是其中一个。

Durable属性定义了当broker重启时,是否可以重新激活队列,如果Durable为true,队列对应有以下能力

  • 队列将持久保存在磁盘上,因此在broker重新启动后仍然存在。不能持久化的队列称为临时队列Temporary Queues。并非所有方案和用例都要求队列是持久的。
  • 队列的持久化不会使路由到该队列的消息持久化。如果关闭broker然后将其恢复,则在broker启动期间将重新声明为持久队列,但是,仅持久化消息将被恢复。

最后

注意客户端库默认参数的问题,如果你觉得这个烦恼,另外你可以自己再造一个轮子


如果你喜欢我们的内容可以选择在下方二维码中捐赠我们,或者点击广告予以支持,感谢你的支持