夜猫子的知识栈 夜猫子的知识栈
首页
  • 前端文章

    • JavaScript
  • 学习笔记

    • 《JavaScript教程》
    • 《Web Api》
    • 《ES6教程》
    • 《Vue》
    • 《React》
    • 《TypeScript》
    • 《Git》
    • 《Uniapp》
    • 小程序笔记
    • 《Electron》
    • JS设计模式总结
  • 《前端架构》

    • 《微前端》
    • 《权限控制》
    • monorepo
  • 全栈项目

    • 任务管理日历
    • 无代码平台
    • 图书管理系统
  • HTML
  • CSS
  • Nodejs
  • Midway
  • Nest
  • MySql
  • 其他
  • 技术文档
  • GitHub技巧
  • 博客搭建
  • Ajax
  • Vite
  • Vitest
  • Nuxt
  • UI库文章
  • Docker
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

夜猫子

前端练习生
首页
  • 前端文章

    • JavaScript
  • 学习笔记

    • 《JavaScript教程》
    • 《Web Api》
    • 《ES6教程》
    • 《Vue》
    • 《React》
    • 《TypeScript》
    • 《Git》
    • 《Uniapp》
    • 小程序笔记
    • 《Electron》
    • JS设计模式总结
  • 《前端架构》

    • 《微前端》
    • 《权限控制》
    • monorepo
  • 全栈项目

    • 任务管理日历
    • 无代码平台
    • 图书管理系统
  • HTML
  • CSS
  • Nodejs
  • Midway
  • Nest
  • MySql
  • 其他
  • 技术文档
  • GitHub技巧
  • 博客搭建
  • Ajax
  • Vite
  • Vitest
  • Nuxt
  • UI库文章
  • Docker
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Node基础

  • 《MySQL》学习笔记

  • Midway

  • Nest

    • 开篇词
    • 学习理由
    • nest概念扫盲
    • 快速掌握 nestcli
    • 5种http数据传输方式
    • IoC 解决了什么痛点问题?
    • 如何调试 Nest 项目
    • Provider注入对象
    • 全局模块和生命周期
    • AOP 架构有什么好处?
    • 一网打尽 Nest 全部装饰器
    • Nest如何自定义装饰器
    • Metadata和Reflector
    • ExecutionContext切换上下文
    • Module和Provider的循环依赖处理
    • 如何创建动态模块
    • Nest和Express,fastify
    • Nest的Middleware
    • RxJS和Interceptor
    • 内置Pipe和自定义Pipe
    • ValidationPipe验证post请求参数
    • 如何自定义 Exception Filter
    • 图解串一串 Nest 核心概念
    • 接口如何实现多版本共存
    • Express如何使用multer实现文件上传
    • Nest使用multer实现文件上传
    • 图书管理系统
    • 大文件分片上传
    • 最完美的 OSS 上传方案
    • Nest里如何打印日志
    • 为什么Node里要用Winston打印日志
    • Nest 集成日志框架 Winston
    • 通过Desktop学Docker也太简单了
    • 你的第一个 Dockerfile
    • Nest 项目如何编写 Dockerfile
    • 提升 Dockerfile 水平的 5 个技巧
    • Docker 是怎么实现的
    • 为什么 Node 应用要用 PM2 来跑?
    • 快速入门 MySQL
    • SQL 查询语句的所有语法和函数
    • 一对一、join 查询、级联方式
    • 一对多、多对多关系的表设计
    • 子查询和 EXISTS
    • SQL 综合练习
    • MySQL 的事务和隔离级别
    • MySQL 的视图、存储过程和函数
    • Node 操作 MySQL 的两种方式
    • 快速掌握 TypeORM
    • TypeORM 一对一的映射和关联 CRUD
    • TypeORM 一对多的映射和关联 CRUD
    • TypeORM 多对多的映射和关联 CRUD
    • 在 Nest 里集成 TypeORM
    • TypeORM保存任意层级的关系
    • 生产环境为什么用TypeORM的migration迁移功能
    • Nest 项目里如何使用 TypeORM 迁移
    • 如何动态读取不同环境的配置?
    • 快速入门 Redis
    • 在 Nest 里操作 Redis
    • 为什么不用 cache-manager 操作 Redis
    • 两种登录状态保存方式:JWT、Session
    • Nest 里实现 Session 和 JWT
    • MySQL + TypeORM + JWT 实现登录注册
    • 基于 ACL 实现权限控制
    • 基于 RBAC 实现权限控制
    • access_token和refresh_token实现无感登录
    • 单token无限续期实现登录无感刷新
    • 使用 passport 做身份认证
    • passport 实现 GitHub 三方账号登录
    • passport 实现 Google 三方账号登录
    • 为什么要使用 Docker Compose ?
    • Docker 容器通信的最简单方式:桥接网络
    • Docker 支持重启策略,是否还需要 PM2
    • 快速掌握 Nginx 的 2 大核心用法
    • 基于 Nginx 实现灰度系统
    • 基于 Redis 实现分布式 session
    • Redis + 高德地图,实现附近的充电宝
    • 用 Swagger 自动生成 api 文档
    • 如何灵活创建 DTO
    • class- validator 的内置装饰器,如何自定义装饰器
    • 序列化 Entity,你不需要 VO 对象
    • 手写序列化 Entity 的拦截器
    • 使用 compodoc 生成文档
    • Node 如何发邮件?
    • 实现基于邮箱验证码的登录
    • 基于 sharp 实现 gif 压缩工具
    • 大文件如何实现流式下载?
    • Puppeteer 实现爬虫,爬取 BOSS 直聘全部前端岗位
    • 实现扫二维码登录
    • Nest 的 REPL 模式
    • 实现 Excel 导入导出
    • 如何用代码动态生成 PPT
    • 如何拿到服务器 CPU、内存、磁盘状态
    • Nest 如何实现国际化?
    • 会议室预订系统:需求分析和原型图
    • 会议室预订系统:技术方案和数据库设计
    • 会议室预订系统:用户管理模块--用户注册
    • 会议室预订系统:用户管理模块--配置抽离、登录认证鉴权
    • 会议室预订系统:用户管理模块-- interceptor、修改信息接口
    • 会议室预订系统:用户管理模块--用户列表和分页查询
    • 会议室预订系统:用户管理模块-- swagger 接口文档
    • 会议室预订系统:用户管理模块-- 用户端登录注册页面
    • 会议室预订系统:用户管理模块-- 用户端信息修改页面
    • 会议室预订系统:用户管理模块-- 头像上传
    • 会议室预订系统:用户管理模块-- 管理端用户列表页面
    • 会议室预订系统:用户管理模块-- 管理端信息修改页面
    • 会议室预订系统:会议室管理模块-后端开发
    • 会议室预订系统:会议室管理模块-管理端前端开发
    • 会议室预订系统:会议室管理模块-用户端前端开发
    • 会议室预订系统:预定管理模块-后端开发
    • 会议室预订系统:预定管理模块-管理端前端开发
    • 会议室预订系统:预定管理模块-用户端前端开发
    • 会议室预订系统:统计管理模块-后端开发
    • 会议室预订系统:统计管理模块-前端开发
    • 会议室预订系统:后端项目部署到阿里云
    • 会议室预订系统:前端项目部署到阿里云
    • 会议室预定系统:用 migration 初始化表和数据
    • 会议室预定系统:文件上传 OSS
    • 会议室预定系统:Google 账号登录后端开发
    • 会议室预定系统:Google 账号登录前端开发
    • 会议室预定系统:后端代码优化
    • 会议室预定系统:集成日志框架 winston
    • 会议室预定系统:前端代码优化
    • 会议室预定系统:全部功能测试
    • 会议室预定系统:项目总结
    • Nest 如何创建微服务?
    • Nest 的 Monorepo 和 Library
    • 用 Etcd 实现微服务配置中心和注册中心
    • Nest 集成 Etcd 做注册中心、配置中心
    • 用 Nacos 实现微服务配置中心和注册中心
    • 基于 gRPC 实现跨语言的微服务通信
    • 快速入门 ORM 框架 Prisma
    • Prisma 的全部命令
    • Prisma 的全部 schema 语法
    • Primsa Client 单表 CRUD 的全部 api
    • Prisma Client 多表 CRUD 的全部 api
    • 在 Nest 里集成 Prisma
    • 为什么前端监控系统要用 RabbitMQ?
      • 总结
    • 基于 Redis 实现关注关系
    • 基于 Redis 实现各种排行榜(周榜、月榜、年榜)
    • 考试系统:需求分析
    • 考试系统:技术方案和数据库设计
    • 考试系统:微服务、Lib 拆分
    • 考试系统;用户注册
    • 考试系统:用户登录、修改密码
    • 考试系统:考试微服务
    • 考试系统:登录、注册页面
    • 考试系统:修改密码、试卷列表页面
    • 考试系统:新增试卷、回收站
    • 考试系统:试卷编辑器
    • 考试系统:试卷回显、预览、保存
    • 考试系统:答卷微服务
    • 考试系统:答题页面
    • 考试系统:自动判卷
    • 考试系统:分析微服务、排行榜页面
    • 考试系统:整体测试
    • 考试系统:项目总结
    • 用 Node.js 手写 WebSocket 协议
    • Nest 开发 WebSocket 服务
    • 基于 Socket.io 的 room 实现群聊
    • 聊天室:需求分析和原型图
    • 聊天室:技术选型和数据库设计
    • 聊天室:用户注册
    • 聊天室:用户登录
    • 聊天室:修改密码、修改信息
    • 聊天室:好友列表、发送好友申请
    • 聊天室:创建聊天室、加入群聊
    • 聊天室:登录、注册页面开发
    • 聊天室:修改密码、信息页面开发
    • 聊天室:头像上传
    • 聊天室:好友∕群聊列表页面
    • 聊天室:添加好友弹窗、通知页面
    • 聊天室:聊天功能后端开发
    • 聊天室:聊天功能前端开发
    • 聊天室:一对一聊天
    • 聊天室:创建群聊、进入群聊
    • 聊天室:发送表情、图片、文件
    • 聊天室:收藏
    • 聊天室:全部功能测试
    • 聊天室:项目总结
    • MongoDB 快速入门
    • 使用 mongoose 操作 MongoDB 数据库
    • GraphQL 快速入门
    • Nest 开发 GraphQL 服务:实现 CRUD
    • GraphQL + Primsa + React 实现 TodoList
    • 如何调试 Nest 源码?
  • 其他

  • 服务端
  • Nest
