1、安装RabbitMQ
参考文档:
2、安装pika
pika 是 AMQP 0-9-1 协议的纯 Python 实现,它基本保持完全独立于底层网络支持库。Pika可以通过PyPI下载,并可以使用easy_install或pip安装:
pip install pika
或
easy_install pika
相关文档:https://pika.readthedocs.io/en/stable/
3、使用说明
1)关键词说明
关键词 | 说明 |
Broker | 消息队列服务器实体。 |
Exchange | 消息交换机,它指定消息按什么规则,路由到哪个队列。 |
Queue | 消息队列载体,每个消息都会被投入到一个或多个队列。 |
Binding | 绑定,它的作用就是把exchange 和queue按照路由规则绑定起来。 |
Routing Key | 路由关键字,exchange根据这个关键字进行消息投递。 |
vhost | 虚拟主机,一个broker里可以开设多个vhost, 用作不同用户的权限分离。 |
producer | 消息生产者,就是投递消息的程序。 |
consumer | 消息消费者,就是接受消息的程序。 |
channel | 消息通道,在客户端的每个连接里, 可建立多个channel,每个channel代表一个会话任务。 |
2)消息队列运行机制
客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。
客户端投递消息到exchange。
Exchange接收到消息后,就根据消息的key和已经设置的binding,将消息投递到一个或多个队列里。
注意:在声明一个队列后,如果将其持久化,则下次不需要进行声明。
3)Exchange说明
Exchange类型 | 说明 |
Direct交换机 | 依据key进行投递, 如绑定时设置了routing key为”cjavapy”, 客户端提交的消息, 则只有设置了key为”cjavapy”的才会投递到队列。 |
Topic交换机 | 对key模式匹配后进行投递, 符号”#”匹配一个或多个词,符号”*”匹配一个词。 |
Fanout交换机 | 不需要key,采取广播模式, 一个消息进来时,投递到与该交换机绑定的所有队列 |
4、topic匹配模式的使用
topic 类型与 direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。而topic 类型Exchange 可以让队列在绑定 RoutingKey 时使用通配符,RoutingKey 一般都是有一个或多个单词组成,多个单词之间以.
分割,如: item.insert
通配符规则:#
匹配一个或多个词,*
匹配不多不少恰好1个词,如,item.#
能够匹配 item.insert.abc
或者 item.insert
,item.*
只能匹配 item.insert
1)生产者
import pika import sys credentials = pika.PlainCredentials('admin', 'admin') connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',credentials=credentials)) channel = connection.channel() # 声明⼀个名为direct_logs的direct类型的exchange # direct类型的exchange channel.exchange_declare(exchange='topic_logs', exchange_type='topic') # 从命令⾏获取basic_publish的配置参数 severity = "cjavapy.node" message = 'Hello World!' # 向名为direct_logs的exchange按照设置的routing_key发送message channel.basic_publish(exchange='topic_logs',routing_key=severity,body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
2)消费者
import pika import sys credentials = pika.PlainCredentials('admin', 'admin') connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',credentials=credentials)) channel = connection.channel() # 声明⼀个名为direct_logs类型为direct的exchange # 同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在 channel.exchange_declare(exchange='topic_logs',exchange_type='topic') result = channel.queue_declare(queue='cjavapy_topic',durable=True ) queue_name = result.method.queue # 从命令⾏获取参数:routing_key severity = "cjavapy.*" # exchange和queue之间的binding可接受routing_key参数 # fanout类型的exchange直接忽略该参数。direct类型的exchange精确匹配该关键字进⾏message路由 # ⼀个消费者可以绑定多个routing_key # Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配, # 如果满⾜要求,就往BindingKey所绑定的Queue发送消息 channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=severity) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body,)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue=queue_name,on_message_callback=callback) channel.start_consuming()
相关文档:
Python RabbitMQ pika的安装及work消息模型的使用
Python RabbitMQ pika的安装及单生产单消费模型的使用