跳转至

消费语义与位移管理


1. 三种消费语义

语义 含义 实现方式 风险
At Most Once(最多一次) 消息可能丢失,不会重复 拉取消息后立即提交 offset,再处理 处理失败则消息丢失
At Least Once(至少一次) 消息不会丢失,可能重复 处理成功后再提交 offset 处理成功但提交失败则重复消费
Exactly Once(恰好一次) 不丢不重 幂等生产者 + 事务 + read_committed 实现复杂,性能有损耗
flowchart TD
    A[拉取消息] --> B{何时提交 offset?}
    B -->|拉取后立即提交| C["At Most Once\n(可能丢消息)"]
    B -->|处理成功后提交| D["At Least Once\n(可能重复)"]
    D --> E{消费端是否幂等?}
    E -->|是| F["业务上的 Exactly Once\n(最常用方案)"]
    E -->|否| G["需要 Kafka 事务\n(真正的 Exactly Once)"]

生产实践:绝大多数业务采用 At Least Once + 消费端幂等 的方案,实现简单,性能好。真正的 Kafka 事务(Exactly Once)主要用于 Kafka Streams 等流处理场景。


2. offset 存储机制

2.1 __consumer_offsets 内部 Topic

Kafka 0.9 之后,消费者 offset 不再存储在 ZooKeeper,而是存储在 Kafka 内部 Topic __consumer_offsets 中:

__consumer_offsets 的结构:
- 默认 50 个分区(由 offsets.topic.num.partitions 控制)
- 每条消息的 Key:消费者组ID + Topic + 分区号
- 每条消息的 Value:offset + 元数据(提交时间、过期时间等)

确定分区的公式:
partition = Math.abs(groupId.hashCode()) % 50

为什么从 ZooKeeper 迁移到 Kafka 自身?

对比 ZooKeeper 存储 __consumer_offsets
写入吞吐 低(ZooKeeper 写入有限) 高(Kafka 顺序写)
扩展性
运维 需要 ZooKeeper 无额外依赖

2.2 offset 提交方式

自动提交(Auto Commit)

// 配置
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000"); // 每5秒自动提交

// 消费代码
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // 处理消息
    }
    // 每5秒自动提交 offset(后台线程)
}

问题:自动提交是 At Least Once 语义,但如果 poll 后还没处理完就自动提交了,则变成 At Most Once。

手动提交(Manual Commit)

props.put("enable.auto.commit", "false"); // 关闭自动提交

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }

    // 方式一:同步提交(阻塞直到提交成功,可靠但慢)
    consumer.commitSync();

    // 方式二:异步提交(不阻塞,性能好,但失败不重试)
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            log.error("提交 offset 失败: {}", offsets, exception);
        }
    });

    // 方式三:提交指定 offset(精细控制)
    Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        offsetMap.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1) // 注意:提交的是下一条消息的 offset
        );
    }
    consumer.commitSync(offsetMap);
}

生产推荐:使用异步提交 + 关闭时同步提交的组合:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        process(records);
        consumer.commitAsync(); // 正常情况异步提交(性能好)
    }
} catch (Exception e) {
    log.error("消费异常", e);
} finally {
    try {
        consumer.commitSync(); // 关闭前同步提交,确保最后一批 offset 不丢失
    } finally {
        consumer.close();
    }
}

3. Group Coordinator(组协调器)

3.1 什么是 Group Coordinator?

