kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 1.0 updated: MINOR: fix unstable FanoutIntegrationTest
Date Tue, 23 Jan 2018 23:40:15 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 0614856  MINOR: fix unstable FanoutIntegrationTest
0614856 is described below

commit 06148566dd9fde16b43cae3e896c8f47ba04baff
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Tue Jan 23 15:40:10 2018 -0800

    MINOR: fix unstable FanoutIntegrationTest
    
    * fix broken build
    * set test temp directory for Streams state directory
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/streams/processor/internals/InternalTopologyBuilder.java    | 4 ++++
 .../org/apache/kafka/streams/integration/FanoutIntegrationTest.java   | 2 ++
 2 files changed, 6 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index fc2f1f0..faaf96a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -848,6 +848,10 @@ public class InternalTopologyBuilder {
         return nodeGroups;
     }
 
+    public synchronized ProcessorTopology build() {
+        return build((Integer) null);
+    }
+
     public synchronized ProcessorTopology build(final Integer topicGroupId) {
         final Set<String> nodeGroup;
         if (topicGroupId != null) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index b4a9320..0c11dbb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -102,6 +103,7 @@ public class FanoutIntegrationTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message