kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly
Date Wed, 10 Jan 2018 19:17:01 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320888#comment-16320888
] 

ASF GitHub Bot commented on KAFKA-6398:
---------------------------------------

guozhangwang closed pull request #4384: KAFKA-6398: fix KTable.filter that does not include
its parent's queryable storename
URL: https://github.com/apache/kafka/pull/4384
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 8c79decbb6f..3bc6f4b3474 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -155,8 +155,10 @@ String internalStoreName() {
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
         if (storeSupplier != null) {
             builder.internalTopologyBuilder.addStateStore(storeSupplier, name);
+            return new KTableImpl<>(builder, name, processorSupplier, this.keySerde,
this.valSerde, sourceNodes, internalStoreName, true);
+        } else {
+            return new KTableImpl<>(builder, name, processorSupplier, sourceNodes,
this.queryableStoreName, false);
         }
-        return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde,
sourceNodes, internalStoreName, internalStoreName != null);
     }
 
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 39ea44f1bfd..39010022a0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -456,6 +456,7 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
         }
 
         for (final String predecessor : predecessorNames) {
+            Objects.requireNonNull(predecessor, "predecessor name can't be null");
             if (predecessor.equals(name)) {
                 throw new TopologyException("Processor " + name + " cannot be a predecessor
of itself.");
             }
@@ -483,6 +484,7 @@ public final void addProcessor(final String name,
         }
 
         for (final String predecessor : predecessorNames) {
+            Objects.requireNonNull(predecessor, "predecessor name must not be null");
             if (predecessor.equals(name)) {
                 throw new TopologyException("Processor " + name + " cannot be a predecessor
of itself.");
             }
@@ -508,6 +510,7 @@ public final void addStateStore(final org.apache.kafka.streams.processor.StateSt
 
         if (processorNames != null) {
             for (final String processorName : processorNames) {
+                Objects.requireNonNull(processorName, "processor name must not be null");
                 connectProcessorAndStateStore(processorName, supplier.name());
             }
         }
@@ -524,6 +527,7 @@ public final void addStateStore(final StoreBuilder storeBuilder,
 
         if (processorNames != null) {
             for (final String processorName : processorNames) {
+                Objects.requireNonNull(processorName, "processor name must not be null");
                 connectProcessorAndStateStore(processorName, storeBuilder.name());
             }
         }
@@ -602,11 +606,12 @@ private void validateTopicNotAlreadyRegistered(final String topic) {
     public final void connectProcessorAndStateStores(final String processorName,
                                                      final String... stateStoreNames) {
         Objects.requireNonNull(processorName, "processorName can't be null");
-        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be null");
+        Objects.requireNonNull(stateStoreNames, "state store list must not be null");
         if (stateStoreNames.length == 0) {
             throw new TopologyException("Must provide at least one state store name.");
         }
         for (final String stateStoreName : stateStoreNames) {
+            Objects.requireNonNull(stateStoreName, "state store name must not be null");
             connectProcessorAndStateStore(processorName, stateStoreName);
         }
     }
@@ -627,6 +632,7 @@ public final void connectProcessors(final String... processorNames) {
         }
 
         for (final String processorName : processorNames) {
+            Objects.requireNonNull(processorName, "processor name can't be null");
             if (!nodeFactories.containsKey(processorName)) {
                 throw new TopologyException("Processor " + processorName + " is not added
yet.");
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index cee01dc0662..13b5b4583f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -22,12 +22,16 @@
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.MockPredicate;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,6 +63,30 @@ public void testFrom() {
         builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
     }
 
+    @Test
+    public void shouldAllowJoinUnmaterializedFilteredKTable() {
+        final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.<Bytes,
String>allGoodPredicate());
+        builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+    }
+
+    @Test
+    public void shouldAllowJoinUnmaterializedMapValuedKTable() {
+        final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic").mapValues(MockMapper.<String>noOpValueMapper());
+        builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+    }
+
+    @Test
+    public void shouldAllowJoinMaterializedSourceKTable() {
+        final KTable<Bytes, String> table = builder.<Bytes, String>table("table-topic");
+        builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+    }
+
     @Test
     public void shouldProcessingFromSinkTopic() {
         final KStream<String, String> source = builder.stream("topic-source");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 855bcea9b27..65a6de70157 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -36,7 +36,7 @@
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -139,7 +139,7 @@ public void shouldCompactTopicsForStateChangelogs() throws Exception {
                     public Iterable<String> apply(final String value) {
                         return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                     }
-                }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
+                }).groupBy(MockMapper.<String, String>selectValueMapper())
                 .count("Counts").toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
@@ -186,7 +186,7 @@ public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws
Exception
                     public Iterable<String> apply(String value) {
                         return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                     }
-                }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
+                }).groupBy(MockMapper.<String, String>selectValueMapper())
                 .count(TimeWindows.of(1000).until(durationMs), "CountWindows").toStream();
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index cb588490623..4c12bb93544 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -39,7 +39,7 @@
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -99,7 +99,7 @@ public void before() throws InterruptedException {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024
* 1024L);
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
-        KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer,
String>SelectValueMapper();
+        KeyValueMapper<Integer, String, String> mapper = MockMapper.<Integer, String>selectValueMapper();
         stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
         groupedStream = stream
             .groupBy(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 3500dd57db2..4527c19b471 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -52,7 +52,7 @@
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -116,7 +116,7 @@ public void before() throws InterruptedException {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
 
-        final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
+        final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
         stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
         groupedStream = stream
             .groupBy(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 5f6ff449b87..32546de0ef0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -39,7 +39,7 @@
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -104,7 +104,7 @@ public void before() throws InterruptedException {
         streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.Integer(), Serdes.String()));
         streamFour = builder.stream(streamFourInput, Consumed.with(Serdes.Integer(), Serdes.String()));
 
-        keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
+        keyMapper = MockMapper.selectValueKeyValueMapper();
     }
 
     @After
@@ -157,7 +157,7 @@ private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException
 
     private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws InterruptedException {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
-        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer,
String>NoOpKeyValueMapper());
+        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer,
String>noOpKeyValueMapper());
 
         doJoin(map1, map2, "map-both-streams-and-join-" + testNo);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-"
+ testNo);
@@ -183,7 +183,7 @@ private ExpectedOutputOnTopic mapMapJoin() throws InterruptedException
{
     private ExpectedOutputOnTopic selectKeyAndJoin() throws Exception {
 
         final KStream<Integer, Integer> keySelected =
-            streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
+            streamOne.selectKey(MockMapper.<Long, Integer>selectValueMapper());
 
         final String outputTopic = "select-key-join-" + testNo;
         doJoin(keySelected, streamTwo, outputTopic);
@@ -222,7 +222,7 @@ private ExpectedOutputOnTopic joinMappedRhsStream() throws InterruptedException
     private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws InterruptedException
{
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
-        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer,
String>NoOpKeyValueMapper());
+        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer,
String>noOpKeyValueMapper());
 
 
         final String outputTopic = "left-join-" + testNo;
@@ -247,7 +247,7 @@ private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
         final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
-            kvMapper = MockKeyValueMapper.NoOpKeyValueMapper();
+            kvMapper = MockMapper.noOpKeyValueMapper();
 
         final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 18ffb876f50..4b983cf2da0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -33,7 +33,7 @@
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -163,7 +163,7 @@ public void setup() {
         StreamsBuilder builder = new StreamsBuilder();
 
         builder.stream(INPUT_TOPIC)
-               .groupBy(MockKeyValueMapper.SelectKeyKeyValueMapper())
+               .groupBy(MockMapper.selectKeyKeyValueMapper())
                .count();
 
         kafkaStreams = new KafkaStreams(builder.build(), new StreamsConfig(streamsConfiguration),
time);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 51c5ce237ec..fc2eaccebc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -57,7 +57,7 @@
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -218,7 +218,7 @@ private KafkaStreams createCountStream(final String inputTopic,
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
             })
-            .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());
+            .groupBy(MockMapper.<String, String>selectValueMapper());
 
         // Create a State Store for the all time word count
         groupedByWord
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 5ffedb872fd..f9949d3c5b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -27,7 +27,7 @@
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
@@ -300,7 +300,7 @@ public void shouldMapStateStoresToCorrectSourceTopics() {
         final KTable<String, String> table = builder.table("table-topic", "table-store");
         assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
 
-        final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String,
String>SelectValueKeyValueMapper());
+        final KStream<String, String> mapped = playEvents.map(MockMapper.<String,
String>selectValueKeyValueMapper());
         mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
         assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"),
builder.stateStoreNameToSourceTopics().get("count"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 05a02140c35..156acadbedd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -31,7 +31,7 @@
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
 import org.junit.After;
@@ -251,7 +251,7 @@ public void shouldMapStateStoresToCorrectSourceTopics() throws Exception
{
         final KTable<String, String> table = builder.table("table-topic", consumed,
materialized);
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
 
-        final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String,
String>SelectValueKeyValueMapper());
+        final KStream<String, String> mapped = playEvents.map(MockMapper.<String,
String>selectValueKeyValueMapper());
         mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"),
builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("count"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 705cf62f9a6..3bbd7e2fa50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -33,7 +33,7 @@
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -62,7 +62,7 @@
     @Before
     public void before() {
         groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
-                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
+                .groupBy(MockMapper.<String, String>selectValueKeyValueMapper());
     }
 
     @Test
@@ -221,7 +221,7 @@ public void shouldReduceAndMaterializeResults() {
     @Test
     public void shouldCountAndMaterializeResults() {
         final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(),
Serdes.String()));
-        table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
+        table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(),
                       Serialized.with(Serdes.String(),
                                       Serdes.String()))
                 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
@@ -238,7 +238,7 @@ public void shouldCountAndMaterializeResults() {
     @Test
     public void shouldAggregateAndMaterializeResults() {
         final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(),
Serdes.String()));
-        table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
+        table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(),
                       Serialized.with(Serdes.String(),
                                       Serdes.String()))
                 .aggregate(MockInitializer.STRING_INIT,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 0a0232c16e3..562711d2d08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -40,7 +40,7 @@
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.junit.Before;
@@ -379,7 +379,7 @@ public void shouldNotAllowNullActionOnForEach() {
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTableOnJoinWithGlobalTable() {
         testStream.join((GlobalKTable) null,
-                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockMapper.<String, String>selectValueMapper(),
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
@@ -393,14 +393,14 @@ public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
         testStream.join(builder.globalTable("global", stringConsumed),
-                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockMapper.<String, String>selectValueMapper(),
                         null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() {
         testStream.leftJoin((GlobalKTable) null,
-                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockMapper.<String, String>selectValueMapper(),
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
@@ -414,7 +414,7 @@ public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
         testStream.leftJoin(builder.globalTable("global", stringConsumed),
-                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockMapper.<String, String>selectValueMapper(),
                         null);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 0ae95dd9c5a..df8d2923a04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -35,7 +35,7 @@
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -73,7 +73,7 @@ public void testAggBasic() {
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
         KTable<String, String> table1 = builder.table(topic1, consumed);
-        KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String,
String>NoOpKeyValueMapper(),
+        KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(),
                                                        stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
@@ -121,7 +121,7 @@ public void testAggCoalesced() {
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
         KTable<String, String> table1 = builder.table(topic1, consumed);
-        KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String,
String>NoOpKeyValueMapper(),
+        KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(),
                                                        stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
             MockAggregator.TOSTRING_ADDER,
@@ -235,7 +235,7 @@ public void testCount() {
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
-                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
stringSerialzied)
+                .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
                 .count("count")
                 .toStream()
                 .process(proc);
@@ -250,7 +250,7 @@ public void testCountWithInternalStore() {
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
-            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
stringSerialzied)
+            .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
             .count()
             .toStream()
             .process(proc);
@@ -265,7 +265,7 @@ public void testCountCoalesced() {
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
-            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
stringSerialzied)
+            .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
             .count("count")
             .toStream()
             .process(proc);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 7986277a7f7..d70d8b7d59b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -28,7 +28,7 @@
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Rule;
@@ -451,7 +451,7 @@ public void testSkipNullOnMaterialization() {
                 public boolean test(String key, String value) {
                     return value.equalsIgnoreCase("accept");
                 }
-            }).groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+            }).groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
 
         doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
@@ -473,7 +473,7 @@ public void testQueryableSkipNullOnMaterialization() {
                 public boolean test(String key, String value) {
                     return value.equalsIgnoreCase("accept");
                 }
-            }, "anyStoreNameFilter").groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+            }, "anyStoreNameFilter").groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
 
         doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 9d918e27b7c..9539b45fe74 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -35,7 +35,7 @@
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
@@ -341,11 +341,11 @@ public void testRepartition() throws NoSuchFieldException, IllegalAccessExceptio
                                                                            .withValueSerde(stringSerde)
                 );
 
-        table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+        table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER,
"mock-result1");
 
 
-        table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+        table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
 
         driver.setUp(builder, stateDir, stringSerde, stringSerde);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index e68f86bc15c..a39e545f513 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -448,6 +448,11 @@ public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores
         builder.connectProcessorAndStateStores(null, "store");
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() throws
Exception {
+        builder.connectProcessorAndStateStores("processor", new String[]{null});
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotAddNullInternalTopic() throws Exception {
         builder.addInternalTopic(null);
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
similarity index 76%
rename from streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
rename to streams/src/test/java/org/apache/kafka/test/MockMapper.java
index 2ad24d7592e..fec9522de82 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
@@ -18,10 +18,9 @@
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
 
-public class MockKeyValueMapper {
-
-
+public class MockMapper {
 
     private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V,
KeyValue<K, V>> {
         @Override
@@ -51,20 +50,31 @@ public K apply(K key, V value) {
         }
     }
 
-    public static <K, V> KeyValueMapper<K, V, K> SelectKeyKeyValueMapper() {
+    private static class NoOpValueMapper<V> implements ValueMapper<V, V> {
+        @Override
+        public V apply(final V value) {
+            return value;
+        }
+    }
+
+    public static <K, V> KeyValueMapper<K, V, K> selectKeyKeyValueMapper() {
         return new SelectKeyMapper<>();
     }
 
 
-    public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper()
{
+    public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> noOpKeyValueMapper()
{
         return new NoOpKeyValueMapper<>();
     }
 
-    public static <K, V> KeyValueMapper<K, V, KeyValue<V, V>> SelectValueKeyValueMapper()
{
+    public static <K, V> KeyValueMapper<K, V, KeyValue<V, V>> selectValueKeyValueMapper()
{
         return new SelectValueKeyValueMapper<>();
     }
 
-    public static <K, V> KeyValueMapper<K, V, V> SelectValueMapper() {
+    public static <K, V> KeyValueMapper<K, V, V> selectValueMapper() {
         return new SelectValueMapper<>();
     }
+
+    public static <V> ValueMapper<V, V> noOpValueMapper() {
+        return new NoOpValueMapper<>();
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockPredicate.java b/streams/src/test/java/org/apache/kafka/test/MockPredicate.java
new file mode 100644
index 00000000000..9d59bab3d9b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockPredicate.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.Predicate;
+
+public class MockPredicate {
+
+    private static class AllGoodPredicate<K, V> implements Predicate<K, V> {
+        @Override
+        public boolean test(final K key, final V value) {
+            return true;
+        }
+    }
+
+    public static <K, V> Predicate<K, V> allGoodPredicate() {
+        return new AllGoodPredicate<>();
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Non-aggregation KTable generation operator does not construct value getter correctly
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bug
>
> For any operator that generates a KTable, its {{valueGetterSupplier}} has three code
path:
> 1. If the operator is a KTable source operator, using its materialized state store for
value getter (note that currently we always materialize on KTable source).
> 2. If the operator is an aggregation operator, then its generated KTable should always
be materialized so we just use its materialized state store.
> 3. Otherwise, we treat the value getter in a per-operator basis.
> For 3) above, what we SHOULD do is that, if the generated KTable is materialized, the
value getter would just rely on its materialized state store to get the value; otherwise we
just rely on the operator itself to define which parent's value getter to inherit and what
computational logic to apply on-the-fly to get the value. For example, for {{KTable#filter()}}
where the {{Materialized}} is not specified, in {{KTableFilterValueGetter}} we just get from
parent's value getter and then apply the filter on the fly; and in addition we should let
the future operators to be able to access its parent's materialized state store via {{connectProcessorAndStateStore}}.
> However, current code does not do this correctly: it 1) does not check if the result
KTable is materialized or not, but always try to use its parent's value getter, and 2) it
does not try to connect its parent's materialized store to the future operator. As a result,
these operators such as {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}}
would result in TopologyException when building. The following is an example:
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building:
StateStore null is not added yet.
> 	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
> 	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not sufficient as workaround but fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
> 	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
> 	at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can workaround by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");;
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> ------------------------------------------------------------------------------------------------------------
> Note that there is another minor orthogonal issue of {{KTable#filter}} itself that it
does not include its parent's queryable store name when itself is not materialized (see {{KTable#mapValues}}
for reference).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Mime
View raw message