RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件 ,RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。本文主要介绍Python中使用pika模块direct路由模式来实现与RabbitMQ通讯,以及相关的示例代码。

1、安装RabbitMQ

参考文档:

Windows 上下载安装 RabbitMQ 的方法步骤

Linux 上下载安装 RabbitMQ 的方法步骤

2、安装pika

pika 是 AMQP 0-9-1 协议的纯 Python 实现,它基本保持完全独立于底层网络支持库。Pika可以通过PyPI下载,并可以使用easy_install或pip安装:

pip install pika

easy_install pika

相关文档:https://pika.readthedocs.io/en/stable/

3、使用说明

1)关键词说明

关键词

说明

Broker

消息队列服务器实体。

Exchange

消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue

消息队列载体,每个消息都会被投入到一个或多个队列。

Binding

绑定,它的作用就是把exchange

和queue按照路由规则绑定起来。

Routing Key

路由关键字,exchange根据这个关键字进行消息投递。

vhost

虚拟主机,一个broker里可以开设多个vhost,

用作不同用户的权限分离。

producer

消息生产者,就是投递消息的程序。

consumer

消息消费者,就是接受消息的程序。

channel

消息通道,在客户端的每个连接里,

可建立多个channel,每个channel代表一个会话任务。 

2)消息队列运行机制

客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。
客户端投递消息到exchange。

Exchange接收到消息后,就根据消息的key和已经设置的binding,将消息投递到一个或多个队列里。

注意:在声明一个队列后,如果将其持久化,则下次不需要进行声明。

3)Exchange说明

Exchange类型

说明

Direct交换机

依据key进行投递,

如绑定时设置了routing key为”cjavapy”,

客户端提交的消息,

则只有设置了key为”cjavapy”的才会投递到队列。

Topic交换机

对key模式匹配后进行投递,

符号”#”匹配一个或多个词,符号”*”匹配一个词。

Fanout交换机

不需要key,采取广播模式,

一个消息进来时,投递到与该交换机绑定的所有队列

4、direct路由模式的使用

⽣产者发送消息时需要指定RoutingKey,即路由Key,Exchange接收到消息时转发到与RoutingKey相匹配的队列中。

1)生产者

import pika
import sys

credentials = pika.PlainCredentials('admin', 'admin') # mq⽤户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',credentials=credentials))
channel = connection.channel()
# 声明⼀个名为direct_logs的direct类型的exchange
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = ['info', 'warning', 'error']
for i in range(20):
    message = '{} Hello World! {}'.format(i, severity[i % 3])
    # 向名为direct_logs的exchage按照设置的routing_key发送messag
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity[i % 3],
                          body=message)
    print(" [x] Sent: {}".format(message))
connection.close()

2)消费者

import pika
import sys


credentials = pika.PlainCredentials('admin', 'admin') # mq⽤户名和密码
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',credentials=credentials))
channel = connection.channel()
# 在producer和consumer中分别声明⼀次以保证所要使⽤的exchange存在
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(queue='cjavapy_d',durable=True)# ,arguments={"x-queue-type": "stream"})
# ⽤于获取临时queue的name
queue_name = result.method.queue
# Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,
channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key='info')#,arguments={"x-queue-type": "stream"})

print(' [*] Waiting for logs. To exit press CTRL+C')
# 定义⼀个回调函数来处理消息队列中的消息,
def callback(ch, method, properties, body):
# ⼿动发送确认消息
    print(" [x] %r:%r" % (method.routing_key, body))
    ch.basic_ack(delivery_tag=method.delivery_tag)
print(queue_name)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name,on_message_callback=callback)
# 开始接收信息,并进⼊阻塞状态,队列⾥有信息才会调⽤callback进⾏处理
channel.start_consuming()

相关文档:

Python RabbitMQ pika的安装及work消息模型的使用

Python RabbitMQ pika的安装及单生产单消费模型的使用

Python RabbitMQ pika的安装及fanout消息订阅模式的使用

推荐文档