神说要有光
2025-03-10
目录

为什么前端监控系统要用 RabbitMQ?

前端监控系统是采集用户端的异常、性能、业务埋点等数据上报,在服务端做存储,并支持可视化分析的平台。

用户量可能很大,采集的数据可能比较多,这时候服务端的并发压力会比较大,要是直接存入数据库,那数据库服务很可能会崩掉。

那就用现在的数据库,如何保证面对大量并发请求的时候,服务不崩呢?

答案就是消息队列,比如常用的 RabbitMQ:

第一个 web 服务接收请求,把消息存入 RabbitMQ,然后另一个 web 服务从 MQ 中取出消息存入数据库。

有同学说,这不是一样么?

不一样,MQ 的并发量比数据库高很多。之前 web 服务要等数据库存储完成才能响应,而现在只存入 MQ 就可以响应了。那可以支持的并发量就更多。

而数据库的并发比较低,我们可以通过 MQ 把消费的上限调低,就能保证数据库服务不崩。

比如 10w 的消息进来,每次只从中取出 1000 来消费:

并发量被控制住了,自然就崩不了了,从 MQ 中取出慢慢处理就好了。

这就是 MQ 的流量削峰的功能。

而且完全可以加几个 web 服务来同时消费 MQ 中的消息:

知道了 RabbitMQ 能干啥,那我们就来用一下试试吧!

