Python RabbitMQ pika的安装及topic匹配模式的使用

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件 ,RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。本文主要介绍Python中使用pika模块topic匹配模式来实现与RabbitMQ通讯,以及相关的示例代码。

1、安装RabbitMQ

参考文档:

Windows 上下载安装 RabbitMQ 的方法步骤

Linux 上下载安装 RabbitMQ 的方法步骤

2、安装pika

pika 是 AMQP 0-9-1 协议的纯 Python 实现,它基本保持完全独立于底层网络支持库。Pika可以通过PyPI下载,并可以使用easy_install或pip安装:

pip install pika

easy_install pika

相关文档:https://pika.readthedocs.io/en/stable/

3、使用说明

1)关键词说明

关键词

说明

Broker

消息队列服务器实体。

Exchange

消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue

消息队列载体,每个消息都会被投入到一个或多个队列。

Binding

绑定,它的作用就是把exchange

和queue按照路由规则绑定起来。

Routing Key

路由关键字,exchange根据这个关键字进行消息投递。

vhost

虚拟主机,一个broker里可以开设多个vhost,

用作不同用户的权限分离。

producer

消息生产者,就是投递消息的程序。

consumer

消息消费者,就是接受消息的程序。

channel

消息通道,在客户端的每个连接里,

可建立多个channel,每个channel代表一个会话任务。 

2)消息队列运行机制

客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。
客户端投递消息到exchange。

Exchange接收到消息后,就根据消息的key和已经设置的binding,将消息投递到一个或多个队列里。

注意:在声明一个队列后,如果将其持久化,则下次不需要进行声明。

3)Exchange说明

Exchange类型

说明

Direct交换机

依据key进行投递,

如绑定时设置了routing key为”cjavapy”,

客户端提交的消息,

则只有设置了key为”cjavapy”的才会投递到队列。

Topic交换机

对key模式匹配后进行投递,

符号”#”匹配一个或多个词,符号”*”匹配一个词。

Fanout交换机

不需要key,采取广播模式,

一个消息进来时,投递到与该交换机绑定的所有队列

4、topic匹配模式的使用

topic 类型与 direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。而topic 类型Exchange 可以让队列在绑定 RoutingKey 时使用通配符,RoutingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,如: item.insert

通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,如,item.# 能够匹配 item.insert.abc 或者 item.insertitem.* 只能匹配 item.insert

1)生产者

import pika
import sys

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',credentials=credentials))
channel = connection.channel()
# 声明⼀个名为direct_logs的direct类型的exchange
# direct类型的exchange
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
# 从命令⾏获取basic_publish的配置参数
severity = "cjavapy.node"
message = 'Hello World!'
# 向名为direct_logs的exchange按照设置的routing_key发送message
channel.basic_publish(exchange='topic_logs',routing_key=severity,body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

2)消费者

import pika
import sys

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',credentials=credentials))
channel = connection.channel()
# 声明⼀个名为direct_logs类型为direct的exchange
# 同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
result = channel.queue_declare(queue='cjavapy_topic',durable=True )
queue_name = result.method.queue
# 从命令⾏获取参数:routing_key
severity = "cjavapy.*"
# exchange和queue之间的binding可接受routing_key参数
# fanout类型的exchange直接忽略该参数。direct类型的exchange精确匹配该关键字进⾏message路由
# ⼀个消费者可以绑定多个routing_key
# Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,
# 如果满⾜要求,就往BindingKey所绑定的Queue发送消息
    channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=severity)
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name,on_message_callback=callback)
channel.start_consuming()

相关文档:

Python RabbitMQ pika的安装及work消息模型的使用

Python RabbitMQ pika的安装及单生产单消费模型的使用

Python RabbitMQ pika的安装及fanout消息订阅模式的使用

Python RabbitMQ pika的安装及direct路由模式的使用

推荐阅读
cjavapy编程之路首页