kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4652: Added tests for KStreamBuilder
Date Thu, 02 Mar 2017 21:19:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ca06862a7 -> 9e65b25e9


KAFKA-4652: Added tests for KStreamBuilder

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #2597 from bbejeck/KAFKA-4652_improve_kstream_builder_test_coverage


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e65b25e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e65b25e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e65b25e

Branch: refs/heads/trunk
Commit: 9e65b25e9fca6c26cef3498ff747879d4f527700
Parents: ca06862
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Thu Mar 2 13:19:08 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Mar 2 13:19:08 2017 -0800

----------------------------------------------------------------------
 .../streams/kstream/KStreamBuilderTest.java     | 92 ++++++++++++++++++++
 1 file changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e65b25e/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 3c66dc9..7ce0b54 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockKeyValueMapper;
@@ -35,8 +37,10 @@ import java.util.List;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class KStreamBuilderTest {
@@ -232,4 +236,92 @@ public class KStreamBuilderTest {
         assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
     }
+
+    @Test
+    public void shouldAddTopicToEarliestAutoOffsetResetList() {
+        final String topicName = "topic-1";
+        
+        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName);
+        
+        assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldAddTopicToLatestAutoOffsetResetList() {
+        final String topicName = "topic-1";
+
+        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicName);
+
+        assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldAddTableToEarliestAutoOffsetResetList() {
+        final String topicName = "topic-1";
+        final String storeName = "test-store";
+
+        builder.table(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName, storeName);
+
+        assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldAddTableToLatestAutoOffsetResetList() {
+        final String topicName = "topic-1";
+        final String storeName = "test-store";
+
+        builder.table(TopologyBuilder.AutoOffsetReset.LATEST, topicName, storeName);
+
+        assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldNotAddTableToOffsetResetLists() {
+        final String topicName = "topic-1";
+        final String storeName = "test-store";
+        final Serde<String> stringSerde = Serdes.String();
+
+        builder.table(stringSerde, stringSerde, topicName, storeName);
+
+        assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
+        assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
+    }
+
+    @Test
+    public void shouldNotAddRegexTopicsToOffsetResetLists() {
+        final Pattern topicPattern = Pattern.compile("topic-\\d");
+        final String topic = "topic-5";
+
+        builder.stream(topicPattern);
+        
+        assertFalse(builder.latestResetTopicsPattern().matcher(topic).matches());
+        assertFalse(builder.earliestResetTopicsPattern().matcher(topic).matches());
+
+    }
+
+    @Test
+    public void shouldAddRegexTopicToEarliestAutoOffsetResetList() {
+        final Pattern topicPattern = Pattern.compile("topic-\\d+");
+        final String topicTwo = "topic-500000";
+
+        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicPattern);
+
+        assertTrue(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
+        assertFalse(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
+    }
+
+    @Test
+    public void shouldAddRegexTopicToLatestAutoOffsetResetList() {
+        final Pattern topicPattern = Pattern.compile("topic-\\d+");
+        final String topicTwo = "topic-1000000";
+
+        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicPattern);
+
+        assertTrue(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
+        assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
+    }
 }


Mime
View raw message