flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [3/7] flink git commit: [FLINK-7143] [kafka] Introduce KafkaTopicPartitionAssigner with stricter assignment contracts
Date Fri, 28 Jul 2017 13:53:33 GMT
[FLINK-7143] [kafka] Introduce KafkaTopicPartitionAssigner with stricter assignment contracts

This commit refactors the local partition assignment logic into a
strict contract-defining method, making the expected
partition-to-subtask assignment explicit instead of relying solely on
the hashCodes of Kafka partitions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/888fabe9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/888fabe9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/888fabe9

Branch: refs/heads/master
Commit: 888fabe97cd15b6d92039a6f073cc92404d5b306
Parents: b1f37ef
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Mon Jul 24 15:00:16 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Fri Jul 28 21:52:29 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           |  9 ++--
 .../internals/AbstractPartitionDiscoverer.java  |  2 +-
 .../internals/KafkaTopicPartitionAssigner.java  | 56 ++++++++++++++++++++
 .../AbstractPartitionDiscovererTest.java        | 52 ++++++++++++------
 4 files changed, 98 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/888fabe9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index fd5c96d..cdcb28b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -407,11 +408,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				if (!restoredFromOldState) {
 					// seed the partition discoverer with the union state while filtering out
 					// restored partitions that should not be subscribed by this subtask
-					if (AbstractPartitionDiscoverer.shouldAssignToThisSubtask(
-							restoredStateEntry.getKey(),
-							getRuntimeContext().getIndexOfThisSubtask(),
-							getRuntimeContext().getNumberOfParallelSubtasks())) {
-
+					if (KafkaTopicPartitionAssigner.assign(
+						restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
+							== getRuntimeContext().getIndexOfThisSubtask()){
 						subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
 					}
 				} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/888fabe9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
index 725092e..39645be 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
@@ -198,7 +198,7 @@ public abstract class AbstractPartitionDiscoverer {
 		if (isUndiscoveredPartition(partition)) {
 			topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
 
-			return shouldAssignToThisSubtask(partition, indexOfThisSubtask, numParallelSubtasks);
+			return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
 		}
 
 		return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/888fabe9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
new file mode 100644
index 0000000..944630f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * Utility for assigning Kafka partitions to consumer subtasks.
+ */
+public class KafkaTopicPartitionAssigner {
+
+	/**
+	 * Returns the index of the target subtask that a specific Kafka partition should be
+	 * assigned to.
+	 *
+	 * <p>The resulting distribution of partitions of a single topic has the following contract:
+	 * <ul>
+	 *     <li>1. Uniformly distributed across subtasks</li>
+	 *     <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
+	 *     subtask indices) by using the partition id as the offset from a starting index
+	 *     (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
+	 *     determined using the topic name).</li>
+	 * </ul>
+	 *
+	 * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
+	 * contract to locally filter out partitions that it should not subscribe to, guaranteeing
+	 * that all partitions of a single topic will always be assigned to some subtask in a
+	 * uniformly distributed manner.
+	 *
+	 * @param partition the Kafka partition
+	 * @param numParallelSubtasks total number of parallel subtasks
+	 *
+	 * @return index of the target subtask that the Kafka partition should be assigned to.
+	 */
+	public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
+		int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
+
+		// here, the assumption is that the id of Kafka partitions are always ascending
+		// starting from 0, and therefore can be used directly as the offset clockwise from the start index
+		return (startIndex + partition.getPartition()) % numParallelSubtasks;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/888fabe9/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
index b00d74d..4d3e542 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
@@ -72,6 +72,11 @@ public class AbstractPartitionDiscovererTest {
 			new KafkaTopicPartition(TEST_TOPIC, 2),
 			new KafkaTopicPartition(TEST_TOPIC, 3));
 
+		int numSubtasks = mockGetAllPartitionsForTopicsReturn.size();
+
+		// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
+		int numConsumers = KafkaTopicPartitionAssigner.assign(mockGetAllPartitionsForTopicsReturn.get(0), numSubtasks);
+
 		for (int subtaskIndex = 0; subtaskIndex < mockGetAllPartitionsForTopicsReturn.size(); subtaskIndex++) {
 			TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 					topicsDescriptor,
@@ -85,7 +90,7 @@ public class AbstractPartitionDiscovererTest {
 			assertEquals(1, initialDiscovery.size());
 			assertTrue(contains(mockGetAllPartitionsForTopicsReturn, initialDiscovery.get(0).getPartition()));
 			assertEquals(
-				getExpectedSubtaskIndex(initialDiscovery.get(0), mockGetAllPartitionsForTopicsReturn.size()),
+				getExpectedSubtaskIndex(initialDiscovery.get(0), numConsumers, numSubtasks),
 				subtaskIndex);
 
 			// subsequent discoveries should not find anything
@@ -114,6 +119,9 @@ public class AbstractPartitionDiscovererTest {
 			final int minPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers;
 			final int maxPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers + 1;
 
+			// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
+			int startIndex = KafkaTopicPartitionAssigner.assign(mockGetAllPartitionsForTopicsReturn.get(0), numConsumers);
+
 			for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
 				TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 						topicsDescriptor,
@@ -130,7 +138,7 @@ public class AbstractPartitionDiscovererTest {
 				for (KafkaTopicPartition p : initialDiscovery) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
-					assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
+					assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), subtaskIndex);
 				}
 
 				// subsequent discoveries should not find anything
@@ -163,6 +171,9 @@ public class AbstractPartitionDiscovererTest {
 
 			final int numConsumers = 2 * mockGetAllPartitionsForTopicsReturn.size() + 3;
 
+			// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
+			int startIndex = KafkaTopicPartitionAssigner.assign(mockGetAllPartitionsForTopicsReturn.get(0), numConsumers);
+
 			for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
 				TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 						topicsDescriptor,
@@ -178,7 +189,7 @@ public class AbstractPartitionDiscovererTest {
 				for (KafkaTopicPartition p : initialDiscovery) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
-					assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
+					assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), subtaskIndex);
 				}
 
 				// subsequent discoveries should not find anything
@@ -222,6 +233,9 @@ public class AbstractPartitionDiscovererTest {
 			final int minNewPartitionsPerConsumer = allPartitions.size() / numConsumers;
 			final int maxNewPartitionsPerConsumer = allPartitions.size() / numConsumers + 1;
 
+			// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
+			int startIndex = KafkaTopicPartitionAssigner.assign(allPartitions.get(0), numConsumers);
+
 			TestPartitionDiscoverer partitionDiscovererSubtask0 = new TestPartitionDiscoverer(
 					topicsDescriptor,
 					0,
@@ -260,19 +274,19 @@ public class AbstractPartitionDiscovererTest {
 			for (KafkaTopicPartition p : initialDiscoverySubtask0) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask1) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask2) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 2);
 			}
 
 			// all partitions must have been assigned
@@ -299,32 +313,32 @@ public class AbstractPartitionDiscovererTest {
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask0) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask1) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask2) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 2);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask0) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask1) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask2) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 2);
 			}
 
 			// all partitions must have been assigned
@@ -370,7 +384,7 @@ public class AbstractPartitionDiscovererTest {
 					subtaskIndex,
 					numSubtasks,
 					createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
-					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
+					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturnOutOfOrder));
 			partitionDiscovererOutOfOrder.open();
 
 			List<KafkaTopicPartition> discoveredPartitions = partitionDiscoverer.discoverPartitions();
@@ -487,7 +501,15 @@ public class AbstractPartitionDiscovererTest {
 		return clone;
 	}
 
-	private static int getExpectedSubtaskIndex(KafkaTopicPartition partition, int numTasks) {
-		return Math.abs(partition.hashCode() % numTasks);
+	/**
+	 * Utility method that determines the expected subtask index a partition should be assigned to,
+	 * depending on the start index and using the partition id as the offset from that start index
+	 * in clockwise direction.
+	 *
+	 * <p>The expectation is based on the distribution contract of
+	 * {@link KafkaTopicPartitionAssigner#assign(KafkaTopicPartition, int)}.
+	 */
+	private static int getExpectedSubtaskIndex(KafkaTopicPartition partition, int startIndex, int numSubtasks) {
+		return (startIndex + partition.getPartition()) % numSubtasks;
 	}
 }


Mime
View raw message