我们通过 docker 来跑 RabbitMQ。

搜索 rabbitmq 的镜像,选择 3.11-management 的版本:

这个版本是有 web 管理界面的。

点击 run:

映射容器内的 5672、15672 这俩端口到本地的端口。

15672 是管理界面的,5672 是 mq 服务的端口。

等 rabbitmq 跑起来之后:

就可以在浏览器访问 http://localhost:15672 了:

这就是它的 web 管理界面。

输入 guest、guest 进入管理页面:

可以看到 connection、channel、exchange、queue 的分别的管理页面。

这些都是什么呢?

写个 demo 就理解了:

创建个项目:

mkdir rabbitmq-test

cd rabbitmq-test

npm init -y
1
2
3
4
5

安装用到的包:

npm install amqplib
1

创建 src/producer.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertQueue('aaa');
await channel.sendToQueue('aaa',Buffer.from('hello'))
1
2
3
4
5
6
7

安装 amqplib 的包,这个是 rabbitmq 的 node 客户端(amqp 是 rabbitmq 的协议)。

上面的代码连接了 rabbitmq 服务,创建了一个名字为 aaa 的队列,并向队列中发送了一个消息。

然后 node 跑一下:

node ./src/producer.js
1

(这里要用 es module 语法并且支持顶层 await 需要在 packege.json 里设置 type 为 module)

