主题
02 - 消息队列
MQ 在架构中的三大作用
┌──────────────────────────────────────────────────────────────┐
│ │
│ 1. 异步解耦 │
│ │
│ 同步: 订单服务 → 库存 → 支付 → 通知 → 积分 (串行,很慢) │
│ ──[50ms]──[30ms]──[200ms]──[100ms]── 总计 380ms │
│ │
│ 异步: 订单服务 → MQ ← 库存服务 │
│ ← 支付服务 │
│ ← 通知服务 (并行,只要发MQ的时间) │
│ ──[50ms]──[5ms] 总计 55ms │
│ │
│ ┌──────┐ ┌─────┐ ┌──────┐ │
│ │订单 │────▶│ MQ │────▶│库存 │ │
│ │服务 │ │ │────▶│支付 │ 各服务独立消费 │
│ └──────┘ │ │────▶│通知 │ 互不影响 │
│ └─────┘ └──────┘ │
│ │
│ 2. 削峰填谷 │
│ │
│ 请求量 ╱╲ │
│ ╱ ╲ 没有MQ: DB被峰值打爆 │
│ ╱ ╲ │
│ ──────╱──────╲────────── 时间 │
│ │
│ 请求量 ╱╲ │
│ ╱ ╲ 有MQ: 消息堆积在队列 │
│ ───────╱────╲── ← 消费者按自己的速度处理 │
│ ╱ ╲ DB 压力平稳 │
│ ─────╱────────╲──────── 时间 │
│ │
│ 3. 最终一致性 │
│ │
│ 分布式事务太复杂 → 用 MQ 保证最终一致 │
│ 订单创建 → 发消息 → 库存服务消费 → 最终状态一致 │
│ │
└──────────────────────────────────────────────────────────────┘1. Kafka 架构详解
┌───────────────────────────────────────────────────────────────┐
│ Kafka 架构 │
│ │
│ Producer Kafka Cluster Consumer Group │
│ ┌──────┐ ┌────────────────────┐ ┌──────────┐ │
│ │生产者 │────▶│ Broker 0 │───────▶│Consumer 0│ │
│ │ │ │ ┌──────────────┐ │ └──────────┘ │
│ └──────┘ │ │ Topic: order │ │ ┌──────────┐ │
│ ┌──────┐ │ │ Partition 0 ─┼──┼───────▶│Consumer 1│ │
│ │生产者 │────▶│ │ Partition 1 ─┼──┼────┐ └──────────┘ │
│ │ │ │ └──────────────┘ │ │ ┌──────────┐ │
│ └──────┘ └────────────────────┘ └──▶│Consumer 2│ │
│ ┌────────────────────┐ └──────────┘ │
│ │ Broker 1 │ │
│ │ ┌──────────────┐ │ │
│ │ │ Partition 2 │ │ 每个 Partition │
│ │ │ (Replica) │ │ 只被一个 Consumer 消费 │
│ │ └──────────────┘ │ │
│ └────────────────────┘ │
│ │
└───────────────────────────────────────────────────────────────┘
核心概念:
┌─────────────┬──────────────────────────────────────┐
│ Topic │ 消息的逻辑分类(如 order、payment) │
│ Partition │ Topic 的物理分片,支持并行消费 │
│ Broker │ Kafka 服务器节点 │
│ Producer │ 消息生产者 │
│ Consumer │ 消息消费者 │
│ ConsumerGroup│ 消费者组,组内分摊消费 │
│ Offset │ 消费位置(每个Partition独立) │
│ Replica │ 副本,保证数据可靠性 │
└─────────────┴──────────────────────────────────────┘Partition 与消费者的关系
Topic: order (3 个 Partition)
场景1: 3个消费者(最佳)
┌──────────┐ ┌────────────┐
│ Part-0 │────▶│ Consumer-0 │
│ Part-1 │────▶│ Consumer-1 │ 每人负责一个分区
│ Part-2 │────▶│ Consumer-2 │
└──────────┘ └────────────┘
场景2: 2个消费者
┌──────────┐ ┌────────────┐
│ Part-0 │────▶│ Consumer-0 │ 负责 2 个分区
│ Part-1 │──┘ │ │
│ Part-2 │────▶│ Consumer-1 │ 负责 1 个分区
└──────────┘ └────────────┘
场景3: 4个消费者
┌──────────┐ ┌────────────┐
│ Part-0 │────▶│ Consumer-0 │
│ Part-1 │────▶│ Consumer-1 │
│ Part-2 │────▶│ Consumer-2 │
│ │ │ Consumer-3 │ 空闲!(浪费)
└──────────┘ └────────────┘
⚠️ 消费者数 > 分区数 → 多出来的消费者空闲
所以分区数决定了最大并行度Go 生产者/消费者示例
go
package main
import (
"context"
"fmt"
"time"
)
// 用 channel 模拟 Kafka 的消息队列
type Message struct {
Topic string
Key string
Value string
Timestamp time.Time
}
type SimpleQueue struct {
ch chan Message
}
func NewQueue(bufferSize int) *SimpleQueue {
return &SimpleQueue{ch: make(chan Message, bufferSize)}
}
// 生产者
func (q *SimpleQueue) Produce(ctx context.Context, msg Message) error {
select {
case q.ch <- msg:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// 消费者
func (q *SimpleQueue) Consume(ctx context.Context, handler func(Message)) {
for {
select {
case msg := <-q.ch:
handler(msg)
case <-ctx.Done():
fmt.Println("消费者退出")
return
}
}
}
func main() {
queue := NewQueue(100)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// 启动 3 个消费者
for i := 0; i < 3; i++ {
id := i
go queue.Consume(ctx, func(msg Message) {
fmt.Printf("Consumer-%d: %s = %s\n", id, msg.Key, msg.Value)
})
}
// 生产消息
for i := 0; i < 10; i++ {
queue.Produce(ctx, Message{
Topic: "order",
Key: fmt.Sprintf("order-%d", i),
Value: fmt.Sprintf("amount=%d", (i+1)*100),
})
}
<-ctx.Done()
}2. MQ 常见问题
┌────────────────────────────────────────────────────────────┐
│ 问题 │ 解决方案 │
├────────────────────┼───────────────────────────────────────┤
│ │ │
│ 消息丢失 │ 生产者: 同步发送 + ack │
│ │ Broker: 多副本 + 刷盘策略 │
│ │ 消费者: 手动 commit offset │
│ │ │
│ 消息重复 │ 消费端幂等: │
│ │ ① 数据库唯一索引 │
│ │ ② Redis 去重 (SETNX) │
│ │ ③ 业务状态机(已处理就跳过) │
│ │ │
│ 消息顺序 │ 同一个 key 发到同一个 Partition │
│ │ 单 Partition 内有序 │
│ │ 全局有序 → 只用 1 个 Partition(牺牲性能)│
│ │ │
│ 消息堆积 │ ① 增加消费者数量 │
│ │ ② 消费者内部多线程处理 │
│ │ ③ 紧急时跳过处理 + 事后补偿 │
│ │ │
└────────────────────┴───────────────────────────────────────┘下一节: 03 - 限流熔断