当前快讯:Rabbitmq 介绍 、安装、基于Queue实现生产者消费者模型、基本使用、消息安全之ack、durable持久化、利用闲置消费、发布订阅、发布订阅高级之Royting(按关键字匹配)、Topic关键字模糊匹配、基于rabbitmq实现rpc

博客园 2023-05-05 21:30:43

师承老刘llnb

一、消息队列介绍

1.1介绍

消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”


(相关资料图)

1.2MQ解决什么问题

MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。应用解耦以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。

当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障。提升系统的可用性流量削峰举个栗子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。

使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这事有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。消息分发多个服务队数据感兴趣,只需要监听同一类消息即可处理。例如A产生数据,B对数据感兴趣。如果没有消息的队列A每次处理完需要调用一下B服务。过了一段时间C对数据也感性,A就需要改代码,调用B服务,调用C服务。只要有服务需要,A服务都要改动代码。很不方便。有了消息队列后,A只管发送一次消息,B对消息感兴趣,只需要监听消息。C感兴趣,C也去监听消息。A服务作为基础服务完全不需要有改动异步消息有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。

这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息

1.3 常见消息队列及比较

结论:

Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改成了主从结构,在事务性可靠性方面做了优化。广泛来说,电商、金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka

二 Rabbitmq安装

官网:https://www.rabbitmq.com/getstarted.html

2.1 服务端原生安装

# 安装配置epel源# 安装erlangyum -y install erlang# 安装RabbitMQyum -y install rabbitmq-server

2.2服务端Docker安装

docker pull rabbitmq:managementdocker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:managemen

2.3客户端安装

pip3 install pika

2.4 设置用户和密码

rabbitmqctl add_user lqz 123# 设置用户为administrator角色rabbitmqctl set_user_tags lqz administrator# 设置权限rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"# 然后重启rabbiMQ服务systemctl reatart rabbitmq-server # 然后可以使用刚才的用户远程连接rabbitmq server了。

三 基于Queue实现生产者消费者模型

import Queueimport threadingmessage = Queue.Queue(10)def producer(i):    while True:        message.put(i)def consumer(i):    while True:        msg = message.get()for i in range(12):    t = threading.Thread(target=producer, args=(i,))    t.start()for i in range(10):    t = threading.Thread(target=consumer, args=(i,))    t.start()

四 基本使用(生产者消费者模型)

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。生产者

import pika# 无密码# connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))# 有密码credentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列)channel.queue_declare(queue="lqz")channel.basic_publish(exchange="",                      routing_key="lqz", # 消息队列名称                      body="hello world")connection.close()

消费者

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列)channel.queue_declare(queue="lqz")def callback(ch, method, properties, body):    print("消费者接受到了任务: %r" % body)channel.basic_consume(queue="lqz",on_message_callback=callback,auto_ack=True)channel.start_consuming()

五 消息安全之ack

生产者

import pika# 无密码# connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))# 有密码credentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列)channel.queue_declare(queue="lqz")channel.basic_publish(exchange="",                      routing_key="lqz", # 消息队列名称                      body="hello world")connection.close()

消费者

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列)channel.queue_declare(queue="lqz")def callback(ch, method, properties, body):    print("消费者接受到了任务: %r" % body)    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在    # ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(queue="lqz",on_message_callback=callback,auto_ack=False)channel.start_consuming()

六 消息安全之durable持久化

生产者

import pika# 无密码# connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))# 有密码credentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以channel.queue_declare(queue="lqz1",durable=True)channel.basic_publish(exchange="",                      routing_key="lqz1", # 消息队列名称                      body="111",                      properties=pika.BasicProperties(                          delivery_mode=2,  # make message persistent,消息也持久化                      )                      )connection.close()

消费者

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列)channel.queue_declare(queue="lqz1")def callback(ch, method, properties, body):    print("消费者接受到了任务: %r" % body)    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在    # ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(queue="lqz1",on_message_callback=callback,auto_ack=False)channel.start_consuming()

七 闲置消费

正常情况如果有多个消费者,是按照顺序第一个消息给第一个消费者,第二个消息给第二个消费者

但是可能第一个消息的消费者处理消息很耗时,一直没结束,就可以让第二个消费者优先获得闲置的消息生产者

import pika# 无密码# connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))# 有密码credentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以channel.queue_declare(queue="lqz123",durable=True)channel.basic_publish(exchange="",                      routing_key="lqz123", # 消息队列名称                      body="111",                      properties=pika.BasicProperties(                          delivery_mode=2,  # make message persistent,消息也持久化                      )                      )connection.close()

消费者

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列)# channel.queue_declare(queue="lqz123")def callback(ch, method, properties, body):    print("消费者接受到了任务: %r" % body)    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在    ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来channel.basic_consume(queue="lqz123",on_message_callback=callback,auto_ack=False)channel.start_consuming()

八 发布订阅

发布者

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange="m1",exchange_type="fanout")channel.basic_publish(exchange="m1",                      routing_key="",                      body="lqz nb")connection.close()