之后就可以在管理界面看到这个队列了:

然后我们再写一个消费端 src/consumer.js:

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue('aaa');
channel.consume(queue, msg => {
    console.log(msg.content.toString())
}, { noAck: true });
1
2
3
4
5
6
7
8
9

assertQueue 是如果没有就创建队列,有的话就直接返回。

这里取到那个队列,就可以从中消费消息了:

node src/consumer.js
1

这样,我们就完成了第一次 RabbitMQ 的通信,两个服务之间也是这样通信的。

是不是还挺简单的?

rabbitmq 使用确实挺简单。

那怎么控制并发数呢?

我们改一下 src/producer.js:

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertQueue('aaa', {durable: false});

let i = 1;
setInterval(async () => {
    const msg = 'hello' + i;
    console.log('发送消息:', msg);
    await channel.sendToQueue('aaa',Buffer.from(msg))
    i++;
}, 500);
1
2
3
4
5
6
7
8
9
10
11
12
13
14

生产者每 0.5s 发送一次消息。

消费者每 1s 处理一条消息:

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue('aaa');
channel.prefetch(3);

const currentTask = [];
channel.consume(queue, msg => {
    currentTask.push(msg);
    console.log('收到消息:', msg.content.toString());
}, { noAck: false });

setInterval(() => {
    const curMsg = currentTask.pop();
    channel.ack(curMsg);
}, 1000);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

每条消费者收到的消息要确认之后才会在 MQ 里删除。可以收到消息自动确认,也可以手动确认。

这里我把 noAck 设置为 false 了,也就是不自动确认。

把收到的消息放入一个数组中,每 1s 确认一次。

然后我设置了 prefetch 为 3,也就是每次最多取回 3 条消息来处理。

跑一下试试:

消息生产端:

node ./src/producer.js
1

消息消费端:

node ./src/consumer.js
1

可以看到生产者是每 0.5s 往队列里放一条消息。

消费者一开始取出 3 条,然后每处理完一条取一条,保证最多并发处理 3 条。

这就是流量削峰的功能。

不同服务之间的速度差异可以通过 MQ 来缓冲。

大概了解了 rabbitmq 之后,我们来看看它的整体架构图:

Producer 和 Consumer 分别是生产者和消费者。

Connection 是连接,但我们不会每用一次 rabbitmq 就创建一个单独的 Connection,而是在一个 Connection 里做一下划分,叫做 Channel,每个 Channel 做自己的事情。

而 Queue 就是两端存取消息的地方了。

整个接收消息和转发消息的服务就叫做 Broker。

至于 Exchange,我们前面的例子没有用到,这个是把消息放到不同的队列里用的,叫做交换机。

我们前面生产者和消费者都是直接指定了从哪个队列存取消息,那如果是一对多的场景呢?

总不能一个个的调用 sendQueue 发消息吧?

这时候就要找一个 Exchange(交换机) 来帮我们完成把消息按照规则放入不同的 Queue 的工作了。

Exchange 主要有 4 种:

  • fanout:把消息放到这个交换机的所有 Queue
  • direct:把消息放到交换机的指定 key 的队列
  • topic:把消息放到交换机的指定 key 的队列,支持模糊匹配
  • headers:把消息放到交换机的满足某些 header 的队列

一个个来试下:

首先是 direct,生产者端 src/direct.js:

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange', 'direct');

