【Kafka源码解读和使用指南】第30篇:Kafka分区分配器源码解析——公平分配是门艺术
2026/6/10 0:10:57 网站建设 项目流程

上一篇【第29篇】Kafka心跳机制源码解析——消费者如何向Broker"报平安"
下一篇【第31篇】Rebalance实现源码深度解析——从发现到完成的完整状态机(明日更新,敬请期待)


摘要

当Consumer Group触发Rebalance时,分区给谁、怎么给,决定了整个Group的消费均衡程度。PartitionAssignor就是负责"分蛋糕"的角色。Kafka内置了三种分配策略:RangeAssignor按Topic维度等分(简单但可能不均衡)、RoundRobinAssignor全局轮询(均衡但要求订阅一致)、StickyAssignor粘性分配(均衡且保持原分配)。本文用ASCII图解三种算法在相同场景下的分配差异,深入剖析每种算法的源码实现,并通过对比表格帮助选型。最后,手把手教你实现一个自定义PartitionAssignor——按消费能力加权分配,让你的消费者真正"按劳分配"。


一、PartitionAssignor接口设计

// PartitionAssignor 接口定义publicinterfacePartitionAssignor{// 分配方法:给定分区信息和消费者的订阅信息,返回分配结果Map<String,List<TopicPartition>>assign(Map<String,Integer>partitionsPerTopic,// Topic→分区数Map<String,List<String>>subscriptions// ConsumerId→订阅Topic列表);// 分配结果缓存到__consumer_offsets时的序列化格式Subscriptionsubscription(Set<String>topics);// 返回策略名称Stringname();}

调用入口:在ConsumerCoordinator.performAssignment()中,由Group Leader在JoinGroup完成后调用:

// ConsumerCoordinator.performAssignment()PartitionAssignorassignor=findAssignor(selectedStrategy);Map<String,List<TopicPartition>>result=assignor.assign(partitionsPerTopic,memberSubscriptions);

二、RangeAssignor——"分猪肉"策略

2.1 算法思想

RangeAssignor以Topic为维度,每个Topic独立分配。对于每个Topic,将其分区均分给订阅了该Topic的Consumer,余数分给前几个Consumer。

2.2 算法图解

【RangeAssignor分配示例】 假设: Topic-A: P0, P1, P2, P3, P4 (5个分区) Topic-B: P0, P1, P2 (3个分区) Consumers: C1, C2, C3 Step1: Topic-A分配 (5个分区 / 3个Consumer = 1余2) ┌──┬──┬──┬──┬──┐ │P0│P1│P2│P3│P4│ 分区序列 └──┴──┴──┴──┴──┘ ├C1┤ ├C2┤ ├─C3 ┤ P0,P1 P2,P3 P4 C1 → [A-P0, A-P1] (前2个Consumer多1个) C2 → [A-P2, A-P3] C3 → [A-P4] Step2: Topic-B分配 (3个分区 / 3个Consumer = 1) ┌──┬──┬──┐ │P0│P1│P2│ └──┴──┴──┘ C1 C2 C3 C1 → [B-P0] C2 → [B-P1] C3 → [B-P2] 最终: C1 → [A-P0, A-P1, B-P0] = 3个分区 C2 → [A-P2, A-P3, B-P1] = 3个分区 C3 → [A-P4, B-P2] = 2个分区 看似均匀,但Topic多的场景会累积不均

2.3 源码实现

// RangeAssignor.assign() 核心逻辑publicMap<String,List<TopicPartition>>assign(Map<String,Integer>partitionsPerTopic,Map<String,List<String>>subscriptions){Map<String,List<TopicPartition>>assignment=newHashMap<>();subscriptions.keySet().forEach(m->assignment.put(m,newArrayList<>()));// 逐个Topic分配for(Map.Entry<String,Integer>topicEntry:partitionsPerTopic.entrySet()){Stringtopic=topicEntry.getKey();intnumPartitions=topicEntry.getValue();// 找出订阅了这个Topic的ConsumerList<String>consumersForTopic=subscriptions.entrySet().stream().filter(e->e.getValue().contains(topic)).map(Map.Entry::getKey).sorted()// 按memberId字典序排序,保证可重复性.collect(Collectors.toList());intnumConsumers=consumersForTopic.size();intpartitionsPerConsumer=numPartitions/numConsumers;intconsumersWithExtra=numPartitions%numConsumers;// 逐个Consumer分配intcurrentPartition=0;for(inti=0;i<numConsumers;i++){intstart=currentPartition;intlength=partitionsPerConsumer+(i<consumersWithExtra?1:0);Stringconsumer=consumersForTopic.get(i);for(intp=start;p<start+length;p++){assignment.get(consumer).add(newTopicPartition(topic,p));}currentPartition=start+length;}}returnassignment;}

