消息队列RabbitMQ-使用Python操作RabbitMQ
简单的消息队列
消息生产者向队列中发送message,消费者从消息队列中取出消息并且消费。
- 生产者
# -*- coding=utf-8 -*-
import pika
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="hello")
channel.basic_publish(
exchange='',
routing_key="hello",
body="hello world"
connection.close()
- 消费者
# -*- coding=utf-8 -*-
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="hello")
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
- 消息确认
当客户端从队列中取出消息之后,可能需要一段时间才能处理完成,如果在这个过程中,客户端出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了,因为rabbitMQ默认会把消息标记为已经完成,然后从队列中移除。
消息确认是客户端从RabbitMQ中取出消息,并完成处理之后,会发送一个Ack信号通知RabbitMQ,消除处理完成。
当RabbitMQ收到客户端的获取消息请求之后,或标记为处理中,当再次收到Ack之后,才会标记为已完成,然后从队列中删除。当RabbitMQ检测到客户端和自己断开链接之后,还没收到Ack,则会重新将消息放回消息队列,交给下一个客户端处理,保证消息不丢失,也就是说,RabbitMQ给了客户端足够长的时间来做数据处理。
上述代码中,
auto_ack=True
表示自动发送确认消息,即使消息被处理失败,消息也会被消费掉,即发生数据丢失的情况。
如果设置
auto_ack=False
则需要显式确认消息(在未发送确认信信号之前,消息一直存在)。
显示调用确认消息:
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息
使用交换器
交换器主要负责从生产者那里接受push的消息,根据生产者的定义规则,投递到队列中,是生产者和队列的中间件。
使用fanout实现发布订阅者模型
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在
所有的相关队列中。
- 生产者
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test123',type='fanout') #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange
queue_name = rest.method.queue #获取队列名
channel.queue_bind(exchange='test123',queue=queue_name) #将随机队列名和exchange进行绑定
def callback(ch, method, properties, body):
'''回调函数,处理从rabbitmq中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
- 订阅者
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test123',type='fanout') #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange
queue_name = rest.method.queue #获取队列名
channel.queue_bind(exchange='test123',queue=queue_name) #将随机队列名和exchange进行绑定
def callback(ch, method, properties, body):
'''回调函数,处理从rabbitmq中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
注意:
需先定义订阅者,启动订阅者,否则发布者publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,msg是被丢弃的。
使用direct 实现根据关键字发布消息
消息发布订阅者模型是发布者发布一条消息,所有订阅者都可以收到。现在rabbitmq还支持根据关键字发送,在发送消息的时候使用routing_key参数指定关键字,rabbitmq的exchange会判断routing_key的值,然后只将消息转发至匹配的队列,注意,此时需要订阅者先创建队列。
配置参数为exchange的type=direct,然后定义routing_key即可。
- 订阅者1:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test321',type='direct') #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange
queue_name = rest.method.queue #获取队列名
severities = ['error','warning','info'] #定义三个routing_key
for severity in severities:
channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)
def callback(ch, method, properties, body):
'''回调函数,处理从rabbitmq中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
- 订阅者2:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test321',type='direct') #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange
queue_name = rest.method.queue #获取队列名
severities = ['error','warning'] #定义两个routing_key
for severity in severities:
channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)
def callback(ch, method, properties, body):
'''回调函数,处理从rabbitmq中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
- 生产者:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672, )) #定义连接池
channel = connection.channel() #声明队列以向其发送消息消息
channel.exchange_declare(exchange='test321',type='direct')
channel.basic_publish(exchange='test321', routing_key='info', body='info msg',properties=pika.BasicProperties(delivery_mode=2)) #发送info msg到 info routing_key
channel.basic_publish(exchange='test321', routing_key='error', body='error msg',properties=pika.BasicProperties(delivery_mode=2)) #发送error msg到 error routing_key
print('send success msg[] to rabbitmq')
connection.close() #关闭连接**
- 效果:
发现订阅者1和订阅者2都收到error消息,但是只有订阅者1收到了info消息
订阅者1:
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'info msg'
[x] Received b'error msg'
订阅者2:
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'error msg'
使用topic实现模糊匹配发布消息
Direct实现了根据自定义的routing_key来标示不同的queue,使用topic可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
匹配规则为:
- # 表示可以匹配0个或多个单词
- * 表示只能匹配一个单词
- 订阅者1:使用#匹配
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test333',type='topic') #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange
queue_name = rest.method.queue #获取队列名
channel.queue_bind(exchange='test333', routing_key='test.#',queue=queue_name)
def callback(ch, method, properties, body):
'''回调函数,处理从rabbitmq中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
- 订阅者2:使用*匹配
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test333',type='topic') #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange
queue_name = rest.method.queue #获取队列名
channel.queue_bind(exchange='test333', routing_key='test.*',queue=queue_name)
def callback(ch, method, properties, body):
'''回调函数,处理从rabbitmq中取出的消息'''
print(" [x] Received %r" % body)
time.sleep(1)
ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #开始监听 接受消息
- 生产者:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1', port=5672, )) #定义连接池
channel = connection.channel() #声明队列以向其发送消息消息
channel.exchange_declare(exchange='test333',type='topic')
channel.basic_publish(exchange='test333', routing_key='test.123', body='test.123 msg',properties=pika.BasicProperties(delivery_mode=2))
channel.basic_publish(exchange='test333', routing_key='test.123.321', body=' test.123.321 msg',properties=pika.BasicProperties(delivery_mode=2))
print('send success msg[] to rabbitmq')
connection.close() #关闭连接
- 效果:
订阅者1:
[*] Waiting for messages. To exit press CTRL+C