channel.publish('direct-test-exchange', 'aaa',  Buffer.from('hello1'));
channel.publish('direct-test-exchange', 'bbb',  Buffer.from('hello2'));
channel.publish('direct-test-exchange', 'ccc',  Buffer.from('hello3'));
1
2
3
4
5
6
7
8
9
10

不再是直接 sendToQueue 了,而是创建一个 exchange,然后调用 publish 往这个 exchange 发消息。

其中第二个参数是 routing key,也就是消息路由到哪个队列。

然后创建两个消费者:

src/direct-consumer1.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue('queue1');
await channel.bindQueue(queue,  'direct-test-exchange', 'aaa');

channel.consume(queue, msg => {
    console.log(msg.content.toString())
}, { noAck: true });
1
2
3
4
5
6
7
8
9
10
11

src/direct-consumer2.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue('queue2');
await channel.bindQueue(queue,  'direct-test-exchange', 'bbb');

channel.consume(queue, msg => {
    console.log(msg.content.toString())
}, { noAck: true });

1
2
3
4
5
6
7
8
9
10
11
12

分别创建 queue1 和 queue2 两个队列,绑定到前面创建的 direct-test-exchange 这个交换机上,指定了路由 key 分别是 aaa 和 bbb。

然后把生产者和两个消费者跑起来。

node src/direct.js
1
node src/direct-consumer1.js
1
node src/direct-consumer2.js
1

就可以看到队列 queue1 和 queue2 分别接收到了对应的消息:

这就是通过 direct 交换机发送消息的过程。

在管理页面上也可以看到这个交换机的信息:

包括 exchange 下的两个 queue 以及各自的 routing key。

再来试下 topic 类型的 Exchange。

src/topic.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange2', 'topic');

channel.publish('direct-test-exchange2', 'aaa.1',  Buffer.from('hello1'));
channel.publish('direct-test-exchange2', 'aaa.2',  Buffer.from('hello2'));
channel.publish('direct-test-exchange2', 'bbb.1',  Buffer.from('hello3'));
1
2
3
4
5
6
7
8
9
10

生产者端创建叫 direct-test-exchange2 的 topic 类型的 Exchange,然后发三条消息。

创建两个消费端:

src/topic-consumer1.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange2', 'topic');

const { queue } = await channel.assertQueue('queue1');
await channel.bindQueue(queue,  'direct-test-exchange2', 'aaa.*');

channel.consume(queue, msg => {
    console.log(msg.content.toString())
}, { noAck: true });
1
2
3
4
5
6
7
8
9
10
11
12
13

src/topic-consumer2.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange2', 'topic');

const { queue } = await channel.assertQueue('queue2');
await channel.bindQueue(queue,  'direct-test-exchange2', 'bbb.*');

channel.consume(queue, msg => {
    console.log(msg.content.toString())
}, { noAck: true });
1
2
3
4
5
6
7
8
9
10
11
12
13

两个消费者端分别创建 queue1 和 queue2 两个队列,绑定到 direct-test-exchange2 的交换机下。

指定路由 key 分别为 aaa.* 和 bbb.*,这里的 * 是模糊匹配的意思。

消费者端也 assertExchange 了,如果不存在就创建,保证 exchange 一定存在。

然后跑一下:

node src/topic.js
1
node src/topic-consumer1.js
1
node src/topic-consumer2.js
1

可以看到,两个消费者分别收到了不同 routing key 对应的消息。

当然,在管理界面这里也是可以发消息的:

消费者端同样可以收到:

这就是 topic 类型的交换机,可以根据模糊匹配 routing key 来发消息到不同队列。

再来试下 fanout 类型的 exchange:

生产者:

src/fanout.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange3', 'fanout');

channel.publish('direct-test-exchange3', '',  Buffer.from('hello1'));
channel.publish('direct-test-exchange3', '',  Buffer.from('hello2'));
channel.publish('direct-test-exchange3', '',  Buffer.from('hello3'));
1
2
3
4
5
6
7
8
9
10

消费者:

src/fanout-consumer1.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange3', 'fanout');

const { queue } = await channel.assertQueue('queue1');
await channel.bindQueue(queue,  'direct-test-exchange3', 'aaa');

