kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6391 ensure topics are created with correct partitions BEFORE building the… (#4347)
Date Tue, 02 Jan 2018 18:22:31 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5ca1226  KAFKA-6391 ensure topics are created with correct partitions BEFORE building the… (#4347)
5ca1226 is described below

commit 5ca1226c22976050fd929c7bf66f3af8a5cd0493
Author: Clemens Valiente <clemens.valiente@gmail.com>
AuthorDate: Tue Jan 2 19:22:28 2018 +0100

    KAFKA-6391 ensure topics are created with correct partitions BEFORE building the… (#4347)
    
    * ensure topics are created with correct partitions BEFORE building the metadata for our stream tasks
    
    * Added a test case. The test should fail with the old logic, because:
    While stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition is created correctly with four partitions, the StreamPartitionAssignor will only assign three tasks to the topic.
Test passes with the new logic.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, Ted Yu <yuzhihong@gmail.com>
---
 .../internals/StreamPartitionAssignor.java         |  19 +--
 .../internals/StreamPartitionAssignorTest.java     | 128 +++++++++++++++++++--
 2 files changed, 126 insertions(+), 21 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index ef9ca3d..6709419 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -361,6 +361,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         } while (numPartitionsNeeded);
 
+
+        // ensure the co-partitioning topics within the group have the same number of partitions,
+        // and enforce the number of partitions for those repartition topics to be the same if they
+        // are co-partitioned as well.
+        ensureCopartitioning(taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
+
+        // make sure the repartition source topics exist with the right number of partitions,
+        // create these topics if necessary
+        prepareTopic(repartitionTopicMetadata);
+
         // augment the metadata with the newly computed number of partitions for all the
         // repartition source topics
         final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
@@ -374,15 +384,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
 
-        // ensure the co-partitioning topics within the group have the same number of partitions,
-        // and enforce the number of partitions for those repartition topics to be the same if they
-        // are co-partitioned as well.
-        ensureCopartitioning(taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
-
-        // make sure the repartition source topics exist with the right number of partitions,
-        // create these topics if necessary
-        prepareTopic(repartitionTopicMetadata);
-
         final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
         taskManager.setClusterMetadata(fullMetadata);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index d9e0ee7..582c70b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -398,10 +399,11 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
 
-        mockTaskManager(Collections.<TaskId>emptySet(),
-                               Collections.<TaskId>emptySet(),
-                               uuid1,
-                builder);
+        mockTaskManager(
+            Collections.<TaskId>emptySet(),
+            Collections.<TaskId>emptySet(),
+            uuid1,
+            builder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
@@ -633,6 +635,106 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
+    public void shouldGenerateTasksForAllCreatedPartitions() throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
+        internalTopologyBuilder.setApplicationId(applicationId);
+
+        // KStream with 3 partitions
+        KStream<Object, Object> stream1 = builder
+            .stream("topic1")
+            // force creation of internal repartition topic
+            .map(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
+                @Override
+                public KeyValue<Object, Object> apply(final Object key, final Object value) {
+                    return new KeyValue<>(key, value);
+                }
+            });
+
+        // KTable with 4 partitions
+        KTable<Object, Long> table1 = builder
+            .table("topic3")
+            // force creation of internal repartition topic
+            .groupBy(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
+                @Override
+                public KeyValue<Object, Object> apply(final Object key, final Object value) {
+                    return new KeyValue<>(key, value);
+                }
+            })
+            .count();
+
+        // joining the stream and the table
+        // this triggers the enforceCopartitioning() routine in the StreamPartitionAssignor,
+        // forcing the stream.map to get repartitioned to a topic with four partitions.
+        stream1.join(
+            table1,
+            new ValueJoiner() {
+                @Override
+                public Object apply(final Object value1, final Object value2) {
+                    return null;
+                }
+            });
+
+        final UUID uuid = UUID.randomUUID();
+        final String client = "client1";
+
+        mockTaskManager(
+            Collections.<TaskId>emptySet(),
+            Collections.<TaskId>emptySet(),
+            UUID.randomUUID(),
+            internalTopologyBuilder);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            streamsConfig,
+            mockClientSupplier.restoreConsumer);
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+        subscriptions.put(
+            client,
+            new PartitionAssignor.Subscription(
+                Utils.mkList("topic1", "topic3"),
+                new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
+            )
+        );
+
+        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+        final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>();
+        expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
+        expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
+        expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-MAP-0000000001-repartition", 4);
+        expectedCreatedInternalTopics.put(applicationId + "-topic3-STATE-STORE-0000000002-changelog", 4);
+
+        // check if all internal topics were created as expected
+        assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics));
+
+        final List<TopicPartition> expectedAssignment = Arrays.asList(
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic1", 1),
+            new TopicPartition("topic1", 2),
+            new TopicPartition("topic3", 0),
+            new TopicPartition("topic3", 1),
+            new TopicPartition("topic3", 2),
+            new TopicPartition("topic3", 3),
+            new TopicPartition(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 0),
+            new TopicPartition(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 1),
+            new TopicPartition(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 2),
+            new TopicPartition(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 3),
+            new TopicPartition(applicationId + "-KSTREAM-MAP-0000000001-repartition", 0),
+            new TopicPartition(applicationId + "-KSTREAM-MAP-0000000001-repartition", 1),
+            new TopicPartition(applicationId + "-KSTREAM-MAP-0000000001-repartition", 2),
+            new TopicPartition(applicationId + "-KSTREAM-MAP-0000000001-repartition", 3)
+        );
+
+        // check if we created a task for all expected topicPartitions.
+        assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new
HashSet<>(expectedAssignment)));
+    }
+
+    @Test
     public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
         builder.setApplicationId(applicationId);
         builder.addSource(null, "source", null, null, null, "input");
@@ -640,10 +742,11 @@ public class StreamPartitionAssignorTest {
         builder.addSink("sink", "output", null, null, null, "processor");
 
         final UUID uuid1 = UUID.randomUUID();
-        mockTaskManager(Collections.<TaskId>emptySet(),
-                               Collections.<TaskId>emptySet(),
-                               uuid1,
-                builder);
+        mockTaskManager(
+            Collections.<TaskId>emptySet(),
+            Collections.<TaskId>emptySet(),
+            uuid1,
+            builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
         final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
         final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
@@ -837,10 +940,11 @@ public class StreamPartitionAssignorTest {
         builder.stream("topic1").groupByKey().count();
 
         final UUID uuid = UUID.randomUUID();
-        mockTaskManager(Collections.<TaskId>emptySet(),
-                               Collections.<TaskId>emptySet(),
-                               uuid,
-                internalTopologyBuilder);
+        mockTaskManager(
+            Collections.<TaskId>emptySet(),
+            Collections.<TaskId>emptySet(),
+            uuid,
+            internalTopologyBuilder);
 
         Map<String, Object> props = new HashMap<>();
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <commits@kafka.apache.org>'].

Mime
View raw message