上一篇【第29篇】Kafka心跳机制源码解析——消费者如何向Broker"报平安"
下一篇【第31篇】Rebalance实现源码深度解析——从发现到完成的完整状态机(明日更新,敬请期待)
摘要
当Consumer Group触发Rebalance时,分区给谁、怎么给,决定了整个Group的消费均衡程度。PartitionAssignor就是负责"分蛋糕"的角色。Kafka内置了三种分配策略:RangeAssignor按Topic维度等分(简单但可能不均衡)、RoundRobinAssignor全局轮询(均衡但要求订阅一致)、StickyAssignor粘性分配(均衡且保持原分配)。本文用ASCII图解三种算法在相同场景下的分配差异,深入剖析每种算法的源码实现,并通过对比表格帮助选型。最后,手把手教你实现一个自定义PartitionAssignor——按消费能力加权分配,让你的消费者真正"按劳分配"。
一、PartitionAssignor接口设计
// PartitionAssignor 接口定义publicinterfacePartitionAssignor{// 分配方法:给定分区信息和消费者的订阅信息,返回分配结果Map<String,List<TopicPartition>>assign(Map<String,Integer>partitionsPerTopic,// Topic→分区数Map<String,List<String>>subscriptions// ConsumerId→订阅Topic列表);// 分配结果缓存到__consumer_offsets时的序列化格式Subscriptionsubscription(Set<String>topics);// 返回策略名称Stringname();}调用入口:在ConsumerCoordinator.performAssignment()中,由Group Leader在JoinGroup完成后调用:
// ConsumerCoordinator.performAssignment()PartitionAssignorassignor=findAssignor(selectedStrategy);Map<String,List<TopicPartition>>result=assignor.assign(partitionsPerTopic,memberSubscriptions);二、RangeAssignor——"分猪肉"策略
2.1 算法思想
RangeAssignor以Topic为维度,每个Topic独立分配。对于每个Topic,将其分区均分给订阅了该Topic的Consumer,余数分给前几个Consumer。
2.2 算法图解
【RangeAssignor分配示例】 假设: Topic-A: P0, P1, P2, P3, P4 (5个分区) Topic-B: P0, P1, P2 (3个分区) Consumers: C1, C2, C3 Step1: Topic-A分配 (5个分区 / 3个Consumer = 1余2) ┌──┬──┬──┬──┬──┐ │P0│P1│P2│P3│P4│ 分区序列 └──┴──┴──┴──┴──┘ ├C1┤ ├C2┤ ├─C3 ┤ P0,P1 P2,P3 P4 C1 → [A-P0, A-P1] (前2个Consumer多1个) C2 → [A-P2, A-P3] C3 → [A-P4] Step2: Topic-B分配 (3个分区 / 3个Consumer = 1) ┌──┬──┬──┐ │P0│P1│P2│ └──┴──┴──┘ C1 C2 C3 C1 → [B-P0] C2 → [B-P1] C3 → [B-P2] 最终: C1 → [A-P0, A-P1, B-P0] = 3个分区 C2 → [A-P2, A-P3, B-P1] = 3个分区 C3 → [A-P4, B-P2] = 2个分区 看似均匀,但Topic多的场景会累积不均2.3 源码实现
// RangeAssignor.assign() 核心逻辑publicMap<String,List<TopicPartition>>assign(Map<String,Integer>partitionsPerTopic,Map<String,List<String>>subscriptions){Map<String,List<TopicPartition>>assignment=newHashMap<>();subscriptions.keySet().forEach(m->assignment.put(m,newArrayList<>()));// 逐个Topic分配for(Map.Entry<String,Integer>topicEntry:partitionsPerTopic.entrySet()){Stringtopic=topicEntry.getKey();intnumPartitions=topicEntry.getValue();// 找出订阅了这个Topic的ConsumerList<String>consumersForTopic=subscriptions.entrySet().stream().filter(e->e.getValue().contains(topic)).map(Map.Entry::getKey).sorted()// 按memberId字典序排序,保证可重复性.collect(Collectors.toList());intnumConsumers=consumersForTopic.size();intpartitionsPerConsumer=numPartitions/numConsumers;intconsumersWithExtra=numPartitions%numConsumers;// 逐个Consumer分配intcurrentPartition=0;for(inti=0;i<numConsumers;i++){intstart=currentPartition;intlength=partitionsPerConsumer+(i<consumersWithExtra?1:0);Stringconsumer=consumersForTopic.get(i);for(intp=start;p<start+length;p++){assignment.get(consumer).add(newTopicPartition(topic,p));}currentPartition=start+length;}}returnassignment;}三、RoundRobinAssignor——"发扑克牌"策略
3.1 算法思想
将所有Topic-Partition对排成一个有序列表,然后像发扑克牌一样依次分配给Consumer。
3.2 算法图解
【RoundRobinAssignor分配示例】 同一场景: Topic-A: P0, P1, P2, P3, P4 Topic-B: P0, P1, P2 Consumers: C1, C2, C3 Step1: 所有分区排成有序队列 [A-P0, A-P1, A-P2, A-P3, A-P4, B-P0, B-P1, B-P2] Step2: 轮询分配 第1轮: C1←A-P0 C2←A-P1 C3←A-P2 第2轮: C1←A-P3 C2←A-P4 C3←B-P0 第3轮: C1←B-P1 C2←B-P2 (队列空) 结果: C1 → [A-P0, A-P3, B-P1] = 3个 C2 → [A-P1, A-P4, B-P2] = 3个 C3 → [A-P2, B-P0] = 2个3.3 与RangeAssignor的差异
【同场景下两种策略的结果对比】 场景: Topic-A(5分区) + Topic-B(3分区), C1/C2/C3 RangeAssignor: C1 → [A-P0, A-P1, B-P0] = 3个 C2 → [A-P2, A-P3, B-P1] = 3个 C3 → [A-P4, B-P2] = 2个 RoundRobinAssignor: C1 → [A-P0, A-P3, B-P1] = 3个 C2 → [A-P1, A-P4, B-P2] = 3个 C3 → [A-P2, B-P0] = 2个 分析: - 总量上两种策略都是均匀的(3/3/2) - RoundRobin让分区分布更"分散"(同一Topic分区被打散到不同Consumer) - RangeAssignor让分区分布更"集中"(同一Topic分区尽量连续分配给同一Consumer)RoundRobinAssignor的硬性要求:所有Consumer必须订阅完全相同的Topic集合,否则轮询会出现错误。
四、StickyAssignor——"粘性"保底策略(Kafka 0.11+)
4.1 算法核心思想
StickyAssignor有两个目标(按优先级):
- 均衡性:分区尽可能均匀分配
- 粘性:Rebalance时尽可能保持原有分配不变
4.2 算法图解
【StickyAssignor的优势展示】 初始状态(均衡分配,3 Consumer, 6分区): C1 → [A-P0, B-P1] C2 → [A-P1, B-P0] C3 → [A-P2, B-P2] 场景:C2下线 → 需要Rebalance RangeAssignor/RoundRobinAssignor的结果: 完全重新分配,打乱原有对应关系 C1 → [A-P0, A-P1, B-P0] ← C1原来只消费A-P0和B-P1,现在多了A-P1和B-P0 C3 → [A-P2, B-P1, B-P2] ← C3原来只消费A-P2和B-P2,现在多了B-P1 StickyAssignor的结果: 尽量保持原有分配,只有被C2释放的分区才重新分配 C1 → [A-P0, B-P1, B-P0] ← C1保持A-P0和B-P1不变,只新增B-P0 C3 → [A-P2, B-P2, A-P1] ← C3保持A-P2和B-P2不变,只新增A-P1 优势: - C1不需要处理A-P1的切换 - C3不需要处理B-P1的切换 - 减少onPartitionsRevoked触发次数 - 有状态消费场景下,减少状态重建开销4.3 StickyAssignor的实现思路
// StickyAssignor 伪代码(实际实现更复杂,使用贪心算法+平衡优化)// 阶段1:保留现有的均衡分配Map<String,List<TopicPartition>>currentAssignment=memberData.getCurrentAssignment();// 从上次Rebalance记录// 阶段2:处理新增的分区List<TopicPartition>newPartitions=allPartitions-currentAssignment中的所有分区;for(TopicPartitiontp:newPartitions){// 分给分区数最少的ConsumerStringleastLoadedConsumer=findLeastLoaded(assignment);assignment.get(leastLoadedConsumer).add(tp);}// 阶段3:处理需要重新分配的分区(Consumer下线或分区被移除)// 尽可能保持粘性分配,只在必要时重新平衡rebalanceSticky(assignment,consumerLoad);五、三种分配策略全面对比
| 对比维度 | RangeAssignor | RoundRobinAssignor | StickyAssignor |
|---|---|---|---|
| 分配粒度 | 按Topic | 全局所有Topic | 全局所有Topic |
| 均衡性 | 中等(Topic多时倾斜) | 良好 | 良好 |
| Rebalance影响 | 大(全量重算) | 大(全量重算) | 小(尽量保持原分配) |
| 订阅要求 | 可不同 | 必须相同 | 可不同 |
| 排序保证 | 按MemberId字典序 | 按分区号轮询 | 按负载均衡 |
| 有状态消费 | 不友好 | 不友好 | 友好 |
| 引入版本 | 0.9 | 0.9 | 0.11 |
| 推荐度 | ★★☆☆☆ | ★★★☆☆ | ★★★★★ |
六、自定义PartitionAssignor实战——按消费能力加权分配
假设我们有一个异质消费场景:Consumer C1运行在8核机器上,C2运行在4核机器上,我们希望C1分配更多分区。
// WeightedPartitionAssignor.javapublicclassWeightedPartitionAssignorimplementsPartitionAssignor{// 在Subscription的userData中携带权重信息privatestaticfinalStringWEIGHT_KEY="weight";@OverridepublicSubscriptionsubscription(Set<String>topics){// 获取当前机器的CPU核心数作为权重intcpuCores=Runtime.getRuntime().availableProcessors();Map<String,Object>userData=newHashMap<>();userData.put(WEIGHT_KEY,cpuCores);ByteBufferbuffer=ByteBuffer.allocate(4);buffer.putInt(cpuCores);buffer.flip();returnnewSubscription(newArrayList<>(topics),buffer);}@OverridepublicMap<String,List<TopicPartition>>assign(Map<String,Integer>partitionsPerTopic,Map<String,List<String>>subscriptions){Map<String,List<TopicPartition>>assignment=newHashMap<>();subscriptions.keySet().forEach(m->assignment.put(m,newArrayList<>()));// 解析每个Consumer的权重Map<String,Integer>weights=newHashMap<>();inttotalWeight=0;for(Stringconsumer:subscriptions.keySet()){intweight=4;// 默认权重// 实际场景中从userData获取(简化)weights.put(consumer,weight);totalWeight+=weight;}// 构建全局分区列表List<TopicPartition>allPartitions=newArrayList<>();for(Map.Entry<String,Integer>entry:partitionsPerTopic.entrySet()){for(intp=0;p<entry.getValue();p++){allPartitions.add(newTopicPartition(entry.getKey(),p));}}// 按权重比例分配String[]consumers=subscriptions.keySet().toArray(newString[0]);int[]allocatedCount=newint[consumers.length];int[]targetCount=newint[consumers.length];for(inti=0;i<allPartitions.size();i++){// 找分配比例最低的ConsumerintbestIdx=0;doublebestRatio=Double.MAX_VALUE;for(intj=0;j<consumers.length;j++){doubleratio=allocatedCount[j]*1.0/weights.get(consumers[j]);if(ratio<bestRatio){bestRatio=ratio;bestIdx=j;}}assignment.get(consumers[bestIdx]).add(allPartitions.get(i));allocatedCount[bestIdx]++;}returnassignment;}@OverridepublicStringname(){return"weighted";}}配置使用:
# consumer.properties partition.assignment.strategy=com.example.WeightedPartitionAssignor本篇小结
Kafka分区分配器是Consumer Group"公平分配"的核心算法组件:
- RangeAssignor:最简单,按Topic维度独立分配。优点是直观,缺点是Topic多时可能出现负载倾斜
- RoundRobinAssignor:全局轮询,更均匀但要求所有Consumer订阅完全相同
- StickyAssignor:最新的推荐策略,同时追求均衡性和粘性,在Rebalance时最大程度保持原有分配不变,尤其适合有状态消费场景
- 自定义PartitionAssignor:只需要实现PartitionAssignor接口,在assign()中实现自己的分配逻辑,然后在consume配置中指定。按权重分配、按数据中心就近分配等场景都可以通过自定义实现
下一篇,我们将深入Rebalance实现的完整状态机,看消费者是如何从发现到完成经历整个Rebalance过程的。
上一篇【第29篇】Kafka心跳机制源码解析——消费者如何向Broker"报平安"
下一篇【第31篇】Rebalance实现源码深度解析——从发现到完成的完整状态机(明日更新,敬请期待)