channel.consume(queue, msg => {
    console.log(msg.content.toString())
}, { noAck: true });
1
2
3
4
5
6
7
8
9
10
11
12
13

src/fanout-consumer2.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange3', 'fanout');

const { queue } = await channel.assertQueue('queue2');
await channel.bindQueue(queue,  'direct-test-exchange3', 'bbb');

channel.consume(queue, msg => {
    console.log(msg.content.toString())
}, { noAck: true });
1
2
3
4
5
6
7
8
9
10
11
12
13

fanout 是广播消息到 Exchange 下的所有队列,不需要指定 routing key,计算指定了也会忽略。

跑起来可以看到,两个消费者都收到了消息:

node src/fanout.js
1
node src/fanout-consumer1.js
1
node src/fanout-consumer2.js
1

这就是 fanout 类型交换机的特点,广播消息到所有绑定到它的 queue。

最后再来看下 headers 类型的 Exchange,这个不是根据 routing key 来匹配了,而是根据 headers:

生产者端:

src/headers.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange4', 'headers');

channel.publish('direct-test-exchange4', '',  Buffer.from('hello1'), {
    headers: {
        name: 'guang'
    }
});
channel.publish('direct-test-exchange4', '',  Buffer.from('hello2'), {
    headers: {
        name: 'guang'
    }
});
channel.publish('direct-test-exchange4', '',  Buffer.from('hello3'), {
    headers: {
        name: 'dong'
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

消费者端:

src/headers-consumer1.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange4', 'headers');

const { queue } = await channel.assertQueue('queue1');
await channel.bindQueue(queue,  'direct-test-exchange4', '', {
    name: 'guang'
});

channel.consume(queue, msg => {
    console.log(msg.content.toString())
}, { noAck: true });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

src/headers-consumer2.js

import * as amqp from 'amqplib'

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange('direct-test-exchange4', 'headers');

const { queue } = await channel.assertQueue('queue2');
await channel.bindQueue(queue,  'direct-test-exchange4', '', {
    name: 'dong'
});

channel.consume(queue, msg => {
    console.log(msg.content.toString())
}, { noAck: true });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

跑起来是这样的:

node src/headers.js
1
node src/headers-consumer1.js
1
node src/headers-consumer2.js
1

很容易理解,只是从匹配 routing key 变成了匹配 header。

这就是 Exchange,当你需要一对多发消息的时候,就可以选择这些类型的交换机。

回过头来,我们来总结下 rabbitmq 解决了什么问题:

  • 流量削峰:可以把很大的流量放到 mq 种按照一定的流量上限来慢慢消费,这样虽然慢一点,但不至于崩溃。
  • 应用解耦:应用之间不再直接依赖,就算某个应用挂掉了,也可以再恢复后继续从 mq 中消费消息。并不会一个应用挂掉了,它关联的应用也挂掉。

比如前端监控系统的后端服务,就很适合使用 mq 来做流量削峰。

案例代码在小册仓库 (opens new window)

# 总结

前端监控系统会收到很多来自用户端的请求,如果直接存入数据库很容易把数据库服务搞挂掉,所以一般会加一个 RabbitMQ 来缓冲。

它是生产者往 queue 里放入消息,消费者从里面读消息,之后确认消息收到的流程。

当一对多的时候,还要加一个 Exchange 交换机来根据不同的规则转发消息:

  • direct 交换机:根据 routing key 转发消息到队列
  • topic 交换机:根据 routing key 转发消息到队列,支持模糊匹配
  • headers 交换机:根据 headers 转发消息到队列
  • fanout 交换机:广播消息到交换机下的所有队列

而且消费者可以设置一个消费的并发上限,这样就可以保证服务不会因并发过高而崩溃。

这就是流量削峰的功能。

RabbitMQ 在后端系统中经常能见到,是很常用的中间件。

编辑 (opens new window)
上次更新: 2025/10/27 10:53:52
在 Nest 里集成 Prisma
基于 Redis 实现关注关系

← 在 Nest 里集成 Prisma 基于 Redis 实现关注关系→

最近更新
01
H5调用微信jssdk
09-28
02
VueVirtualScroller
09-19
03
如何调试 Nest 项目
03-10
更多文章>
Copyright © 2019-2025 Study | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式