三、RoundRobinAssignor——"发扑克牌"策略

3.1 算法思想

将所有Topic-Partition对排成一个有序列表,然后像发扑克牌一样依次分配给Consumer。

3.2 算法图解

【RoundRobinAssignor分配示例】 同一场景: Topic-A: P0, P1, P2, P3, P4 Topic-B: P0, P1, P2 Consumers: C1, C2, C3 Step1: 所有分区排成有序队列 [A-P0, A-P1, A-P2, A-P3, A-P4, B-P0, B-P1, B-P2] Step2: 轮询分配 第1轮: C1←A-P0 C2←A-P1 C3←A-P2 第2轮: C1←A-P3 C2←A-P4 C3←B-P0 第3轮: C1←B-P1 C2←B-P2 (队列空) 结果: C1 → [A-P0, A-P3, B-P1] = 3个 C2 → [A-P1, A-P4, B-P2] = 3个 C3 → [A-P2, B-P0] = 2个

3.3 与RangeAssignor的差异

【同场景下两种策略的结果对比】 场景: Topic-A(5分区) + Topic-B(3分区), C1/C2/C3 RangeAssignor: C1 → [A-P0, A-P1, B-P0] = 3个 C2 → [A-P2, A-P3, B-P1] = 3个 C3 → [A-P4, B-P2] = 2个 RoundRobinAssignor: C1 → [A-P0, A-P3, B-P1] = 3个 C2 → [A-P1, A-P4, B-P2] = 3个 C3 → [A-P2, B-P0] = 2个 分析: - 总量上两种策略都是均匀的(3/3/2) - RoundRobin让分区分布更"分散"(同一Topic分区被打散到不同Consumer) - RangeAssignor让分区分布更"集中"(同一Topic分区尽量连续分配给同一Consumer)

RoundRobinAssignor的硬性要求:所有Consumer必须订阅完全相同的Topic集合,否则轮询会出现错误。


四、StickyAssignor——"粘性"保底策略(Kafka 0.11+)

4.1 算法核心思想

StickyAssignor有两个目标(按优先级):

  1. 均衡性:分区尽可能均匀分配
  2. 粘性:Rebalance时尽可能保持原有分配不变

4.2 算法图解

【StickyAssignor的优势展示】 初始状态(均衡分配,3 Consumer, 6分区): C1 → [A-P0, B-P1] C2 → [A-P1, B-P0] C3 → [A-P2, B-P2] 场景:C2下线 → 需要Rebalance RangeAssignor/RoundRobinAssignor的结果: 完全重新分配,打乱原有对应关系 C1 → [A-P0, A-P1, B-P0] ← C1原来只消费A-P0和B-P1,现在多了A-P1和B-P0 C3 → [A-P2, B-P1, B-P2] ← C3原来只消费A-P2和B-P2,现在多了B-P1 StickyAssignor的结果: 尽量保持原有分配,只有被C2释放的分区才重新分配 C1 → [A-P0, B-P1, B-P0] ← C1保持A-P0和B-P1不变,只新增B-P0 C3 → [A-P2, B-P2, A-P1] ← C3保持A-P2和B-P2不变,只新增A-P1 优势: - C1不需要处理A-P1的切换 - C3不需要处理B-P1的切换 - 减少onPartitionsRevoked触发次数 - 有状态消费场景下,减少状态重建开销

4.3 StickyAssignor的实现思路

// StickyAssignor 伪代码(实际实现更复杂,使用贪心算法+平衡优化)// 阶段1:保留现有的均衡分配Map<String,List<TopicPartition>>currentAssignment=memberData.getCurrentAssignment();// 从上次Rebalance记录// 阶段2:处理新增的分区List<TopicPartition>newPartitions=allPartitions-currentAssignment中的所有分区;for(TopicPartitiontp:newPartitions){// 分给分区数最少的ConsumerStringleastLoadedConsumer=findLeastLoaded(assignment);assignment.get(leastLoadedConsumer).add(tp);}// 阶段3:处理需要重新分配的分区(Consumer下线或分区被移除)// 尽可能保持粘性分配,只在必要时重新平衡rebalanceSticky(assignment,consumerLoad);

