RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件 ,RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。本文主要介绍Python中使用pika模块work消息模型来实现与RabbitMQ通讯,以及相关的示例代码。

1、安装RabbitMQ

参考文档:

Windows 上下载安装 RabbitMQ 的方法步骤

Linux 上下载安装 RabbitMQ 的方法步骤

2、安装pika

pika 是 AMQP 0-9-1 协议的纯 Python 实现,它基本保持完全独立于底层网络支持库。Pika可以通过PyPI下载,并可以使用easy_install或pip安装:

pip install pika

easy_install pika

相关文档:https://pika.readthedocs.io/en/stable/

3、使用说明

1)关键词说明

关键词

说明

Broker

消息队列服务器实体。

Exchange

消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue

消息队列载体,每个消息都会被投入到一个或多个队列。

Binding

绑定,它的作用就是把exchange

和queue按照路由规则绑定起来。

Routing Key

路由关键字,exchange根据这个关键字进行消息投递。

vhost

虚拟主机,一个broker里可以开设多个vhost,

用作不同用户的权限分离。

producer

消息生产者,就是投递消息的程序。

consumer

消息消费者,就是接受消息的程序。

channel

消息通道,在客户端的每个连接里,

可建立多个channel,每个channel代表一个会话任务。 

2)消息队列运行机制

客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。
客户端投递消息到exchange。

Exchange接收到消息后,就根据消息的key和已经设置的binding,将消息投递到一个或多个队列里。

注意:在声明一个队列后,如果将其持久化,则下次不需要进行声明。

3)Exchange说明

Exchange类型

说明

Direct交换机

依据key进行投递,

如绑定时设置了routing key为”cjavapy”,

客户端提交的消息,

则只有设置了key为”cjavapy”的才会投递到队列。

Topic交换机

对key模式匹配后进行投递,

符号”#”匹配一个或多个词,符号”*”匹配一个词。

Fanout交换机

不需要key,采取广播模式,

一个消息进来时,投递到与该交换机绑定的所有队列

4、work消息模型的使用

密集任务发送到消息队列,由多个消费者进行监听消费,每个消息只能由一个消费者接收。

1)生产者

import pika
credentials = pika.PlainCredentials('admin', 'admin') # mq⽤户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',
credentials=credentials))
# 建⽴rabbit协议的通道
channel = connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable指定队列是否持久化。确保没有确认的消息不会丢失
channel.queue_declare(queue='cjavapy', durable=True)#, arguments={"x-queue-type": "stream"})
# message不能直接发送给queue,需经exchange到达queue,此处使⽤以空字符串标识的默认的exchange
# 向队列插⼊数值 routing_key是队列名
# basic_publish的properties参数指定message的属性。此处delivery_mode=2指明message为持久的
for i in range(10):
    channel.basic_publish(exchange='',routing_key='cjavapy',body='Hello world!%s' % i,properties=pika.BasicProperties(delivery_mode=2))
# 关闭与rabbitmq server的连接
connection.close()
# 消费者代码,consume1与consume2

2)消费者

import pika
import time
credentials = pika.PlainCredentials('admin', 'admin')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',
credentials=credentials))
channel = connection.channel()
# 申明消息队列。当不确定⽣产者和消费者哪个先启动时,可以两边重复声明消息队列。
channel.queue_declare(queue='cjavapy', durable=True)# , arguments={"x-queue-type": "stream"})
# 定义⼀个回调函数来处理消息队列中的消息,这⾥是打印出来
def callback(ch, method, properties, body):
    # ⼿动发送确认消息
    time.sleep(10)
    print(body.decode())
    # 告诉⽣产者,消费者已收到消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 如果该消费者的channel上未确认的消息数达到了prefetch_count数,则不向该消费者发送消息
channel.basic_qos(prefetch_count=1)
# 告诉rabbitmq,⽤callback来接收消息
# 默认情况下是要对消息进⾏确认的,以防⽌消息丢失。
# 此处将no_ack明确指明为True,不对消息进⾏确认。
channel.basic_consume('cjavapy',on_message_callback=callback)
# auto_ack=True) # ⾃动发送确认消息
# 开始接收信息,并进⼊阻塞状态,队列⾥有信息才会调⽤callback进⾏处理
channel.start_consuming()

推荐文档