From: guozhang@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Subject: kafka git commit: MINOR: Fix race condition in KafkaStreamsTest.shouldReturnThreadMetadata
Date: Tue, 19 Dec 2017 01:39:32 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk 22f742cdd -> 9aad649fd


MINOR: Fix race condition in KafkaStreamsTest.shouldReturnThreadMetadata

Author: Matthias J. Sax

Reviewers: Guozhang Wang

Closes #4337 from mjsax/minor-fix-kafakstreamstest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9aad649f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9aad649f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9aad649f

Branch: refs/heads/trunk
Commit: 9aad649fd0defa4b55f305e3e025883ebb567b34
Parents: 22f742c
Author: Matthias J. Sax
Authored: Mon Dec 18 17:39:25 2017 -0800
Committer: Guozhang Wang
Committed: Mon Dec 18 17:39:25 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/StreamsBuilder.java   | 20 ++++++++++----------
 .../apache/kafka/streams/KafkaStreamsTest.java |  6 ++----
 2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9aad649f/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a94b0a7..27105c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -81,7 +81,7 @@ public class StreamsBuilder {
     /**
      * Create a {@link KStream} from the specified topics.
      * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value deserializers
-     * are defined by the options in {@link Consumed}.
+     * are defined by the options in {@link Consumed} are used.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case it is the user's responsibility to repartition the data before any key based operation
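
For readers of this Javadoc change, a minimal usage sketch of the method it documents: every
per-source option (key/value serdes, TimestampExtractor, and the "auto.offset.reset" policy)
comes from the Consumed instance. The topic name, serdes, and extractor below are illustrative
assumptions, not taken from this commit.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class StreamWithConsumedSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // All per-source options are taken from Consumed, not from StreamsConfig defaults.
            final KStream<String, Long> stream = builder.stream(
                "input-topic",                                   // hypothetical topic name
                Consumed.with(Serdes.String(), Serdes.Long(),
                              new WallclockTimestampExtractor(),
                              Topology.AutoOffsetReset.EARLIEST));
            stream.foreach((key, value) -> System.out.println(key + " -> " + value));
            builder.build();                                     // Topology to pass to KafkaStreams
        }
    }
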
@@ -117,7 +117,7 @@ public class StreamsBuilder {
     /**
      * Create a {@link KStream} from the specified topics.
      * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value deserializers
-     * are defined by the options in {@link Consumed}.
+     * are defined by the options in {@link Consumed} are used.
      * <p>
      * If multiple topics are specified there is no ordering guarantee for records from different topics.
      * <p>
@@ -159,7 +159,7 @@ public class StreamsBuilder {
     /**
      * Create a {@link KStream} from the specified topic pattern.
      * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value deserializers
-     * are defined by the options in {@link Consumed}.
+     * are defined by the options in {@link Consumed} are used.
      * <p>
      * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
      * them and there is no ordering guarantee between records from different topics.
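
The pattern-based overload behaves the same way; a brief illustrative sketch (the pattern and
serdes are assumptions, not taken from this commit):

    import java.util.regex.Pattern;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class PatternStreamSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // Records from every matched topic flow into one KStream; no cross-topic ordering guarantee.
            final KStream<String, String> stream = builder.stream(
                Pattern.compile("metrics-.*"),                   // hypothetical topic pattern
                Consumed.with(Serdes.String(), Serdes.String()));
            stream.foreach((key, value) -> System.out.println(key + " -> " + value));
            builder.build();
        }
    }
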
@@ -181,8 +181,8 @@ public class StreamsBuilder {
     /**
      * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and
-     * default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value deserializers
+     * are defined by the options in {@link Consumed} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topic must be partitioned by key.
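
The KTable variants touched above follow the same rule; an illustrative sketch (topic name and
serdes are assumptions, not taken from this commit):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class TableWithConsumedSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // Deserializers and offset-reset policy come from Consumed; records with a null key are dropped.
            final KTable<String, Long> table = builder.table(
                "table-topic",                                   // hypothetical topic name
                Consumed.with(Serdes.String(), Serdes.Long()));
            table.toStream().foreach((key, value) -> System.out.println(key + " -> " + value));
            builder.build();
        }
    }
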
@@ -251,7 +251,7 @@ public class StreamsBuilder {
     /**
      * Create a {@link KTable} for the specified topic.
      * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value deserializers
-     * are defined by the options in {@link Consumed}.
+     * are defined by the options in {@link Consumed} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topics must be partitioned by key.
@@ -280,8 +280,8 @@ public class StreamsBuilder {
     /**
      * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} are used.
+     * Key and value deserializers as defined by the options in {@link Materialized} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topics must be partitioned by key.
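
For the overload changed above, the deserializers are carried by the Materialized instance while
the offset-reset policy still falls back to the StreamsConfig default; an illustrative sketch
(store name, topic name, and serdes are assumptions, not taken from this commit):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class TableWithMaterializedSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // Key/value serdes come from Materialized; the table is backed by the named state store.
            final KTable<String, Long> table = builder.table(
                "table-topic",                                   // hypothetical topic name
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("table-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()));
            table.toStream().foreach((key, value) -> System.out.println(key + " -> " + value));
            builder.build();
        }
    }
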
@@ -317,7 +317,7 @@ public class StreamsBuilder {
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
+     * regardless of the specified value in {@link StreamsConfig} or {@link Consumed}.
      *
      * @param topic     the topic name; cannot be {@code null}
      * @param consumed  the instance of {@link Consumed} used to define optional parameters
@@ -382,7 +382,7 @@ public class StreamsBuilder {
      * Long valueForKey = localStore.get(key);
      * }</pre>
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
+     * regardless of the specified value in {@link StreamsConfig} or {@link Consumed}.
      *
      * @param topic     the topic name; cannot be {@code null}
      * @param consumed  the instance of {@link Consumed} used to define optional parameters; can't be {@code null}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9aad649f/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index ab299dd..8746c62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -395,15 +395,13 @@ public class KafkaStreamsTest {
     @Test
     public void shouldReturnThreadMetadata() {
-        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
-
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
         Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
         assertNotNull(threadMetadata);
         assertEquals(2, threadMetadata.size());
         for (ThreadMetadata metadata : threadMetadata) {
-            assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING or CREATED",
-                Utils.mkList("RUNNING", "CREATED").contains(metadata.threadState()));
+            assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
+                Utils.mkList("RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState()));
             assertEquals(0, metadata.standbyTasks().size());
             assertEquals(0, metadata.activeTasks().size());
         }
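
On the test change itself: a stream thread that is mid-rebalance legitimately reports
PARTITIONS_REVOKED or PARTITIONS_ASSIGNED from ThreadMetadata#threadState(), so asserting only
RUNNING or CREATED raced against the rebalance; the fix widens the set of accepted states. For
comparison only, a sketch of another pattern tests sometimes use, polling until every thread
reports RUNNING before asserting (hypothetical helper, not what this commit does):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.processor.ThreadMetadata;

    public final class ThreadStateTestUtil {
        // Waits until all local stream threads report RUNNING, or fails after timeoutMs.
        public static void waitForRunningThreads(final KafkaStreams streams,
                                                 final long timeoutMs) throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                boolean allRunning = true;
                for (final ThreadMetadata metadata : streams.localThreadsMetadata()) {
                    if (!"RUNNING".equals(metadata.threadState())) {
                        allRunning = false;
                        break;
                    }
                }
                if (allRunning) {
                    return;
                }
                Thread.sleep(50);
            }
            throw new AssertionError("stream threads did not reach RUNNING within " + timeoutMs + " ms");
        }
    }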