五、三种分配策略全面对比

对比维度RangeAssignorRoundRobinAssignorStickyAssignor
分配粒度按Topic全局所有Topic全局所有Topic
均衡性中等(Topic多时倾斜)良好良好
Rebalance影响大(全量重算)大(全量重算)小(尽量保持原分配)
订阅要求可不同必须相同可不同
排序保证按MemberId字典序按分区号轮询按负载均衡
有状态消费不友好不友好友好
引入版本0.90.90.11
推荐度★★☆☆☆★★★☆☆★★★★★

六、自定义PartitionAssignor实战——按消费能力加权分配

假设我们有一个异质消费场景:Consumer C1运行在8核机器上,C2运行在4核机器上,我们希望C1分配更多分区。

// WeightedPartitionAssignor.javapublicclassWeightedPartitionAssignorimplementsPartitionAssignor{// 在Subscription的userData中携带权重信息privatestaticfinalStringWEIGHT_KEY="weight";@OverridepublicSubscriptionsubscription(Set<String>topics){// 获取当前机器的CPU核心数作为权重intcpuCores=Runtime.getRuntime().availableProcessors();Map<String,Object>userData=newHashMap<>();userData.put(WEIGHT_KEY,cpuCores);ByteBufferbuffer=ByteBuffer.allocate(4);buffer.putInt(cpuCores);buffer.flip();returnnewSubscription(newArrayList<>(topics),buffer);}@OverridepublicMap<String,List<TopicPartition>>assign(Map<String,Integer>partitionsPerTopic,Map<String,List<String>>subscriptions){Map<String,List<TopicPartition>>assignment=newHashMap<>();subscriptions.keySet().forEach(m->assignment.put(m,newArrayList<>()));// 解析每个Consumer的权重Map<String,Integer>weights=newHashMap<>();inttotalWeight=0;for(Stringconsumer:subscriptions.keySet()){intweight=4;// 默认权重// 实际场景中从userData获取(简化)weights.put(consumer,weight);totalWeight+=weight;}// 构建全局分区列表List<TopicPartition>allPartitions=newArrayList<>();for(Map.Entry<String,Integer>entry:partitionsPerTopic.entrySet()){for(intp=0;p<entry.getValue();p++){allPartitions.add(newTopicPartition(entry.getKey(),p));}}// 按权重比例分配String[]consumers=subscriptions.keySet().toArray(newString[0]);int[]allocatedCount=newint[consumers.length];int[]targetCount=newint[consumers.length];for(inti=0;i<allPartitions.size();i++){// 找分配比例最低的ConsumerintbestIdx=0;doublebestRatio=Double.MAX_VALUE;for(intj=0;j<consumers.length;j++){doubleratio=allocatedCount[j]*1.0/weights.get(consumers[j]);if(ratio<bestRatio){bestRatio=ratio;bestIdx=j;}}assignment.get(consumers[bestIdx]).add(allPartitions.get(i));allocatedCount[bestIdx]++;}returnassignment;}@OverridepublicStringname(){return"weighted";}}

配置使用

# consumer.properties partition.assignment.strategy=com.example.WeightedPartitionAssignor

本篇小结

Kafka分区分配器是Consumer Group"公平分配"的核心算法组件:

  • RangeAssignor:最简单,按Topic维度独立分配。优点是直观,缺点是Topic多时可能出现负载倾斜
  • RoundRobinAssignor:全局轮询,更均匀但要求所有Consumer订阅完全相同
  • StickyAssignor:最新的推荐策略,同时追求均衡性和粘性,在Rebalance时最大程度保持原有分配不变,尤其适合有状态消费场景
  • 自定义PartitionAssignor:只需要实现PartitionAssignor接口,在assign()中实现自己的分配逻辑,然后在consume配置中指定。按权重分配、按数据中心就近分配等场景都可以通过自定义实现

下一篇,我们将深入Rebalance实现的完整状态机,看消费者是如何从发现到完成经历整个Rebalance过程的。


上一篇【第29篇】Kafka心跳机制源码解析——消费者如何向Broker"报平安"
下一篇【第31篇】Rebalance实现源码深度解析——从发现到完成的完整状态机(明日更新,敬请期待)


需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询