RabbitMQ是部署最广泛的开源消息代理。它用于很多场景,比如分布式任务。它非常轻巧,易于在本地和云中部署。它支持多种消息传递协议。 RabbitMQ可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。
本文将描述在跨不同语言下使用RabbitMQ的方式,以及注意事项。
在跨不同语言,会使用不同的语言客户端库,这些库中都有可能包含默认设置的参数,因此,在跨语言使用RabbitMQ客户端注意参数设置的问题,下面一个Python与Node.js的示例,两个客户端库使用的默认参数不一致就导致错误
python生产者
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks')
channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
Node.js消费者
const { connect } = require('amqplib')
var q = 'tasks';
(async () => {
try {
let connection = await connect('amqp://localhost')
let channel = await connection.createChannel()
let ok = await channel.assertQueue(q,{durable: false})
if (ok) {
channel.consume(q,(msg) => {
if (msg !== null) {
console.log(msg.content.toString());
channel.ack(msg);
}
})
}
} catch (error) {
console.info(error)
}
})()
Python发送端抛出的错误
Traceback (most recent call last):
File "send.py", line 8, in <module>
channel.queue_declare(queue='tasks')
File "/home/workspace/huangyanxiong/myfreax/admin/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 2507, in queue_declare
self._flush_output(declare_ok_result.is_ready)
File "/home/workspace/huangyanxiong/myfreax/admin/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 1340, in _flush_output
raise self._closing_reason # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'tasks' in vhost '/': received 'false' but current is 'true'")
Node.js接收端抛出的错误
events.js:167
throw er; // Unhandled 'error' event
^
Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'tasks' in vhost '/': received 'false' but current is 'true'"
at Channel.C.accept (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/channel.js:422:17)
at Connection.mainAccept [as accept] (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/connection.js:64:33)
at Socket.go (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/connection.js:478:48)
at Socket.emit (events.js:182:13)
at emitReadable_ (_stream_readable.js:534:12)
at process._tickCallback (internal/process/next_tick.js:63:19)
Emitted 'error' event at:
at Connection.emit (events.js:182:13)
at Connection.C.onSocketError (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/connection.js:353:10)
at Connection.emit (events.js:182:13)
at Socket.go (/home/workspace/huangyanxiong/myfreax/admin/node_modules/amqplib/lib/connection.js:481:12)
at Socket.emit (events.js:182:13)
at emitReadable_ (_stream_readable.js:534:12)
at process._tickCallback (internal/process/next_tick.js:63:19)
什么是RabbitMQ Durable
Durable (the queue will survive a broker restart)
队列具有定义其行为的属性。有一组强制属性和一个可选属性的映射,Durable属性是其中一个。
Durable属性定义了当broker重启时,是否可以重新激活队列,如果Durable为true,队列对应有以下能力
- 队列将持久保存在磁盘上,因此在broker重新启动后仍然存在。不能持久化的队列称为临时队列Temporary Queues。并非所有方案和用例都要求队列是持久的。
- 队列的持久化不会使路由到该队列的消息持久化。如果关闭broker然后将其恢复,则在broker启动期间将重新声明为持久队列,但是,仅持久化消息将被恢复。
最后
注意客户端库默认参数的问题,如果你觉得这个烦恼,另外你可以自己再造一个轮子