消息队列简介及 RabbitMQ 的使用方法

Python七号

共 3399字,需浏览 7分钟

 ·

2021-08-25 17:46

消息队列是最古老的中间件之一,从系统之间有通信需求开始,就自然产生了消息队列。如果你还没有用过消息队列,那是时候好好学习一下了。本文告诉什么是消息队列,为什么需要消息队列,常见的消息队列有哪些, RabbitMQ 的部署和使用。

什么是消息队列

消息队列拆开了看,就是消息 + 队列,消息是什么?其实就是程序之间通讯所用到的数据,消息从生产者那里产生,进入队列后,安装设计好的规则出队,由消费者消费。仅此而已。

为什么需要消息队列

消息队列,最重要的是队列,可以想象一下没有队列的场景,你去银行办业务的时候,大家都不排队的场景,大家都堆在一起,个子小没力气的根本办不了业务。

如果没有消息队列,你的系统将严重耦合,在升级维护的时候牵一发而动全身。

如果没有消息队列,你的系统的很多功能都是同步的,同步意味着前面的事件完成后,才可以进行后续的操作,前端用户的会觉得卡顿,体验很差。

如果没有消息队列,web 系统突然面对高并发的访问请求,可能会崩溃。

有了消息队列,系统解耦、异步通信、流量削峰、延迟通知、最终一致性保证、顺序消息、流式处理等需求都可以轻松解决。

常见的消息队列

比较常见的消息队列产品主要有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ 等。

ActiveMQ

Apache ActiveMQ 是 Apache 软件基金会所研发的开放源码消息中间件;由于 ActiveMQ 是一个纯Java程序,因此只需要操作系统支持 Java 虚拟机,ActiveMQ 便可执行。

  • 支持Java消息服务 (JMS) 1.1 版本
  • Spring Framework
  • 集群 (Clustering)
  • 协议支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP 以及 AMQP

RabbitMQ

RabbitMQ 实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用高性能、健壮以及可伸缩性出名的 Erlang 语言编写的,支持所有主流的操作系统如 Linux,Windows,MacOS。客户端支持所有主要的编程语言。

  • 可伸缩性:集群服务
  • 消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存

ZeroMQZeroMQ(也拼写作 0MQ 或 ZMQ )是一个为可伸缩的分布式或并发应用程序设计的高性能异步消息库。它提供一个消息队列, 但是与面向消息的中间件不同,ZeroMQ 的运行不需要专门的消息代理(message broker)。该库设计成常见的套接字风格的API。

ZeroMQ 是由 iMatix 公司和大量贡献者组成的社群共同开发的。ZeroQ 通过许多第三方软件支持大部分流行的编程语言,从 Java 和Python 到 Erlang 和 Haskell。

Kafka

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。此外,Kafka 可以通过 Kafka Connect 连接到外部系统(用于数据输入/输出),并提供了 Kafka Streams 的流式处理库。该设计受事务日志的影响较大。

RocketMQ

RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是 2012 年阿里巴巴开源的第三代分布式消息中间件,2016 年 11 月 21 日,阿里巴巴向 Apache 软件基金会捐赠了 RocketMQ;第二年 2 月 20 日,Apache 软件基金会宣布 Apache RocketMQ 成为顶级项目。

消息队列的选型需要根据具体应用需求而定,ZeroMQ 小而美,RabbitMQ 大而稳,Kakfa 和 RocketMQ 快而强劲。

RabbitMQ 的部署和使用

推荐 Docker 部署,在安装 Docker 的环境下,执行:

docker run -d --hostname my-rabbit -p 15672:15672 -p 5672:5672 -- name rabbit-server -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management

现在在浏览器中打开 localhost:15672 。用户名是 user ,密码是 password

接下来,让我们创建一个队列,名字叫 my_app

创建一个 Exchange,名字叫 my_exchange

点击 Exchange 标签页,点击 my_exchange 进入详情页, 将 my_exchange 和 my_app 绑定,路由密钥设置为 test:

Python 编写生产者

现在可以使用 Python 编写生产者,来生产一条消息放入队列,并观察 Web 页面的变动情况:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'5672'/', pika.PlainCredentials('user''password')))
channel = connection.channel()

channel.basic_publish(exchange='my_exchange', routing_key='test', body='Test!')

connection.close()

执行上面的代码,即可将消息放入队列,这里我执行了四次,可以看到有四条消息:

消息将保留在队列中,直到消费者把它取出,接下来我们写一个消费消息的程序。

Python 编写消费者

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'5672'/', pika.PlainCredentials("user""password")))
channel = connection.channel()

def callback(ch, method, properties, body):
    print(f'{body} is received')

channel.basic_consume(queue="my_app", on_message_callback=callback, auto_ack=True)
channel.start_consuming()

执行:

此时,队列已经空了:

这段代码最低限度地演示了如何将消息发布到 RabbitMQ 中,更多用法还请移步到官方文档。

最后的话

消息队列可以进行系统模块之间的解耦,但自己就成了关键节点,在集群部署和故障转移方面,需要系统管理员的大量关注。本文简要介绍了什么是消息队列,为什么需要消息队列,常见的消息队列有哪些,RabbitMQ 的部署和使用,如果对你有所帮助,请点赞支持,欢迎留言讨论。


浏览 28
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报