本页目录

MQ 是中高级别Java程序员,必须要掌握的一门技术。

什么是MQ?

Message Quete 消息队列,是指消息传输中存储消息的容器。多用于分布式系统。

MQ的优势 劣势

优势

  • 应用解耦 :系统耦合性越高,容错性、可维护性就很低。(降低服务与服务之间的远程调用)
  • 异步提速 :服务与服务之间交流会消耗大量的时间,使用消息中间件,不用担心对方有没有直接收到消息,会节省很多的时间。。
  • 削峰填谷 :将大量的直接的访问,打在MQ中,不直接打在服务上。然后服务慢慢的从MQ中拉取处理。牺牲时间换稳定性 。例如:高并发情况,用户操作数据库,数据库容易挂,只需将操作的消息,存储到MQ中,让其他服务获取MQ消息,慢慢处理。顺时压力转移到MQ上。

劣势

  • 一致性问题 :消息由A、B 、C依次处理,A、B成功,C失败,消息就不一致了。
  • 系统可用性降低 :引入MQ,就需要保证MQ正常启动。
  • 系统复杂度提高 :引入MQ,需要保证消息正常被获取。

使用MQ的场景

  • 生产者不需要从消费者获取反馈。(如果需要A调用B -> B做完 ->A获取B做完后 继续做下面的事情。就不适合使用MQ)
  • 容许短暂的不一致性
  • MQ 优势确实大,但需确保有实力能做好MQ

常见的MQ产品

Erlang是 为高并发设计的语言 ,他不是Golang!

得益于微妙级别的延迟,RabbitMQ更适合做游戏中间件使用。

RabbitMQ的简介

AMQP,06年发布,Advanced Meaagse Quruing Ptorocol 即高级队列协议,是一个网络协议,属应用层,为面向消息的中间件设计。基于此协议的客户端与消息中间件产品进行通信,不受客户端\中间件产品不同的限制,不同语言均可开发。

可以把AMQP 理解成HTTP 协议,AMQP 协议中 定义了一些角色

  • Publisher:消息生产者
  • Excange:交换机:分发消息
  • Routes:路由
  • Queue:消息存储的容器
  • Consumer:消息消费者

RabbitMQ 基础架构图

RabbitMQ 基本概念

一、整体架构层

1. Broker(消息代理)

  • 定义:RabbitMQ 服务进程本身,是消息系统的核心中枢。
  • 作用:负责接收、存储、路由和转发消息,管理所有客户端连接和资源。
  • 类比:整个 “消息邮局” 的大楼。

2. Virtual Host(虚拟主机,vhost)

  • 定义:Broker 内部的逻辑隔离容器,每个 vhost 是一套独立的消息环境。
  • 作用
    • 隔离不同业务或环境的队列、交换机、用户权限。
    • 一个 Broker 可以有多个 vhost,互不干扰。
  • 类比:邮局大楼里不同的 “业务分区”,比如 “电商区”“政务区”,各自独立运营。

3. User & Permissions(用户与权限)

  • 定义:用于认证和授权的账号体系。
  • 作用
    • 用户必须通过认证才能连接 Broker。
    • 权限按 vhost 分配,控制用户在某个 vhost 内的读、写、配置权限。
  • 类比:进入不同业务分区的 “门禁卡”,不同卡的权限不同。

二、连接与通信层

4. Connection(连接)

  • 定义:客户端(生产者 / 消费者)与 Broker 之间的 TCP 网络连接。
  • 作用:建立物理通信通道,一个客户端通常只需要一个 Connection。

5. Channel(信道)

  • 定义:Connection 内部的轻量级逻辑连接,所有消息操作都在 Channel 上完成。
  • 作用
    • 复用同一个 TCP 连接,减少系统开销。
    • 一个 Connection 可以包含多个 Channel,每个 Channel 独立处理消息。
  • 类比:Connection 是 “网线”,Channel 是网线上的 “子通道”。

三、消息路由层

