kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Fix race condition in KafkaStreamsTest.shouldReturnThreadMetadata
Date Tue, 19 Dec 2017 01:39:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 22f742cdd -> 9aad649fd


MINOR: Fix race condition in KafkaStreamsTest.shouldReturnThreadMetadata

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #4337 from mjsax/minor-fix-kafakstreamstest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9aad649f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9aad649f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9aad649f

Branch: refs/heads/trunk
Commit: 9aad649fd0defa4b55f305e3e025883ebb567b34
Parents: 22f742c
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Mon Dec 18 17:39:25 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Dec 18 17:39:25 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/StreamsBuilder.java    | 20 ++++++++++----------
 .../apache/kafka/streams/KafkaStreamsTest.java  |  6 ++----
 2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9aad649f/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a94b0a7..27105c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -81,7 +81,7 @@ public class StreamsBuilder {
     /**
      * Create a {@link KStream} from the specified topics.
      * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value
deserializers
-     * are defined by the options in {@link Consumed}.
+     * are defined by the options in {@link Consumed} are used.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case it is the user's responsibility to repartition the data before
any key based operation
@@ -117,7 +117,7 @@ public class StreamsBuilder {
     /**
      * Create a {@link KStream} from the specified topics.
      * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value
deserializers
-     * are defined by the options in {@link Consumed}.
+     * are defined by the options in {@link Consumed} are used.
      * <p>
      * If multiple topics are specified there is no ordering guarantee for records from different
topics.
      * <p>
@@ -159,7 +159,7 @@ public class StreamsBuilder {
     /**
      * Create a {@link KStream} from the specified topic pattern.
      * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value
deserializers
-     * are defined by the options in {@link Consumed}.
+     * are defined by the options in {@link Consumed} are used.
      * <p>
      * If multiple topics are matched by the specified pattern, the created {@link KStream}
will read data from all of
      * them and there is no ordering guarantee between records from different topics.
@@ -181,8 +181,8 @@ public class StreamsBuilder {
 
     /**
      * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor},
and
-     * default key and value deserializers as specified in the {@link StreamsConfig config}
are used.
+     * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value
deserializers
+     * are defined by the options in {@link Consumed} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topic must be partitioned by key.
@@ -251,7 +251,7 @@ public class StreamsBuilder {
     /**
      * Create a {@link KTable} for the specified topic.
      * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value
deserializers
-     * are defined by the options in {@link Consumed}.
+     * are defined by the options in {@link Consumed} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topics must be partitioned by key.
@@ -280,8 +280,8 @@ public class StreamsBuilder {
 
     /**
      * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
as specified in the
-     * {@link StreamsConfig config} are used.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig
config} are used.
+     * Key and value deserializers as defined by the options in {@link Materialized} are
used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topics must be partitioned by key.
@@ -317,7 +317,7 @@ public class StreamsBuilder {
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy
{@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
+     * regardless of the specified value in {@link StreamsConfig} or {@link Consumed}.
      *
      * @param topic the topic name; cannot be {@code null}
      * @param consumed  the instance of {@link Consumed} used to define optional parameters
@@ -382,7 +382,7 @@ public class StreamsBuilder {
      * Long valueForKey = localStore.get(key);
      * }</pre>
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy
{@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
+     * regardless of the specified value in {@link StreamsConfig} or {@link Consumed}.
      *
      * @param topic         the topic name; cannot be {@code null}
      * @param consumed      the instance of {@link Consumed} used to define optional parameters;
can't be {@code null}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9aad649f/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index ab299dd..8746c62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -395,15 +395,13 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldReturnThreadMetadata() {
-        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
         Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
         assertNotNull(threadMetadata);
         assertEquals(2, threadMetadata.size());
         for (ThreadMetadata metadata : threadMetadata) {
-            assertTrue("#threadState() was: " + metadata.threadState() + "; expected either
RUNNING or CREATED",
-                Utils.mkList("RUNNING", "CREATED").contains(metadata.threadState()));
+            assertTrue("#threadState() was: " + metadata.threadState() + "; expected either
RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
+                Utils.mkList("RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState()));
             assertEquals(0, metadata.standbyTasks().size());
             assertEquals(0, metadata.activeTasks().size());
         }


Mime
View raw message