每个消费者组都有一个 Group Coordinator,它是 Broker 上的一个组件,负责:

  • 管理消费者组的成员关系(加入、离开、心跳)
  • 触发和协调 Rebalance
  • 存储消费者组的 offset(写入 __consumer_offsets

确定 Group Coordinator 的方式

1. 计算 __consumer_offsets 的分区号:
   partition = Math.abs(groupId.hashCode()) % 50

2. 该分区的 Leader 所在的 Broker,就是这个消费者组的 Group Coordinator

3.2 消费者加入流程

sequenceDiagram
    participant C1 as 消费者1
    participant C2 as 消费者2
    participant GC as Group Coordinator

    C1->>GC: FindCoordinator(找到 Group Coordinator)
    C2->>GC: FindCoordinator
    C1->>GC: JoinGroup(申请加入,上报支持的分配策略)
    C2->>GC: JoinGroup
    GC->>GC: 等待所有成员 JoinGroup(session.timeout.ms 内)
    GC-->>C1: JoinGroup Response(C1 被选为 Leader Consumer)
    GC-->>C2: JoinGroup Response(Follower)
    Note over C1: Leader Consumer 负责执行分区分配
    C1->>C1: 执行分区分配算法(RangeAssignor/RoundRobinAssignor 等)
    C1->>GC: SyncGroup(上报分配结果)
    C2->>GC: SyncGroup
    GC-->>C1: SyncGroup Response(告知 C1 负责哪些分区)
    GC-->>C2: SyncGroup Response(告知 C2 负责哪些分区)
    Note over C1,C2: 开始消费各自负责的分区

注意:这里的 Leader Consumer 是消费者组内负责执行分区分配的消费者,与 Partition Leader 是完全不同的概念。


4. 分区分配策略

消费者组内的分区分配由 Leader Consumer 执行,支持多种策略:

4.1 RangeAssignor(范围分配,默认)

场景:2个消费者,Topic A 有 3 个分区,Topic B 有 3 个分区

分配结果:
消费者1:Topic A 分区0、1,Topic B 分区0、1
消费者2:Topic A 分区2,Topic B 分区2

问题:消费者1 比消费者2 多消费 2 个分区,不均衡

4.2 RoundRobinAssignor(轮询分配)

场景:2个消费者,Topic A 有 3 个分区,Topic B 有 3 个分区

所有分区排序:A-0, A-1, A-2, B-0, B-1, B-2
轮询分配:
消费者1:A-0, A-2, B-1
消费者2:A-1, B-0, B-2

优点:分配更均衡
缺点:要求所有消费者订阅相同的 Topic

4.3 StickyAssignor(粘性分配,推荐)

目标:在均衡分配的基础上,尽量保持原有分配不变(减少 Rebalance 时的分区迁移)

Rebalance 前:
消费者1:A-0, A-1, B-0
消费者2:A-2, B-1, B-2

消费者2 离开后,Rebalance:
StickyAssignor:消费者1 保留 A-0, A-1, B-0,新增 A-2(只迁移必要的分区)
RoundRobinAssignor:重新轮询,可能打乱所有分配

优点:减少不必要的分区迁移,降低 Rebalance 影响
// 配置分配策略
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.StickyAssignor");

5. subscribe vs assign

消费者有两种订阅方式:

方式 方法 特点 适用场景
动态订阅 subscribe(topics) 加入消费者组,自动分配分区,支持 Rebalance 普通业务消费
手动分配 assign(partitions) 不加入消费者组,手动指定分区,不触发 Rebalance 精确控制、数据迁移、回放历史消息
// 方式一:subscribe(加入消费者组)
consumer.subscribe(Arrays.asList("orders", "payments"));

// 方式二:assign(手动指定分区,不加入消费者组)
TopicPartition partition0 = new TopicPartition("orders", 0);
TopicPartition partition1 = new TopicPartition("orders", 1);
consumer.assign(Arrays.asList(partition0, partition1));

// assign 后可以手动指定从哪个 offset 开始消费(用于回放)
consumer.seek(partition0, 1000); // 从 offset=1000 开始消费

6. 消费者 offset 重置策略

当消费者组第一次消费某个 Topic,或者 offset 已过期(超过 offsets.retention.minutes,默认 7 天),需要决定从哪里开始消费:

// auto.offset.reset 配置
props.put("auto.offset.reset", "earliest"); // 从最早的消息开始(常用于数据回放)
props.put("auto.offset.reset", "latest");   // 从最新的消息开始(默认,只消费新消息)
props.put("auto.offset.reset", "none");     // 没有 offset 时抛出异常

手动重置 offset(命令行)

# 重置消费者组 my-group 对 Topic orders 的 offset 到最早
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group \
  --topic orders \
  --reset-offsets --to-earliest \
  --execute

# 重置到指定时间点(用于故障恢复)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group \
  --topic orders \
  --reset-offsets --to-datetime 2024-01-01T00:00:00.000 \
  --execute

7. 消费者 Lag 监控

Consumer Lag(消费延迟)= 分区最新 offset(LEO)- 消费者已提交 offset

# 查看消费者组的 Lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group \
  --describe

# 输出示例:
# GROUP     TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# my-group  orders  0          1000            1050            50   ← Lag=50
# my-group  orders  1          2000            2000            0

Lag 告警建议: - Lag 持续增长 → 消费速度跟不上生产速度,需要扩容消费者或优化处理逻辑 - Lag 突然变大 → 可能是消费者宕机或处理异常,需要排查


8. 常见问题

Q:自动提交和手动提交怎么选?

生产环境推荐手动提交。自动提交无法精确控制 offset 提交时机,可能导致消息丢失(处理前就提交)或重复消费(提交失败)。手动提交虽然代码稍复杂,但能精确控制消费语义。

Q:消费者 offset 提交失败了怎么办?

offset 提交失败不会导致消息丢失,只会导致下次重启后重复消费(At Least Once 语义)。因此消费端必须实现幂等处理,确保重复消费不会产生副作用。

Q:如何实现消费端幂等?

常见方案: 1. 数据库唯一键:将消息 ID 作为唯一键插入数据库,重复消息会触发唯一键冲突 2. Redis 去重:用 SET NX 记录已处理的消息 ID,有效期设为消息可能重复的时间窗口 3. 业务天然幂等:如更新操作(UPDATE SET status=1 WHERE id=xxx),重复执行结果相同