kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5566: fixed race condition between flush and commit
Date Mon, 10 Jul 2017 22:12:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a1f97c8dc -> 990382b94


KAFKA-5566: fixed race condition between flush and commit

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3504 from mjsax/kafka-5566-queryable-state-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/990382b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/990382b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/990382b9

Branch: refs/heads/trunk
Commit: 990382b94cfeb7a73a93ccbfd92e80f0f62fc7b0
Parents: a1f97c8
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Mon Jul 10 15:12:47 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jul 10 15:12:47 2017 -0700

----------------------------------------------------------------------
 .../QueryableStateIntegrationTest.java          | 36 ++++++++++++--------
 1 file changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/990382b9/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 1a39a4a..3e6abfd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -172,10 +172,7 @@ public class QueryableStateIntegrationTest {
             "king of the world");
         inputValuesKeys = new HashSet<>();
         for (final String sentence : inputValues) {
-            final String[] words = sentence.split("\\W+");
-            for (final String word : words) {
-                inputValuesKeys.add(word);
-            }
+            Collections.addAll(inputValuesKeys, sentence.split("\\W+"));
         }
     }
 
@@ -429,7 +426,7 @@ public class QueryableStateIntegrationTest {
             new KeyValue<>(keys[3], 5L),
             new KeyValue<>(keys[4], 2L)));
         final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
-        expectedBatch1.addAll(Arrays.asList(
+        expectedBatch1.addAll(Collections.singleton(
             new KeyValue<>(keys[4], 2L)));
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
@@ -541,7 +538,7 @@ public class QueryableStateIntegrationTest {
             new KeyValue<>(keys[3], "5"),
             new KeyValue<>(keys[4], "2")));
         final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
-        expectedBatch1.addAll(Arrays.asList(
+        expectedBatch1.addAll(Collections.singleton(
             new KeyValue<>(keys[4], 2L)));
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
@@ -768,11 +765,11 @@ public class QueryableStateIntegrationTest {
             public boolean conditionMet() {
                 return "12".equals(store.get("a")) && "34".equals(store.get("b"));
             }
-        }, maxWaitMs, "wait for agg to be '123'");
+        }, maxWaitMs, "wait for agg to be <a,12> and <b,34>");
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
-            Arrays.asList(KeyValue.pair("a", "5")),
+            Collections.singleton(KeyValue.pair("a", "5")),
             TestUtils.producerConfig(
                 CLUSTER.bootstrapServers(),
                 StringSerializer.class,
@@ -791,12 +788,23 @@ public class QueryableStateIntegrationTest {
 
         final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "125".equals(store2.get("a")) && "34".equals(store2.get("b"));
-            }
-        }, maxWaitMs, "wait for agg to be '123'");
+        try {
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return
+                        ("125".equals(store2.get("a"))
+                        || "1225".equals(store2.get("a"))
+                        || "12125".equals(store2.get("a")))
+                        &&
+                        ("34".equals(store2.get("b"))
+                        || "344".equals(store2.get("b"))
+                        || "3434".equals(store2.get("b")));
+                }
+            }, maxWaitMs, "wait for agg to be <a,125>||<a,1225>||<a,12125> and <b,34>||<b,344>||<b,3434>");
+        } catch (final Throwable t) {
+            throw new RuntimeException("Store content is a: " + store2.get("a") + "; b: " + store2.get("b"), t);
+        }
 
     }
 


Mime
View raw message