From: ijuma@apache.org
To: commits@kafka.apache.org
Subject: kafka git commit: HOTFIX: Start embedded kafka in KafkaStreamsTest to avoid hanging
Date: Tue, 2 Aug 2016 11:57:19 +0000 (UTC)
Message-Id: <200ad759580c46aaad5fff1659589dbf@git.apache.org>

Repository: kafka
Updated Branches:
  refs/heads/0.10.0 f2405a73e -> ce34614a4


HOTFIX: Start embedded kafka in KafkaStreamsTest to avoid hanging

KafkaStreamsTest can occasionally hang if it doesn't run quickly enough. The cause is that no brokers are available at the broker.urls provided to the StreamsConfig, so the KafkaConsumer blocks in poll and the test never completes.
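For context, the failure mode described above can be sketched outside the test roughly as follows. This is a minimal illustration, not part of the patch: the class name, topic name, and group id are made up, and localhost:9999 is assumed to have nothing listening on it, mirroring the hard-coded address the test previously used.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollHangSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        // No broker is listening on this address, as in the original test setup.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-hang-sketch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));
            // With no reachable broker the consumer can block here while it keeps
            // retrying group-coordinator discovery, which is the hang the commit
            // message describes.
            consumer.poll(Long.MAX_VALUE);
        }
    }
}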
Author: Damian Guy
Reviewers: Ismael Juma

Closes #1693 from dguy/kafka-streams-test

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ce34614a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ce34614a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ce34614a

Branch: refs/heads/0.10.0
Commit: ce34614a43fb1f43ef6b5660fb37f7a0598d177a
Parents: f2405a7
Author: Damian Guy
Authored: Tue Aug 2 12:41:20 2016 +0100
Committer: Ismael Juma
Committed: Tue Aug 2 12:41:20 2016 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreamsTest.java | 21 +++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ce34614a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index af7e681..f8293b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -17,10 +17,12 @@
 package org.apache.kafka.streams;
 
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.io.File;
@@ -31,11 +33,16 @@ import static org.junit.Assert.assertTrue;
 
 public class KafkaStreamsTest {
 
+    // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
+    // quick enough
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+
     @Test
     public void testStartAndClose() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
@@ -58,7 +65,7 @@ public class KafkaStreamsTest {
     public void testCloseIsIdempotent() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
         final KStreamBuilder builder = new KStreamBuilder();
@@ -75,7 +82,7 @@ public class KafkaStreamsTest {
     public void testCannotStartOnceClosed() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -95,7 +102,7 @@ public class KafkaStreamsTest {
     public void testCannotStartTwice() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -115,7 +122,7 @@ public class KafkaStreamsTest {
     public void testCleanup() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -137,7 +144,7 @@ public class KafkaStreamsTest {
         final File stateDirApp2 = new File(stateDir + File.separator + appId2);
 
         final Properties props = new Properties();
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
 
         assertFalse(stateDirApp1.exists());
@@ -164,7 +171,7 @@ public class KafkaStreamsTest {
     public void testCannotCleanupWhileRunning() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
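
For anyone adapting the same fix elsewhere, a condensed sketch of the pattern the patch introduces is shown below. EmbeddedSingleNodeKafkaCluster and bootstrapServers() are the test utilities referenced in the diff above; the class name, application id, and test body here are illustrative only, not part of the patch.

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.junit.ClassRule;
import org.junit.Test;

public class EmbeddedClusterPatternSketch {

    // Started before any test in the class runs and stopped afterwards, so every
    // bootstrap-servers value below points at a broker that actually exists.
    @ClassRule
    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

    @Test
    public void startAndCloseAgainstRealBroker() throws Exception {
        final Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "embedded-cluster-sketch");
        // The fix in the commit: use the embedded cluster's address instead of a
        // hard-coded port nobody is listening on.
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

        final KafkaStreams streams = new KafkaStreams(new KStreamBuilder(), props);
        streams.start();
        streams.close();
    }
}

Because @ClassRule starts the broker once for the whole test class and tears it down afterwards, every test that reads CLUSTER.bootstrapServers() talks to a live broker, so poll can return instead of blocking.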