6. Exchange(交换机)

  • 定义:消息的路由入口,生产者将消息发送到 Exchange,而非直接发送到队列。
  • 作用:根据类型和绑定规则,将消息转发到一个或多个队列。
  • 常见类型
    • direct:精确匹配 Routing Key,一对一投递。
    • fanout:广播模式,转发到所有绑定的队列。
    • topic:模式匹配 Routing Key,支持通配符(*#)。
    • headers:根据消息头属性匹配,而非 Routing Key。
  • 类比:邮局的 “分拣台”,根据收件地址把信件分到不同的投递窗口。

7. Binding(绑定)

  • 定义:Exchange 与 Queue 之间的关联规则。
  • 作用:定义消息如何从 Exchange 路由到 Queue,每个绑定包含一个 Routing Key。
  • 类比:分拣台到投递窗口的 “分拣规则”,比如 “地址含‘北京’的信件分到窗口 A”。

四、消息存储与消费层

8. Queue(队列)

  • 定义:消息的存储容器,消息在被消费前暂存在队列中。
  • 作用
    • 支持持久化(可选),Broker 重启后消息不丢失。
    • 支持消息确认(ACK),确保消息被正确处理。
  • 常见类型
    • Classic:传统队列,性能高,适合大多数场景。
    • Quorum:基于 Raft 协议的分布式队列,强一致性,适合高可用场景。
    • Stream:持久化的日志式队列,适合重放和历史数据处理。
  • 类比:投递窗口的 “待取信件筐”。

9. Message(消息)

  • 定义:被传输的数据载体。
  • 组成
    • Payload:业务数据(如 JSON、XML)。
    • Header:元数据,如持久化标记、过期时间、自定义属性等。
  • 状态
    • Ready:等待消费。
    • Unacked:已投递但未被消费者确认。
    • Dead Letter:因过期、被拒绝或队列满而进入死信队列。

10. Producer(生产者)

  • 定义:发送消息到 Exchange 的应用程序。
  • 作用:产生业务数据并封装成消息,发布到 Broker。

11. Consumer(消费者)

  • 定义:从 Queue 中订阅或拉取消息并处理的应用程序。
  • 作用:消费队列中的消息,执行业务逻辑,并通过 ACK 确认消息处理完成。

五、高级特性层

12. Dead Letter Exchange(死信交换机,DLX)

  • 定义:处理 “死信” 的特殊交换机。
  • 作用:当消息因过期、被拒绝或队列满而无法正常消费时,会被转发到 DLX,便于后续排查和处理。

13. Publisher Confirms(发布确认)

  • 定义:生产者确认消息已被 Broker 安全接收的机制。
  • 作用:确保消息不丢失,提升系统可靠性。

14. Cluster(集群)

队列可以镜像到多个节点,避免单点故障。

定义:多个 Broker 节点组成的分布式系统。

作用:提升吞吐量和可用性。

graph TD A[Producer 生产者] -->|1. 建立 Connection| B[Broker] B -->|2. 创建 Channel| C[Channel] A -->|3. 发布消息到 Exchange| D[Exchange] D -->|4. 根据 Binding & Routing Key 路由| E[Queue] E -->|5. 消息入队,状态: Ready| E F[Consumer 消费者] -->|6. 建立 Connection| B B -->|7. 创建 Channel| G[Channel] F -->|8. 订阅/拉取队列| E E -->|9. 投递消息到 Consumer,状态: Unacked| F F -->|10. 处理业务逻辑| F F -->|11. 发送 ACK 确认| E E -->|12. 移除已确认消息| E

RabbitMQ 的7种工作模式

RabbitMQ 交换机类型与工作模式(完整汇总版)

🧩 一、标准工作模式(核心交换机类型)

序号模式名称核心交换机类型路由逻辑说明是否依赖 routing_key典型应用场景
直连模式(Direct)direct路由键精准匹配队列绑定键✅ 是点对点通信、任务精准分发、RPC 调用
发布/订阅模式(Publish/Subscribe)fanout广播消息到所有绑定队列❌ 否系统通知、日志广播、事件分发
主题模式(Topic)topic路由键支持通配符匹配(* 匹配一个词,# 匹配多个)✅ 是业务分类消息、微服务事件总线
头交换模式(Headers)headers根据消息 Header 属性副本匹配条件(如 x-match=all/any❌ 否条件匹配分发、业务属性过滤
默认模式(Default)系统默认交换机(无名)routing_key 自动映射到同名队列✅ 是简化模型、快速原型调试

⚙️ 二、常见扩展工作模式(应用层封装)

序号模式名称基于哪种交换机特点 / 机制应用场景
工作队列模式(Work Queues)可以用 direct多消费者竞争同一个队列,保证每条消息只被一个消费者处理异步任务、并行执行、后台处理
延迟队列模式(Delay Queue)任意交换机 + TTL + DLX通过消息过期后转投死信队列实现延时处理定时任务、消息重试
死信队列模式(Dead Letter Queue, DLX)任意交换机消息被拒绝、过期或队列满时进入死信交换机异常消息补偿、可靠性机制
RPC 模式(Request/Reply)direct客户端发送请求消息,服务端响应结果到回调队列远程服务调用、结果返回机制

💡 三、快速分类总结

类型通信模型特征
Direct一对一精确路由
Fanout (Publish/Subscribe)一对多广播分发
Topic多对多按规则匹配
Headers条件过滤按属性匹配
Default简单直投无需绑定
Work Queues并发处理消息排队
Delay / DLX时间控制延迟或补偿
RPC请求响应双向通信

RabbitMQ Linux 安装

官网:https://www.rabbitmq.com/
下载地址:https://github.com/rabbitmq/rabbitmq-server/releases
Java 官方文档:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

JMS :Java Message Server,是Jave SE 提供面向消息中间件的API接口

ActiveMQ 遵循JMS规范,但是RabbitMQ 没有遵循。

centOS7 请参考:https://www.zanglikun.com/13183.html

服务端口 5672 WEB端口 15672

(安装前,需要安装erlang环境,RabbitMQ)

不同版本的RabbitMQ 必须使用对应范围的erlang环境:这点很重要,如果你是新手,不信这点,你还会再回来,看这句话的。版本对应地址:https://www.rabbitmq.com/which-erlang.html

RabbitMQ的配置:https://www.rabbitmq.com/configure.html#config-items

服务器需要设置:安全组,防火墙 开放 5672、15672 tcp端口

放行一些防火墙的端口(如果是云服务器的话,安全组 我默认你开放了)

开放端口(开放后需要要重启防火墙才生效)
firewall-cmd --zone=public --add-port=端口号/tcp --permanent

重启防火墙
firewall-cmd --reload

1、环境确认

这里,我不扯淡,翻以前的资料确实坑,CentOS8 不支持一些erlang的版本,但是老版本RabbitMQ 又依赖于它。不如我们干脆,干最新的版本
首先 先去找你想要的RabbitMQ版本 ,我们需要满足2个条件 centOS8 能安装rabbitMQ、又要满足与centOS8 能安装erlang,所以RabbitMQ 版本越新越好。

这里我使用的RabbitMQ 是3.8.9
Erlang版本是 从GitHub上弄下来的,你可以看下面自己弄。

2、卸载老版本RabbitMQ、erlang 没有的直接跳过

# 如果有老版本 直接干掉 这里有问题,可以去百度卸载 即可。
# 先暂停服务 
service rabbitmq-server stop

# 卸载RabbitMQ
yum -y remove rabbitmq-server.noarch

# 卸载erlang
yum -y remove erlang-*
yum remove erlang.x86_64

# 删不干净,但不影响使用,具体可以百度。但不保证 能删干净。

3、安装

一、安装erlang
 
# 添加仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
Detected operating system as centos/8.

# 查看erlang版本
yum list | grep erlang

# cnetso 8 安装erlang
dnf install erlang

# centOS 7 安装 请使用 yum install erlang

# 查看当前erlang的版本
erl -version

# 找到合适的RabbitMQ的版本下载下来 https://github.com/rabbitmq/rabbitmq-server/releases
# 格式:rpm -ivh 你下载的mq.rpm文件,上传到服务器
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 

#centOS7 可能会缺失socat的话,那就
yum install socat

安装完成后,就可以启动了,RabbitMQ 安装后,就是内置的服务,可以直接service启动

# 启动命令
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
不同版本显示不一样,最终还是需要以 lsof -i:5672 被占用为准

(这里有个巨大的BUG,启动提示你成功,一定要看 lsof -i:5672 看下端口是否被占用,不占用,一定要去看下 erl -v 看下版本,再去看下 https://www.rabbitmq.com/which-erlang.html 看下rabbitMQ 与 erlang 的版本是否兼容,不兼容,你累死,也找不到原因

这是看当前相关软件版本的命令

# 查看erlang的版本
yum list | grep erlang

# 查看rabbitMQ的版本
yum list | grep rabbitmq-server

然后去开启RabbitMQ的管理界面端口15672

默认此插件是不开启的,我们只有开启才能访问Dashboard

# 直接输入命令:开启管理界面
rabbitmq-plugins enable rabbitmq_management


# 修改默认配置信息,一般不要动
vim /usr/lib/rabbitmq/lib/rabbitmq_server-版本/ebin/rabbit.app 
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest

保存文件 重启即可

RabbitMQ常用命令

添加账号

# 添加开机启动RabbitMQ服务
chkconfig rabbitmq-server on

# 启动服务
service rabbitmq-server start

# 开启web端ui
 占用端口15672  请务必 开启云服务器的 安全组与防火墙放行
rabbitmq-plugins enable rabbitmq_management

# 停止服务
service rabbitmq-server stop

# 查看当前所有用户
rabbitmqctl list_users

# 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户
rabbitmqctl delete_user guest

# 必须先启动才能执行
# 添加新用户
rabbitmqctl add_user 用户名 密码
# 比如 rabbitmqctl add_user zanglikun zanglikun


# 设置权限,并开启远程访问
rabbitmqctl set_user_tags 用户名 administrator
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"
# 比如 rabbitmqctl set_user_tags zanglikun administrator
# 比如 rabbitmqctl set_permissions -p / zanglikun ".*" ".*" ".*"

# 重启服务
service rabbitmq-server restart

访问吧 http://192.168.3.10:15672/ IP 你自己替换下

管理界面使用 15672

步骤 1:启动 RabbitMQ

  • 使用命令 启用 Management 插件(后台管理界面)
# Linux/Mac 启用插件(关键!否则没有后台界面)
rabbitmq-plugins enable rabbitmq_management
  • 启动 RabbitMQ 服务后,访问后台:http://服务器IP:15672(默认账号:guest/guest,仅本地访问可用)。

步骤 2:RabbitMQ 后台核心配置(重点)

这是你最关心的部分,所有配置都在后台完成,按优先级排序:

1. 配置用户(Users)

默认的 guest 账号仅允许本地访问,生产环境必须创建专属用户:

  • 后台路径:Admin → Users → Add a user
  • 配置项:
    • Username:自定义(如 govbuy_user);
    • Password:设置密码;
    • Tags:分配角色(常用:administrator(管理员)、monitoring(监控)、policymaker(策略)、management(仅后台访问)、none(仅业务访问));
  • 注意:生产环境避免给业务用户分配 administrator 角色,最小权限原则。

角色权限说明

1、 超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、 监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、 策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、 普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

2. 配置虚拟主机(Virtual Hosts)

隔离不同业务的资源(你之前问过的 govbuy 就是这类):

  • 后台路径:Admin → Virtual Hosts → Add a virtual host
  • 配置项:
    • Name:自定义(如 /govbuy,建议加 / 前缀区分);
    • 无需其他配置,点击 Add virtual host 即可创建。

3. 配置用户权限(Permissions)

给用户分配指定虚拟主机的操作权限,核心步骤(否则用户无法访问 vhost):

  • 后台路径:Admin → Users → 点击对应用户 → Set permission
  • 配置项:
    • Virtual host:选择要授权的 vhost(如 /govbuy);
    • Configure regex:允许配置的资源正则(.* 表示所有,生产可限定如 govbuy.*);
    • Write regex:允许写入的资源正则(.*);
    • Read regex:允许读取的资源正则(.*);
  • 点击 Set permission 完成授权。

4. 配置交换机(Exchanges)(可选:后台 / 代码都可创建)

交换机是消息路由的入口,可后台手动创建(也可在代码中声明):

  • 后台路径:Exchanges → Add a new exchange
  • 配置项:
    • Name:交换机名称(如 govbuy_exchange);
    • Type:选择类型(direct/fanout/topic/headers,根据业务选);
    • Durable:是否持久化(建议勾选,Broker 重启不丢失);
    • Auto delete:是否自动删除(取消勾选,生产环境禁用);
    • 其他默认即可,点击 Add exchange

5. 配置队列(Queues)(可选:后台 / 代码都可创建)

消息的存储容器,同样可后台 / 代码创建:

  • 后台路径:Queues → Add a new queue
  • 配置项:
    • Name:队列名称(如 govbuy_queue);
    • Virtual host:选择所属 vhost;
    • Durable:是否持久化(建议勾选);
    • Auto delete:取消勾选;
    • Arguments(高级):可配置死信交换机、消息过期时间等(如 x-dead-letter-exchange: govbuy_dlx);
    • 点击 Add queue

6. 绑定交换机与队列(Bindings)

定义消息从交换机路由到队列的规则:

  • 后台路径:Exchanges → 点击已创建的交换机 → Bindings → Add binding from exchange
  • 配置项:
    • To queue:选择要绑定的队列;
    • Routing key:路由键(根据交换机类型配置:direct 填精确值,topic 填通配符,fanout 留空);
    • 点击 Bind 完成绑定。

客户端首次连接并使用该 vhost 时,RabbitMQ 自动隐式创建 默认交换机,默认交换机是一个无名交换机(名称为空字符串),类型为direct,属于 vhost 的内置基础组件,但它的触发条件是 “首次使用”,而非 “vhost 创建”; 除了默认交换机,其他所有交换机(如fanouttopic类型)都需要你手动通过控制台、CLI 或代码创建。

你自己创建的交换机是需要绑定的

7. 高级配置(可选,生产环境常用)

  • 死信队列(DLX):在队列的 Arguments 中配置 x-dead-letter-exchange(死信交换机)、x-dead-letter-routing-key(死信路由键);
  • 消息过期时间(TTL):队列级别(x-message-ttl)或消息级别(代码中设置);
  • 队列长度限制x-max-length(超出后丢弃 / 死信);
  • 策略(Policies):批量配置队列 / 交换机的属性(如镜像队列、TTL),路径:Admin → Policies → Add / update a policy
特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取最新全部资料 ❤

免责声明:
本站文章旨在总结学习互联网技术过程中的经验与见解。任何人不得将其用于违法或违规活动!所有违规内容均由个人自行承担,与作者无关。