订阅者(启动几次订阅者会生成几个队列)

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# exchange="m1",exchange(秘书)的名称# exchange_type="fanout" , 秘书工作方式将消息发送给所有的队列channel.exchange_declare(exchange="m1",exchange_type="fanout")# 随机生成一个队列result = channel.queue_declare(queue="",exclusive=True)queue_name = result.method.queueprint(queue_name)# 让exchange和queque进行绑定.channel.queue_bind(exchange="m1",queue=queue_name)def callback(ch, method, properties, body):    print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

九 发布订阅高级之Routing(按关键字匹配)

发布者

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange="m2",exchange_type="direct")channel.basic_publish(exchange="m2",                      routing_key="bnb", # 多个关键字,指定routing_key                      body="lqz nb")connection.close()

订阅者1

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# exchange="m1",exchange(秘书)的名称# exchange_type="direct" , 秘书工作方式将消息发送给不同的关键字channel.exchange_declare(exchange="m2",exchange_type="direct")# 随机生成一个队列result = channel.queue_declare(queue="",exclusive=True)queue_name = result.method.queueprint(queue_name)# 让exchange和queque进行绑定.channel.queue_bind(exchange="m2",queue=queue_name,routing_key="nb")channel.queue_bind(exchange="m2",queue=queue_name,routing_key="bnb")def callback(ch, method, properties, body):    print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

订阅者2

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# exchange="m1",exchange(秘书)的名称# exchange_type="direct" , 秘书工作方式将消息发送给不同的关键字channel.exchange_declare(exchange="m2",exchange_type="direct")# 随机生成一个队列result = channel.queue_declare(queue="",exclusive=True)queue_name = result.method.queueprint(queue_name)# 让exchange和queque进行绑定.channel.queue_bind(exchange="m2",queue=queue_name,routing_key="nb")def callback(ch, method, properties, body):    print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

9.1发布订阅高级之Topic(按关键字模糊匹配)

发布者

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange="m3",exchange_type="topic")channel.basic_publish(exchange="m3",                      # routing_key="lqz.handsome", #都能收到                      routing_key="lqz.handsome.xx", #只有lqz.#能收到                      body="lqz nb")connection.close()

订阅者1只能加一个单词

可以加任意单词字符

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# exchange="m1",exchange(秘书)的名称# exchange_type="direct" , 秘书工作方式将消息发送给不同的关键字channel.exchange_declare(exchange="m3",exchange_type="topic")# 随机生成一个队列result = channel.queue_declare(queue="",exclusive=True)queue_name = result.method.queueprint(queue_name)# 让exchange和queque进行绑定.channel.queue_bind(exchange="m3",queue=queue_name,routing_key="lqz.#")def callback(ch, method, properties, body):    print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

订阅者2

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# exchange="m1",exchange(秘书)的名称# exchange_type="topic" , 模糊匹配channel.exchange_declare(exchange="m3",exchange_type="topic")# 随机生成一个队列result = channel.queue_declare(queue="",exclusive=True)queue_name = result.method.queueprint(queue_name)# 让exchange和queque进行绑定.channel.queue_bind(exchange="m3",queue=queue_name,routing_key="lqz.*")def callback(ch, method, properties, body):  queue_name = result.method.queue # 发送的routing_key是什么    print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

十 基于rabbitmq实现rpc

服务端

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200",credentials=credentials))channel = connection.channel()# 起翰监听任务队列channel.queue_declare(queue="rpc_queue")def on_request(ch, method, props, body):    n = int(body)    response = n + 100    # props.reply_to  要放结果的队列.    # props.correlation_id  任务    ch.basic_publish(exchange="",                     routing_key=props.reply_to,                     properties=pika.BasicProperties(correlation_id= props.correlation_id),                     body=str(response))    ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume( queue="rpc_queue",on_message_callback=on_request,)channel.start_consuming()

客户端

import pikaimport uuidclass FibonacciRpcClient(object):    def __init__(self):        credentials = pika.PlainCredentials("admin", "admin")        self.connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.200", credentials=credentials))        self.channel = self.connection.channel()        # 随机生成一个消息队列(用于接收结果)        result = self.channel.queue_declare(queue="",exclusive=True)        self.callback_queue = result.method.queue        # 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)        self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response, auto_ack=True)    def on_response(self, ch, method, props, body):        if self.corr_id == props.correlation_id:            self.response = body    def call(self, n):        self.response = None        self.corr_id = str(uuid.uuid4())        # 客户端 给 服务端 发送一个任务:  任务id = corr_id / 任务内容 = "30" / 用于接收结果的队列名称        self.channel.basic_publish(exchange="",                                   routing_key="rpc_queue", # 服务端接收任务的队列名称                                   properties=pika.BasicProperties(                                         reply_to = self.callback_queue, # 用于接收结果的队列                                         correlation_id = self.corr_id, # 任务ID                                         ),                                   body=str(n))        while self.response is None:            self.connection.process_data_events()        return self.responsefibonacci_rpc = FibonacciRpcClient()response = fibonacci_rpc.call(50)print("返回结果:",response)

标签:

广告

Copyright ?   2015-2022 北冰洋商场网版权所有  备案号:沪ICP备2020036824号-3   联系邮箱:562 66 29@qq.com