kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: HOTFIX: Start embedded kafka in KafkaStreamsTest to avoid hanging
Date Tue, 02 Aug 2016 12:03:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f7976d2fc -> 3bb38d37b


HOTFIX: Start embedded kafka in KafkaStreamsTest to avoid hanging

The KafkaStreamsTest can occasionally hang if the test doesn't run fast enough. This happens
because no brokers are available on the broker.urls provided to the StreamsConfig: the
KafkaConsumer does a poll and blocks, causing the test to never complete.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1693 from dguy/kafka-streams-test

(cherry picked from commit ce34614a43fb1f43ef6b5660fb37f7a0598d177a)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bb38d37
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bb38d37
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bb38d37

Branch: refs/heads/trunk
Commit: 3bb38d37b7a3fe2dc794c717df1d67e8f9a8af21
Parents: f7976d2
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Aug 2 12:41:20 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Aug 2 12:57:26 2016 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreamsTest.java  | 21 +++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3bb38d37/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index f33cb3a..29b8b03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.io.File;
@@ -31,11 +33,16 @@ import static org.junit.Assert.assertTrue;
 
 public class KafkaStreamsTest {
 
+    // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
+    // quick enough
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+
     @Test
     public void testStartAndClose() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
@@ -58,7 +65,7 @@ public class KafkaStreamsTest {
     public void testCloseIsIdempotent() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
         final KStreamBuilder builder = new KStreamBuilder();
@@ -75,7 +82,7 @@ public class KafkaStreamsTest {
     public void testCannotStartOnceClosed() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -95,7 +102,7 @@ public class KafkaStreamsTest {
     public void testCannotStartTwice() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -115,7 +122,7 @@ public class KafkaStreamsTest {
     public void testCleanup() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -137,7 +144,7 @@ public class KafkaStreamsTest {
         final File stateDirApp2 = new File(stateDir + File.separator + appId2);
 
         final Properties props = new Properties();
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
 
         assertFalse(stateDirApp1.exists());
@@ -164,7 +171,7 @@ public class KafkaStreamsTest {
     public void testCannotCleanupWhileRunning() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);


Mime
View raw message