kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: HOTFIX: Fix unstable Streams application reset integration test
Date Thu, 28 Jul 2016 23:24:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 071b76cc5 -> 38c65a9a3


HOTFIX: Fix unstable Streams application reset integration test

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Eno Thereska <eno.thereska@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1673 from mjsax/hotfix

(cherry picked from commit ad1dab9c3d3ae14746ee5d94434ef98ef4889023)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/38c65a9a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/38c65a9a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/38c65a9a

Branch: refs/heads/trunk
Commit: 38c65a9a39081068e05e3222ccb0dbc3c9f56942
Parents: 071b76c
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Jul 28 22:10:06 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Jul 29 00:01:48 2016 +0100

----------------------------------------------------------------------
 .../streams/integration/ResetIntegrationTest.java    | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/38c65a9a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index bffdfd9..85aff26 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -64,6 +64,7 @@ public class ResetIntegrationTest {
     private static final String INPUT_TOPIC = "inputTopic";
     private static final String OUTPUT_TOPIC = "outputTopic";
     private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
     private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
 
     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
@@ -74,16 +75,17 @@ public class ResetIntegrationTest {
         CLUSTER.createTopic(INPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC_2);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
         CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
     }
 
     @Test
-    public void testReprocessingFromScratchAfterCleanUp() throws Exception {
+    public void testReprocessingFromScratchAfterReset() throws Exception {
         final Properties streamsConfiguration = prepareTest();
         final Properties resultTopicConsumerConfig = prepareResultConsumer();
 
         prepareInputData();
-        final KStreamBuilder builder = setupTopology();
+        final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2);
 
         // RUN
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
@@ -103,10 +105,10 @@ public class ResetIntegrationTest {
         Utils.sleep(CLEANUP_CONSUMER_TIMEOUT);
 
         // RE-RUN
-        streams = new KafkaStreams(setupTopology(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.start();
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
-        final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0);
+        final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, 1).get(0);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
@@ -163,7 +165,7 @@ public class ResetIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, 64L);
     }
 
-    private KStreamBuilder setupTopology() {
+    private KStreamBuilder setupTopology(final String outputTopic2) {
         final KStreamBuilder builder = new KStreamBuilder();
 
         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
@@ -201,7 +203,7 @@ public class ResetIntegrationTest {
                     return new KeyValue<>(key.window().start() + key.window().end(), value);
                 }
             });
-        windowedCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC_2);
+        windowedCounts.to(Serdes.Long(), Serdes.Long(), outputTopic2);
 
         return builder;
     }
@@ -229,6 +231,7 @@ public class ResetIntegrationTest {
         expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC);
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
+        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
         expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
 
         Set<String> allTopics;


Mime
View raw message