Kafka source code analysis: consumer partition assignment strategies
- 1、AbstractPartitionAssignor
- 2、RangeAssignor
- 3、RoundRobinAssignor
- 4、StickyAssignor strategy
The source code in this article is from Kafka 2.0.1.
1、AbstractPartitionAssignor
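The Kafka consumer ships with three partition assignment strategies: RangeAssignor, RoundRobinAssignor and StickyAssignor. All three extend AbstractPartitionAssignor and implement its assign method, which takes two parameters:
- partitionsPerTopic: the number of partitions of each topic
- subscriptions: the mapping from each consumerId to its subscription (the list of topics it subscribes to), i.e. the topics from which each consumer may be assigned partitions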
```java
/**
 * Perform the group assignment given the partition counts and member subscriptions
 * @param partitionsPerTopic The number of partitions for each subscribed topic. Topics not in metadata will be excluded
 *                           from this map.
 * @param subscriptions Map from the memberId to their respective topic subscription
 * @return Map from each member to the list of partitions assigned to them.
 */
public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                         Map<String, Subscription> subscriptions);
```
2、RangeAssignor
First, the implementation:
```java
public class RangeAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "range";
    }

    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }

    // partitionsPerTopic: number of partitions of each topic
    // subscriptions: map from each consumerId to the topics it subscribes to
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // invert subscriptions into a map from topic to the consumerIds subscribed to it
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        // holds the rebalance result
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        // iterate over consumersPerTopic
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            // consumers subscribed to this topic
            List<String> consumersForTopic = topicEntry.getValue();

            // number of partitions of this topic
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            Collections.sort(consumersForTopic);

            // quotient
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // remainder
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            // [key assignment step]
            // build the list of TopicPartitions
            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                // index of the first partition assigned to this consumer
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                // number of partitions assigned to this consumer
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                // when consumersWithExtraPartition != 0, each of the first consumersWithExtraPartition consumers
                // gets one extra partition; the evenly divisible part is split equally among all consumers
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}
```
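Focus on the [key assignment step] in the code. For each topic, it splits that topic's partitions as evenly as possible among the consumers subscribed to it, sorted by member ID. When the partitions do not divide evenly, the remainder goes to the first consumersWithExtraPartition consumers, one extra partition each. Each of them therefore ends up with one more partition than the consumers after them.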
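The start/length computation in the code can be written as formulas. Here N is the topic's partition count and C is the number of consumers subscribed to it. Each consumer has an index i = 0, 1, ..., C-1 in sorted order.

$$
q = \left\lfloor \frac{N}{C} \right\rfloor,\qquad r = N \bmod C
$$

$$
\mathrm{start}_i = q\,i + \min(i, r),\qquad
\mathrm{length}_i = \begin{cases} q + 1, & i < r \\ q, & i \ge r \end{cases}
$$

Consumer i receives partitions start_i through start_i + length_i - 1. For example, N = 7 and C = 3 give q = 2 and r = 1, so the ranges are [0, 2], [3, 4] and [5, 6].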
[Example 1]
Suppose a topic has 7 partitions and three consumers all subscribe to it. RangeAssignor then assigns:
- consumer0: start=0, length=3, assigned partitions: p0, p1, p2
- consumer1: start=3, length=2, assigned partitions: p3, p4
- consumer2: start=5, length=2, assigned partitions: p5, p6
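[Example 2]
Now consider a multi-topic example:
- consumer0 subscribes to topic0, topic1 and topic2.
- consumer1 subscribes to topic0 and topic1.
- consumer2 subscribes to topic2.

topic0, topic1 and topic2 have 3, 2 and 1 partitions respectively. The assignment is:
consumer | Assigned partitions |
---|---|
consumer0 | t0p0, t0p1, t1p0, t2p0 |
consumer1 | t0p2, t1p1 |
consumer2 | (none) |
The result is uneven, and consumer2 is even left idle. Ranges are computed per topic, and consumer0 sorts first for both topic0 and topic2. It therefore receives the extra partition of topic0 and the only partition of topic2.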
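Both examples can be checked by hand with a minimal, dependency-free sketch of the same per-topic range arithmetic. It is an illustration, not Kafka's implementation. The class RangeSketch, its assign method, and the encoding of a partition as a string such as "t0p1" (standing in for TopicPartition) are all hypothetical. Topics are also processed in sorted order rather than HashMap order, which only affects the order of partitions within each consumer's list.

```java
import java.util.*;

// Hypothetical, dependency-free model of RangeAssignor: a partition is a string like "t0p1".
public class RangeSketch {

    public static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                                   Map<String, Set<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>();
        // invert subscriptions: topic -> consumers subscribed to it
        Map<String, List<String>> consumersPerTopic = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : subscriptions.entrySet()) {
            assignment.put(e.getKey(), new ArrayList<>());
            for (String topic : e.getValue())
                consumersPerTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(e.getKey());
        }
        for (Map.Entry<String, List<String>> e : consumersPerTopic.entrySet()) {
            String topic = e.getKey();
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null) continue; // topic missing from metadata
            List<String> consumers = e.getValue();
            Collections.sort(consumers);
            int perConsumer = numPartitions / consumers.size();
            int withExtra = numPartitions % consumers.size();
            for (int i = 0; i < consumers.size(); i++) {
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
                for (int p = start; p < start + length; p++)
                    assignment.get(consumers.get(i)).add(topic + "p" + p);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Example 2: topic0/1/2 (written "t0"/"t1"/"t2") have 3, 2 and 1 partitions
        Map<String, Set<String>> subs = Map.of(
                "consumer0", Set.of("t0", "t1", "t2"),
                "consumer1", Set.of("t0", "t1"),
                "consumer2", Set.of("t2"));
        System.out.println(assign(Map.of("t0", 3, "t1", 2, "t2", 1), subs));
        // {consumer0=[t0p0, t0p1, t1p0, t2p0], consumer1=[t0p2, t1p1], consumer2=[]}
    }
}
```

Calling assign with a single 7-partition topic and three subscribed consumers reproduces Example 1 in the same way.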
3、RoundRobinAssignor
First, the source code:
```java
public class RoundRobinAssignor extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // holds the assignment result
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        // [key assignment step]
        // sort the consumers and wrap them in a circular iterator
        CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
        // iterate over all partitions
        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
            final String topic = partition.topic();
            // advance to the next consumer that subscribes to this partition's topic
            while (!subscriptions.get(assigner.peek()).topics().contains(topic))
                // next() returns the element at the current position and advances the iterator by one
                assigner.next();
            // assign the partition to that consumer
            assignment.get(assigner.next()).add(partition);
        }
        return assignment;
    }

    // returns all partitions, with topics in sorted order and the partitions of each topic contiguous
    public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        SortedSet<String> topics = new TreeSet<>();
        for (Subscription subscription : subscriptions.values())
            topics.addAll(subscription.topics());

        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic != null)
                allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
        }
        return allPartitions;
    }

    @Override
    public String name() {
        return "roundrobin";
    }
}
```
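Focus on the [key assignment step]. The code iterates over all topic-partitions and walks the sorted consumers as a ring. For each partition, it starts from the current position and finds the first consumer that subscribes to the partition's topic. It assigns the partition to that consumer, advances the position by one, and moves on to the next partition.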
Applying RoundRobinAssignor to [Example 2] from section 2 gives:
Round | consumer0 | consumer1 | consumer2 |
---|---|---|---|
1 | t0p0 | t0p1 | |
2 | t0p2 | t1p0 | |
3 | t1p1 | | t2p0 |
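This is more balanced than RangeAssignor, but RoundRobinAssignor still does not always produce an even assignment. The next example shows why.
[Example 3]
The subscriptions are the same as in Example 2:
- consumer0 subscribes to topic0, topic1 and topic2.
- consumer1 subscribes to topic0 and topic1.
- consumer2 subscribes to topic2.

This time topic0, topic1 and topic2 have 4, 3 and 2 partitions respectively. The assignment is:
consumer | Assigned partitions |
---|---|
consumer0 | t0p0, t0p2, t1p0, t1p2, t2p1 |
consumer1 | t0p1, t0p3, t1p1 |
consumer2 | t2p0 |
Assigning t2p1 to consumer2 instead would make the result more even (4/3/2 partitions instead of 5/3/1).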
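Example 2 and Example 3 can be replayed with a minimal, dependency-free sketch of the same ring walk. It is not the Kafka code. The class RoundRobinSketch is hypothetical, a partition is written as a string such as "t0p1", and an index into a sorted consumer list stands in for CircularIterator's peek()/next().

```java
import java.util.*;

// Hypothetical, dependency-free model of RoundRobinAssignor; a partition is a string like "t0p1".
public class RoundRobinSketch {

    public static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                                   Map<String, Set<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : subscriptions.keySet())
            assignment.put(consumer, new ArrayList<>());
        // consumers in sorted order, walked as a ring (stand-in for CircularIterator)
        List<String> ring = new ArrayList<>(new TreeSet<>(subscriptions.keySet()));
        // all subscribed topics in sorted order, so each topic's partitions stay contiguous
        SortedSet<String> topics = new TreeSet<>();
        for (Set<String> subscribed : subscriptions.values())
            topics.addAll(subscribed);
        int pos = 0; // current ring position
        for (String topic : topics) {
            Integer n = partitionsPerTopic.get(topic);
            if (n == null) continue; // topic missing from metadata
            for (int p = 0; p < n; p++) {
                // skip consumers not subscribed to this topic; one must be, since the topic came from a subscription
                while (!subscriptions.get(ring.get(pos)).contains(topic))
                    pos = (pos + 1) % ring.size();
                assignment.get(ring.get(pos)).add(topic + "p" + p);
                pos = (pos + 1) % ring.size(); // move past the chosen consumer, like assigner.next()
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subs = Map.of(
                "consumer0", Set.of("t0", "t1", "t2"),
                "consumer1", Set.of("t0", "t1"),
                "consumer2", Set.of("t2"));
        System.out.println(assign(Map.of("t0", 3, "t1", 2, "t2", 1), subs)); // Example 2
        // {consumer0=[t0p0, t0p2, t1p1], consumer1=[t0p1, t1p0], consumer2=[t2p0]}
        System.out.println(assign(Map.of("t0", 4, "t1", 3, "t2", 2), subs)); // Example 3
        // {consumer0=[t0p0, t0p2, t1p0, t1p2, t2p1], consumer1=[t0p1, t0p3, t1p1], consumer2=[t2p0]}
    }
}
```

The second run shows the weakness described above. When the walk reaches t2p1, the ring position is at consumer0, and consumer0 subscribes to topic2, so it takes t2p1 even though consumer2 holds fewer partitions.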
4、StickyAssignor strategy
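"Sticky" is a fittingly descriptive name. StickyAssignor was introduced in Kafka 0.11 and has two goals:
- Goal 1: the partition assignment should be as balanced as possible
- Goal 2: the partition assignment should stay as close as possible to the previous assignment
When goal 1 and goal 2 conflict, goal 1 takes precedence.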
The StickyAssignor code is fairly long, so start with the flowchart below.

[Figure: StickyAssignor assignment flowchart]
Taking [Example 3] from section 3, StickyAssignor produces:
consumer | Assigned partitions |
---|---|
consumer0 | t0p0, t0p2, t1p0, t1p2 |
consumer1 | t0p1, t0p3, t1p1 |
consumer2 | t2p0, t2p1 |
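Compared with RoundRobinAssignor (5/3/1 partitions), this result (4/3/2) is balanced as far as the subscriptions allow. consumer2 already holds every partition it can consume, and moving a partition between consumer0 and consumer1 would not narrow the gap. Now suppose consumer3 joins and subscribes to topic0. The rebalance proceeds as follows: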
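(1) Sort all partitions. The consumers' subscriptions are no longer identical, so sortPartitions takes its general branch. That branch orders partitions by the number of consumers that could own them, breaking ties by topic name and then partition number. Each topic1 and topic2 partition has two potential consumers, while each topic0 partition now has three (consumer0, consumer1 and consumer3). The order is therefore: t1p0, t1p1, t1p2, t2p0, t2p1, t0p0, t0p1, t0p2, t0p3
(2) Initial consumer order, ascending by number of assigned partitions: consumer3, consumer2, consumer1, consumer0
(3) Start rebalancing. The topic1 and topic2 partitions come first, but none of them is moved: for each of them, no other potential consumer holds at least two fewer partitions than the current owner. t0p0 can be moved, since its owner consumer0 holds 4 partitions and consumer3 holds none. It goes to consumer3, the least-loaded consumer that can take it. The assignment and consumer order afterwards are: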
consumer (sorted) | Assigned partitions |
---|---|
consumer3 | t0p0 |
consumer2 | t2p0, t2p1 |
consumer0 | t0p2, t1p0, t1p2 |
consumer1 | t0p1, t0p3, t1p1 |
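Next, t0p1 can be moved as well (consumer1 holds 3 partitions, consumer3 holds 1), so it moves from consumer1 to consumer3. Consumers with the same count are ordered by member ID:
consumer (sorted) | Assigned partitions |
---|---|
consumer1 | t0p3, t1p1 |
consumer2 | t2p0, t2p1 |
consumer3 | t0p0, t0p1 |
consumer0 | t0p2, t1p0, t1p2 |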
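(4) The consumers now hold 2, 2, 2 and 3 partitions. The smallest count is at least the largest minus one, so the assignment is balanced and the rebalance returns the result.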
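The walkthrough can be replayed with a simplified, dependency-free model of the rebalance loop. The model covers the general branch of sortPartitions, isBalanced, performReassignments, and the consumer choice in reassignPartition. It is a sketch under stated assumptions, not the Kafka implementation:
- The class StickyRebalanceSketch and its methods are hypothetical, and partitions are strings such as "t0p1".
- Every partition is assumed to be assigned already.
- The partitionMovements substitution and the balance-score rollback are left out. Neither changes the outcome of this particular walkthrough.

```java
import java.util.*;

// Hypothetical, simplified model of StickyAssignor's rebalance loop. Not Kafka code: partitions are strings
// like "t0p1"; partitionMovements, unassigned partitions and the balance-score rollback are left out.
public class StickyRebalanceSketch {

    static String topicOf(String tp) { return tp.substring(0, tp.indexOf('p')); }

    static int partitionIndex(String tp) { return Integer.parseInt(tp.substring(tp.indexOf('p') + 1)); }

    public static Map<String, List<String>> rebalance(Map<String, Integer> partitionsPerTopic,
                                                      Map<String, Set<String>> subscriptions,
                                                      Map<String, List<String>> previous) {
        // potential consumers of every partition, consumers visited in sorted order
        Map<String, List<String>> potential = new TreeMap<>();
        partitionsPerTopic.forEach((topic, n) -> {
            for (int i = 0; i < n; i++) potential.put(topic + "p" + i, new ArrayList<>());
        });
        for (String c : new TreeSet<>(subscriptions.keySet()))
            for (String topic : subscriptions.get(c))
                for (int i = 0; i < partitionsPerTopic.get(topic); i++)
                    potential.get(topic + "p" + i).add(c);
        // start from the previous assignment (assumed to cover every partition); new consumers start empty
        Map<String, List<String>> assignment = new TreeMap<>();
        Map<String, String> owner = new HashMap<>();
        for (String c : subscriptions.keySet()) {
            assignment.put(c, new ArrayList<>(previous.getOrDefault(c, List.of())));
            for (String p : assignment.get(c)) owner.put(p, c);
        }
        // like PartitionComparator: fewer potential consumers first, then topic name, then partition index
        List<String> sorted = new ArrayList<>(potential.keySet());
        sorted.sort(Comparator.comparingInt((String p) -> potential.get(p).size())
                .thenComparing(StickyRebalanceSketch::topicOf)
                .thenComparingInt(StickyRebalanceSketch::partitionIndex));
        // like SubscriptionComparator: fewest assigned partitions first, ties by consumer id
        Comparator<String> byLoad = Comparator.comparingInt((String c) -> assignment.get(c).size())
                .thenComparing(Comparator.naturalOrder());
        boolean modified;
        do { // like performReassignments
            modified = false;
            for (String p : sorted) {
                if (isBalanced(assignment, potential, owner)) break;
                String from = owner.get(p);
                for (String other : potential.get(p)) {
                    if (assignment.get(from).size() > assignment.get(other).size() + 1) {
                        // like reassignPartition: move p to the least-loaded consumer that can take it
                        String to = potential.get(p).stream().min(byLoad).get();
                        assignment.get(from).remove(p);
                        assignment.get(to).add(p);
                        owner.put(p, to);
                        modified = true;
                        break;
                    }
                }
            }
        } while (modified);
        return assignment;
    }

    // like isBalanced: sizes differ by at most one, or no consumer could take a partition from a busier owner
    static boolean isBalanced(Map<String, List<String>> assignment,
                              Map<String, List<String>> potential, Map<String, String> owner) {
        IntSummaryStatistics sizes = assignment.values().stream().mapToInt(List::size).summaryStatistics();
        if (sizes.getMin() >= sizes.getMax() - 1) return true;
        for (Map.Entry<String, List<String>> e : potential.entrySet()) {
            int ownerSize = assignment.get(owner.get(e.getKey())).size();
            for (String c : e.getValue())
                if (assignment.get(c).size() < ownerSize) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subs = Map.of(
                "consumer0", Set.of("t0", "t1", "t2"),
                "consumer1", Set.of("t0", "t1"),
                "consumer2", Set.of("t2"),
                "consumer3", Set.of("t0"));
        // the StickyAssignor result for Example 3, before consumer3 joined
        Map<String, List<String>> previous = Map.of(
                "consumer0", List.of("t0p0", "t0p2", "t1p0", "t1p2"),
                "consumer1", List.of("t0p1", "t0p3", "t1p1"),
                "consumer2", List.of("t2p0", "t2p1"));
        System.out.println(rebalance(Map.of("t0", 4, "t1", 3, "t2", 2), subs, previous));
        // {consumer0=[t0p2, t1p0, t1p2], consumer1=[t0p3, t1p1], consumer2=[t2p0, t2p1], consumer3=[t0p0, t0p1]}
    }
}
```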
The flowchart leaves out many details; they are covered in the source analysis below:
```java
public class StickyAssignor extends AbstractPartitionAssignor {
    private static final Logger log = LoggerFactory.getLogger(StickyAssignor.class);

    // these schemas are used for preserving consumer's previously assigned partitions
    // list and sending it as user data to the leader during a rebalance
    private static final String TOPIC_PARTITIONS_KEY_NAME = "previous_assignment";
    private static final String TOPIC_KEY_NAME = "topic";
    private static final String PARTITIONS_KEY_NAME = "partitions";
    private static final Schema TOPIC_ASSIGNMENT = new Schema(
            new Field(TOPIC_KEY_NAME, Type.STRING),
            new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
    private static final Schema STICKY_ASSIGNOR_USER_DATA = new Schema(
            new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT)));

    private List<TopicPartition> memberAssignment = null;
    private PartitionMovements partitionMovements;

    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
        partitionMovements = new PartitionMovements();

        // build the current assignment state (currentAssignment) first.
        // The userData read here is each member's previous assignment, serialized with the
        // STICKY_ASSIGNOR_USER_DATA schema above; this is how the leader learns the previous assignment
        prepopulateCurrentAssignments(subscriptions, currentAssignment);
        // whether this is a brand-new assignment (true if there is no previous assignment)
        boolean isFreshAssignment = currentAssignment.isEmpty();

        // for each partition, the consumers it could be assigned to
        final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
        // for each consumer, the partitions it could be assigned
        final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();

        // initialize partition2AllPotentialConsumers with empty lists
        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
            for (int i = 0; i < entry.getValue(); ++i)
                partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<String>());
        }

        // iterate over subscriptions
        for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
            String consumer = entry.getKey();
            // initialize this consumer's entry in consumer2AllPotentialPartitions
            consumer2AllPotentialPartitions.put(consumer, new ArrayList<TopicPartition>());
            for (String topic: entry.getValue().topics()) {
                for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
                    TopicPartition topicPartition = new TopicPartition(topic, i);
                    // every partition of a subscribed topic is a potential partition of this consumer
                    consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
                    // and this consumer is a potential consumer of each of those partitions
                    partition2AllPotentialConsumers.get(topicPartition).add(consumer);
                }
            }

            // this consumer has no current assignment yet
            if (!currentAssignment.containsKey(consumer))
                // give it an empty one
                currentAssignment.put(consumer, new ArrayList<TopicPartition>());
        }
        // partition2AllPotentialConsumers and consumer2AllPotentialPartitions are now initialized

        // map each partition in the current assignment to its consumer
        Map<TopicPartition, String> currentPartitionConsumer = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> entry: currentAssignment.entrySet())
            for (TopicPartition topicPartition: entry.getValue())
                currentPartitionConsumer.put(topicPartition, entry.getKey());

        // sort the valid partitions so that the potential reassignment phase processes them in an order
        // that minimizes partition movement between consumers (i.e. maximizes stickiness);
        // see sortPartitions below
        List<TopicPartition> sortedPartitions = sortPartitions(
                currentAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions);

        // start with every sorted partition marked as unassigned
        List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions);
        // walk the current assignment and drop whatever is no longer valid
        for (Iterator<Map.Entry<String, List<TopicPartition>>> it = currentAssignment.entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, List<TopicPartition>> entry = it.next();
            if (!subscriptions.containsKey(entry.getKey())) {
                // the consumer no longer exists: forget its partitions and remove it from the current assignment
                for (TopicPartition topicPartition: entry.getValue())
                    currentPartitionConsumer.remove(topicPartition);
                it.remove();
            } else {
                // otherwise (the consumer still exists)
                for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
                    TopicPartition partition = partitionIter.next();
                    if (!partition2AllPotentialConsumers.containsKey(partition)) {
                        // the partition no longer exists: remove it from the assignment and from currentPartitionConsumer
                        partitionIter.remove();
                        currentPartitionConsumer.remove(partition);
                    } else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
                        // the consumer no longer subscribes to the partition's topic: remove it from the assignment
                        partitionIter.remove();
                    } else
                        // the partition keeps its current owner, so it is not unassigned
                        unassignedPartitions.remove(partition);
                }
            }
        }
        // at this point we have preserved all valid topic partition to consumer assignments and removed
        // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
        // to consumers so that the topic partition assignments are as balanced as possible.

        // sort consumers in ascending order of the number of partitions already assigned to them
        TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
        sortedCurrentSubscriptions.addAll(currentAssignment.keySet());

        balance(currentAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
                consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);
        return currentAssignment;
    }

    // some methods omitted
}
```
```java
/**
 * Sort valid partitions so they are processed in the potential reassignment phase in the proper order
 * that causes minimal partition movement among consumers (hence honoring maximal stickiness)
 *
 * @param currentAssignment the calculated assignment so far
 * @param isFreshAssignment whether this is a new assignment, or a reassignment of an existing one
 * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
 * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions they can consume from
 * @return sorted list of valid partitions
 */
private List<TopicPartition> sortPartitions(Map<String, List<TopicPartition>> currentAssignment,
                                            boolean isFreshAssignment,
                                            Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                                            Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
    List<TopicPartition> sortedPartitions = new ArrayList<>();

    if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions)) {
        // not a fresh assignment, and every consumer has the same set of potential partitions
        // work on a deep copy of the current assignment
        Map<String, List<TopicPartition>> assignments = deepCopy(currentAssignment);
        // remove partitions that no longer exist from the copy
        for (Entry<String, List<TopicPartition>> entry: assignments.entrySet()) {
            List<TopicPartition> toRemove = new ArrayList<>();
            for (TopicPartition partition: entry.getValue())
                if (!partition2AllPotentialConsumers.keySet().contains(partition))
                    toRemove.add(partition);
            for (TopicPartition partition: toRemove)
                entry.getValue().remove(partition);
        }
        // SubscriptionComparator orders consumers by the size of their list, then by consumer id
        TreeSet<String> sortedConsumers = new TreeSet<>(new SubscriptionComparator(assignments));
        sortedConsumers.addAll(assignments.keySet());
        // repeatedly take the first remaining partition of the consumer holding the most partitions,
        // then re-insert that consumer so it is re-sorted by its new size
        while (!sortedConsumers.isEmpty()) {
            String consumer = sortedConsumers.pollLast();
            List<TopicPartition> remainingPartitions = assignments.get(consumer);
            if (!remainingPartitions.isEmpty()) {
                sortedPartitions.add(remainingPartitions.remove(0));
                sortedConsumers.add(consumer);
            }
        }
        // append the partitions that are not currently assigned to any consumer
        for (TopicPartition partition: partition2AllPotentialConsumers.keySet()) {
            if (!sortedPartitions.contains(partition))
                sortedPartitions.add(partition);
        }
    } else {
        // PartitionComparator orders partitions by the number of potential consumers,
        // then by topic name, then by partition number
        TreeSet<TopicPartition> sortedAllPartitions = new TreeSet<>(new PartitionComparator(partition2AllPotentialConsumers));
        sortedAllPartitions.addAll(partition2AllPotentialConsumers.keySet());
        // take them out in order: partitions with fewer potential consumers come first
        while (!sortedAllPartitions.isEmpty())
            sortedPartitions.add(sortedAllPartitions.pollFirst());
    }
    return sortedPartitions;
}
```
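Both branches of sortPartitions can be illustrated with a small dependency-free sketch. The class SortPartitionsSketch and its methods are hypothetical, and partitions are strings such as "t0p1". One difference from the source: unowned partitions are appended here in sorted order, whereas the source appends them in the key order of partition2AllPotentialConsumers (a HashMap).

The first call in main uses the potential consumers from the consumer3 walkthrough in section 4. The second uses a made-up single-topic assignment. It shows how the identical-subscriptions branch interleaves partitions, starting from the most loaded consumer.

```java
import java.util.*;

// Hypothetical, dependency-free model of the two branches of sortPartitions; partitions are strings like "t0p1".
public class SortPartitionsSketch {

    static String topicOf(String tp) { return tp.substring(0, tp.indexOf('p')); }

    static int partitionIndex(String tp) { return Integer.parseInt(tp.substring(tp.indexOf('p') + 1)); }

    // general branch: fewer potential consumers first, then topic name, then partition index
    public static List<String> byPotentialConsumers(Map<String, List<String>> partition2PotentialConsumers) {
        List<String> sorted = new ArrayList<>(partition2PotentialConsumers.keySet());
        sorted.sort(Comparator.comparingInt((String p) -> partition2PotentialConsumers.get(p).size())
                .thenComparing(SortPartitionsSketch::topicOf)
                .thenComparingInt(SortPartitionsSketch::partitionIndex));
        return sorted;
    }

    // identical-subscriptions branch: repeatedly take the first remaining partition of the consumer
    // holding the most (ties go to the larger consumer id, as TreeSet.pollLast returns it)
    public static List<String> byCurrentOwners(Map<String, List<String>> currentAssignment, Set<String> existing) {
        Map<String, List<String>> remaining = new HashMap<>();
        currentAssignment.forEach((c, ps) -> {
            List<String> kept = new ArrayList<>(ps);
            kept.retainAll(existing); // drop partitions that no longer exist
            remaining.put(c, kept);
        });
        TreeSet<String> consumers = new TreeSet<>(
                Comparator.comparingInt((String c) -> remaining.get(c).size()).thenComparing(Comparator.naturalOrder()));
        consumers.addAll(remaining.keySet());
        List<String> sorted = new ArrayList<>();
        while (!consumers.isEmpty()) {
            String consumer = consumers.pollLast();
            List<String> ps = remaining.get(consumer);
            if (!ps.isEmpty()) {
                sorted.add(ps.remove(0));
                consumers.add(consumer); // re-insert so it is re-ordered by its new size
            }
        }
        for (String p : new TreeSet<>(existing)) // then every partition nobody owns
            if (!sorted.contains(p)) sorted.add(p);
        return sorted;
    }

    public static void main(String[] args) {
        // after consumer3 joins: topic0 has 3 potential consumers, topic1 and topic2 have 2
        Map<String, List<String>> potential = new HashMap<>();
        for (int i = 0; i < 4; i++) potential.put("t0p" + i, List.of("consumer0", "consumer1", "consumer3"));
        for (int i = 0; i < 3; i++) potential.put("t1p" + i, List.of("consumer0", "consumer1"));
        for (int i = 0; i < 2; i++) potential.put("t2p" + i, List.of("consumer0", "consumer2"));
        System.out.println(byPotentialConsumers(potential));
        // [t1p0, t1p1, t1p2, t2p0, t2p1, t0p0, t0p1, t0p2, t0p3]

        Map<String, List<String>> current = Map.of(
                "consumer0", List.of("t0p0", "t0p1", "t0p2"),
                "consumer1", List.of("t0p3"));
        System.out.println(byCurrentOwners(current, Set.of("t0p0", "t0p1", "t0p2", "t0p3", "t0p4")));
        // [t0p0, t0p1, t0p3, t0p2, t0p4]
    }
}
```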
```java
/**
 * Balance the current assignment using the data structures created in the assign(...) method above.
 */
private void balance(Map<String, List<TopicPartition>> currentAssignment,
                     List<TopicPartition> sortedPartitions,
                     List<TopicPartition> unassignedPartitions,
                     TreeSet<String> sortedCurrentSubscriptions,
                     Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                     Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                     Map<TopicPartition, String> currentPartitionConsumer) {
    // initial assignment: even the consumer holding the most partitions holds none
    boolean initializing = currentAssignment.get(sortedCurrentSubscriptions.last()).isEmpty();
    boolean reassignmentPerformed = false;

    // assign the unassigned partitions: each goes to the consumer with the fewest partitions that can take it,
    // and that consumer is re-inserted into sortedCurrentSubscriptions so the ordering stays correct
    for (TopicPartition partition: unassignedPartitions) {
        // skip if there is no potential consumer for the partition
        if (partition2AllPotentialConsumers.get(partition).isEmpty())
            continue;

        assignPartition(partition, sortedCurrentSubscriptions, currentAssignment,
                consumer2AllPotentialPartitions, currentPartitionConsumer);
    }

    // every partition has been assigned by now. fixedPartitions collects the partitions that have only one
    // potential consumer (so they can never move), and they are removed from sortedPartitions.
    // fixedPartitions is used for nothing else
    Set<TopicPartition> fixedPartitions = new HashSet<>();
    for (TopicPartition partition: partition2AllPotentialConsumers.keySet())
        if (!canParticipateInReassignment(partition, partition2AllPotentialConsumers))
            fixedPartitions.add(partition);
    sortedPartitions.removeAll(fixedPartitions);

    // remove consumers whose assignment can no longer change from sortedCurrentSubscriptions,
    // and park their partitions in fixedAssignments
    Map<String, List<TopicPartition>> fixedAssignments = new HashMap<>();
    for (String consumer: consumer2AllPotentialPartitions.keySet())
        // canParticipateInReassignment: whether the consumer can take part in reassignment (true = yes)
        // 1. it is not "full" (full = it already holds every partition it could be assigned): true
        // 2. it is full, but at least one of its partitions could be assigned to another consumer: true
        if (!canParticipateInReassignment(consumer, currentAssignment,
                consumer2AllPotentialPartitions, partition2AllPotentialConsumers)) {
            sortedCurrentSubscriptions.remove(consumer);
            fixedAssignments.put(consumer, currentAssignment.remove(consumer));
        }

    // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
    Map<String, List<TopicPartition>> preBalanceAssignment = deepCopy(currentAssignment);
    Map<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer);

    // reassignmentPerformed == true means at least one partition was moved
    reassignmentPerformed = performReassignments(sortedPartitions, currentAssignment, sortedCurrentSubscriptions,
            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);

    // getBalanceScore: a non-negative balance score, lower is better. It sums |size(a) - size(b)| over every
    // pair of consumers (each consumer is removed once processed, so each pair is counted only once)
    if (!initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment)) {
        // the reassignment did not improve balance: revert to the previous assignment
        deepCopy(preBalanceAssignment, currentAssignment);
        currentPartitionConsumer.clear();
        currentPartitionConsumer.putAll(preBalancePartitionConsumers);
    }

    // put the consumers set aside above back into the current assignment and the sorted set
    for (Entry<String, List<TopicPartition>> entry: fixedAssignments.entrySet()) {
        String consumer = entry.getKey();
        currentAssignment.put(consumer, entry.getValue());
        sortedCurrentSubscriptions.add(consumer);
    }

    fixedAssignments.clear();
}
```
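getBalanceScore only looks at how many partitions each consumer holds. Below is a minimal sketch of the same pairwise sum. It assumes the assignment sizes have already been extracted into a list; BalanceScoreSketch and balanceScore are hypothetical names.

```java
import java.util.*;

// Hypothetical model of getBalanceScore that works on assignment sizes only.
public class BalanceScoreSketch {

    // sum of |a - b| over every unordered pair of consumers; 0 means all consumers hold the same number
    public static int balanceScore(List<Integer> assignmentSizes) {
        int score = 0;
        for (int i = 0; i < assignmentSizes.size(); i++)
            for (int j = i + 1; j < assignmentSizes.size(); j++)
                score += Math.abs(assignmentSizes.get(i) - assignmentSizes.get(j));
        return score;
    }

    public static void main(String[] args) {
        // consumer3 walkthrough in section 4: sizes before and after t0p0 and t0p1 move to consumer3
        System.out.println(balanceScore(List.of(4, 3, 2, 0))); // 13
        System.out.println(balanceScore(List.of(3, 2, 2, 2))); // 3
    }
}
```

In that walkthrough the score drops from 13 to 3. The rebalance is not initializing (consumer0 already holds partitions), so balance keeps the reassigned result instead of reverting.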
```java
private boolean performReassignments(List<TopicPartition> reassignablePartitions,
                                     Map<String, List<TopicPartition>> currentAssignment,
                                     TreeSet<String> sortedCurrentSubscriptions,
                                     Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                                     Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                                     Map<TopicPartition, String> currentPartitionConsumer) {
    boolean reassignmentPerformed = false;
    boolean modified;

    // repeat reassignment until no partition can be moved to improve the balance
    do {
        modified = false;
        // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
        // until the full list is processed or a balance is achieved
        Iterator<TopicPartition> partitionIterator = reassignablePartitions.iterator();
        // isBalanced: whether the current assignment is already balanced
        // 1. true if the smallest assignment size >= the largest assignment size - 1
        // 2. otherwise true if no consumer could take over a partition from a consumer holding more partitions than it does
        while (partitionIterator.hasNext() && !isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions)) {
            TopicPartition partition = partitionIterator.next();

            // the partition must have at least two consumers
            if (partition2AllPotentialConsumers.get(partition).size() <= 1)
                log.error("Expected more than one potential consumer for partition '" + partition + "'");

            // the partition must have a current consumer
            String consumer = currentPartitionConsumer.get(partition);
            if (consumer == null)
                log.error("Expected partition '" + partition + "' to be assigned to a consumer");

            // check if a better-suited consumer exist for the partition; if so, reassign it
            for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) {
                if (currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1) {
                    // see reassignPartition below
                    reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions,
                            currentPartitionConsumer, consumer2AllPotentialPartitions);
                    reassignmentPerformed = true;
                    modified = true;
                    break;
                }
            }
        }
    } while (modified);

    return reassignmentPerformed;
}
```
```java
private void reassignPartition(TopicPartition partition,
                               Map<String, List<TopicPartition>> currentAssignment,
                               TreeSet<String> sortedCurrentSubscriptions,
                               Map<TopicPartition, String> currentPartitionConsumer,
                               Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
    String consumer = currentPartitionConsumer.get(partition);

    // sortedCurrentSubscriptions is in ascending order of assigned partition count, so the first
    // consumer that can take this partition becomes its new owner
    String newConsumer = null;
    for (String anotherConsumer: sortedCurrentSubscriptions) {
        if (consumer2AllPotentialPartitions.get(anotherConsumer).contains(partition)) {
            newConsumer = anotherConsumer;
            break;
        }
    }

    assert newConsumer != null;

    // find the partition that should actually move, to preserve stickiness; see getTheActualPartitionToBeMoved below
    TopicPartition partitionToBeMoved = partitionMovements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer);
    // move that partition:
    // 1. remove the old and new consumers from the sorted set so they can be re-inserted
    // 2. update partitionMovements and partitionMovementsByTopic
    // 3. update both consumers' assignments and currentPartitionConsumer
    // 4. re-insert both consumers so they are re-sorted
    processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer);

    return;
}
```
```java
// partitionMovements: the latest move of each partition (from srcMemberId to dstMemberId)
// partitionMovementsByTopic: per topic, the partitions moved along each ConsumerPair (source -> destination consumer)
private TopicPartition getTheActualPartitionToBeMoved(TopicPartition partition, String oldConsumer, String newConsumer) {
    String topic = partition.topic();

    // no partition of this topic has ever moved: move the partition itself
    if (!partitionMovementsByTopic.containsKey(topic))
        return partition;

    if (partitionMovements.containsKey(partition)) {
        // this partition has previously moved
        // the old consumer of this move must be the destination of the previous move
        assert oldConsumer.equals(partitionMovements.get(partition).dstMemberId);
        // replace oldConsumer with the source of the previous move, to maximize stickiness.
        // Example: partition0 previously moved A->C and now has to move C->B, while partition1 was previously
        // moved B->A. After this assignment the reverse pair is (B, A), so partition1 is moved back from A to B
        // instead of moving partition0: B gets its original partition back, A now has one partition fewer,
        // and a later step may move partition0 from C back to A. The logic here is rather convoluted.
        oldConsumer = partitionMovements.get(partition).srcMemberId;
    }

    Map<ConsumerPair, Set<TopicPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic);
    // the "reverse pair" of this move: from the new consumer to the old consumer
    ConsumerPair reversePair = new ConsumerPair(newConsumer, oldConsumer);
    // no partition of this topic was ever moved along the reverse pair: move the partition itself
    if (!partitionMovementsForThisTopic.containsKey(reversePair))
        return partition;

    // return a partition previously moved along the reverse pair; this serves goal 2 of StickyAssignor
    // (stay as close as possible to the previous assignment).
    // Example: partition0 must move from consumerA to consumerB, but partition1 was earlier moved from consumerB
    // to consumerA. Both now belong to consumerA and moving either improves balance, but moving partition1
    // back is stickier, so partition1 is moved instead of partition0.
    return partitionMovementsForThisTopic.get(reversePair).iterator().next();
}
```
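The substitution done by getTheActualPartitionToBeMoved is easier to follow on the A/B/C example from the comments. Below is a dependency-free model of PartitionMovements. The class, the record ConsumerPair(src, dst), and the helper names are hypothetical stand-ins. movePartition follows the bookkeeping assumed for the source's PartitionMovements.movePartition: a partition that moves back to its previous owner leaves no movement record. The record requires Java 16 or later.

```java
import java.util.*;

// Hypothetical, dependency-free model of PartitionMovements; partitions are strings like "t0p1".
public class PartitionMovementsSketch {

    record ConsumerPair(String src, String dst) {}

    // latest move of each partition
    private final Map<String, ConsumerPair> partitionMovements = new HashMap<>();
    // per topic: the partitions moved along each (src, dst) pair
    private final Map<String, Map<ConsumerPair, Set<String>>> partitionMovementsByTopic = new HashMap<>();

    static String topicOf(String tp) { return tp.substring(0, tp.indexOf('p')); }

    private void addRecord(String partition, ConsumerPair pair) {
        partitionMovements.put(partition, pair);
        partitionMovementsByTopic.computeIfAbsent(topicOf(partition), t -> new HashMap<>())
                .computeIfAbsent(pair, k -> new LinkedHashSet<>()).add(partition);
    }

    private ConsumerPair removeRecord(String partition) {
        ConsumerPair pair = partitionMovements.remove(partition);
        Map<ConsumerPair, Set<String>> byPair = partitionMovementsByTopic.get(topicOf(partition));
        byPair.get(pair).remove(partition);
        if (byPair.get(pair).isEmpty()) byPair.remove(pair);
        if (byPair.isEmpty()) partitionMovementsByTopic.remove(topicOf(partition));
        return pair;
    }

    // record a move; a partition that goes back to its original owner leaves no record
    public void movePartition(String partition, String oldConsumer, String newConsumer) {
        if (partitionMovements.containsKey(partition)) {
            ConsumerPair existing = removeRecord(partition);
            if (!existing.src().equals(newConsumer))
                addRecord(partition, new ConsumerPair(existing.src(), newConsumer));
        } else {
            addRecord(partition, new ConsumerPair(oldConsumer, newConsumer));
        }
    }

    public String getTheActualPartitionToBeMoved(String partition, String oldConsumer, String newConsumer) {
        String topic = topicOf(partition);
        if (!partitionMovementsByTopic.containsKey(topic)) return partition;
        if (partitionMovements.containsKey(partition))
            oldConsumer = partitionMovements.get(partition).src(); // look from the original owner
        Set<String> reverse = partitionMovementsByTopic.get(topic).get(new ConsumerPair(newConsumer, oldConsumer));
        return reverse == null ? partition : reverse.iterator().next();
    }

    public boolean hasRecord(String partition) { return partitionMovements.containsKey(partition); }

    public static void main(String[] args) {
        PartitionMovementsSketch m = new PartitionMovementsSketch();
        m.movePartition("t0p0", "A", "C"); // partition0 moved A -> C earlier
        m.movePartition("t0p1", "B", "A"); // partition1 moved B -> A earlier
        // partition0 should now move C -> B; partition1 is moved back A -> B instead
        String actual = m.getTheActualPartitionToBeMoved("t0p0", "C", "B");
        System.out.println(actual); // t0p1
        m.movePartition(actual, "A", "B");
        System.out.println(m.hasRecord("t0p1")); // false: t0p1 is back with its original owner
    }
}
```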
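This article walked through the source code of Kafka's three partition assignment strategies. Neither RangeAssignor nor RoundRobinAssignor guarantees a fully balanced assignment. StickyAssignor is more complex to implement, but compared with the other two it produces better-balanced assignments and avoids unnecessary partition movement.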