kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6398: Return value getter based on KTable materialization status
Date Thu, 18 Jan 2018 21:12:53 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 75e37d7  KAFKA-6398: Return value getter based on KTable materialization status
75e37d7 is described below

commit 75e37d7e2063b726aa0185f4b695f32cde69d95c
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Jan 18 13:12:44 2018 -0800

    KAFKA-6398: Return value getter based on KTable materialization status
    
    This is a bug fix that is composed of two parts:
    
    1. The major part is, for every operator that generates a KTable, we should construct
its value getter based on whether the KTable itself is materialized.
    1.a If yes, then query the materialized store directly for value getter.
    1.b If not, then delegate to its parent's value getter (recursively) and apply the operator's
computation to the value it returns.
    
    2. The minor part is, in KStreamImpl, when joining with a table, we should connect with
the table's `valueGetterSupplier().storeNames()`, not `internalStoreName()`, as the latter
always assumes that the KTable is materialized, which is not always true.
    
    Author: Guozhang Wang <wangguoz@gmail.com>
    
    Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>,
Matthias J. Sax <matthias@confluent.io>
    
    Closes #4421 from guozhangwang/K6398-KTableValueGetter
---
 .../streams/kstream/internals/KStreamImpl.java     |  2 +-
 .../streams/kstream/internals/KTableAggregate.java | 37 ++-------
 .../streams/kstream/internals/KTableFilter.java    | 55 +++++++-------
 .../streams/kstream/internals/KTableImpl.java      |  8 +-
 .../internals/KTableKTableAbstractJoin.java        | 13 ++--
 ...ableKTableAbstractJoinValueGetterSupplier.java} | 10 +--
 .../kstream/internals/KTableKTableJoin.java        |  6 +-
 .../kstream/internals/KTableKTableJoinMerger.java  | 47 +++++++++---
 .../kstream/internals/KTableKTableLeftJoin.java    |  6 +-
 .../kstream/internals/KTableKTableOuterJoin.java   |  6 +-
 .../kstream/internals/KTableKTableRightJoin.java   |  6 +-
 .../streams/kstream/internals/KTableMapValues.java | 46 ++++++------
 .../KTableMaterializedValueGetterSupplier.java     | 54 ++++++++++++++
 .../streams/kstream/internals/KTableReduce.java    | 34 +--------
 .../kstream/internals/KTableRepartitionMap.java    |  8 +-
 .../processor/internals/ProcessorTopology.java     | 10 +++
 .../apache/kafka/streams/StreamsBuilderTest.java   | 87 +++++++++++++++++++++-
 .../kstream/internals/KTableFilterTest.java        |  3 +
 .../kstream/internals/KTableKTableJoinTest.java    |  2 -
 19 files changed, 281 insertions(+), 159 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index db4d238..8aaab49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -794,7 +794,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements
KStream<K, V
 
         final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K,
?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
-        builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl<K,
?, V1>) other).internalStoreName());
+        builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl)
other).valueGetterSupplier().storeNames());
         builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl<K, ?,
V1>) other).name);
 
         return new KStreamImpl<>(builder, name, allSourceNodes, false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 0fe3e1f..dd34735 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -33,7 +33,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K,
V, T
 
     private boolean sendOldValues = false;
 
-    public KTableAggregate(String storeName, Initializer<T> initializer, Aggregator<?
super K, ? super V, T> add, Aggregator<? super K, ? super V, T> remove) {
+    KTableAggregate(final String storeName,
+                    final Initializer<T> initializer,
+                    final Aggregator<? super K, ? super V, T> add,
+                    final Aggregator<? super K, ? super V, T> remove) {
         this.storeName = storeName;
         this.initializer = initializer;
         this.add = add;
@@ -51,7 +54,6 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K,
V, T
     }
 
     private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>>
{
-
         private KeyValueStore<K, T> store;
         private TupleForwarder<K, T> tupleForwarder;
 
@@ -98,35 +100,6 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K,
V, T
 
     @Override
     public KTableValueGetterSupplier<K, T> view() {
-
-        return new KTableValueGetterSupplier<K, T>() {
-
-            public KTableValueGetter<K, T> get() {
-                return new KTableAggregateValueGetter();
-            }
-
-            @Override
-            public String[] storeNames() {
-                return new String[]{storeName};
-            }
-        };
-    }
-
-    private class KTableAggregateValueGetter implements KTableValueGetter<K, T> {
-
-        private KeyValueStore<K, T> store;
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(ProcessorContext context) {
-            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-        }
-
-        @Override
-        public T get(K key) {
-            return store.get(key);
-        }
-
+        return new KTableMaterializedValueGetterSupplier<>(storeName);
     }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index af8c906..ee8982b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -30,8 +30,10 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K,
V, V> {
     private final String queryableName;
     private boolean sendOldValues = false;
 
-    public KTableFilter(final KTableImpl<K, ?, V> parent, final Predicate<? super
K, ? super V> predicate,
-                        final boolean filterNot, final String queryableName) {
+    KTableFilter(final KTableImpl<K, ?, V> parent,
+                 final Predicate<? super K, ? super V> predicate,
+                 final boolean filterNot,
+                 final String queryableName) {
         this.parent = parent;
         this.predicate = predicate;
         this.filterNot = filterNot;
@@ -44,24 +46,6 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K,
V, V> {
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-
-        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
-
-        return new KTableValueGetterSupplier<K, V>() {
-
-            public KTableValueGetter<K, V> get() {
-                return new KTableFilterValueGetter(parentValueGetterSupplier.get());
-            }
-
-            @Override
-            public String[] storeNames() {
-                return parentValueGetterSupplier.storeNames();
-            }
-        };
-    }
-
-    @Override
     public void enableSendingOldValues() {
         parent.enableSendingOldValues();
         sendOldValues = true;
@@ -108,24 +92,45 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K,
V, V> {
 
     }
 
-    private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        // if the KTable is materialized, use the materialized store to return getter value;
+        // otherwise rely on the parent getter and apply filter on-the-fly
+        if (queryableName != null) {
+            return new KTableMaterializedValueGetterSupplier<>(queryableName);
+        } else {
+            return new KTableValueGetterSupplier<K, V>() {
+                final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
+
+                public KTableValueGetter<K, V> get() {
+                    return new KTableFilterValueGetter(parentValueGetterSupplier.get());
+                }
+
+                @Override
+                public String[] storeNames() {
+                    return parentValueGetterSupplier.storeNames();
+                }
+            };
+        }
+    }
 
+    private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
         private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableFilterValueGetter(KTableValueGetter<K, V> parentGetter) {
+        KTableFilterValueGetter(final KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             parentGetter.init(context);
         }
 
         @Override
-        public V get(K key) {
+        public V get(final K key) {
             return computeValue(key, parentGetter.get(key));
         }
-
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 4019039..0cf56ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -138,10 +138,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
         return this.queryableStoreName;
     }
 
-    String internalStoreName() {
-        return this.queryableStoreName;
-    }
-
     @SuppressWarnings("deprecation")
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
                                   final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore>
storeSupplier,
@@ -771,9 +767,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
         }
 
         final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(builder, joinThisName, joinThis, sourceNodes,
this.internalStoreName(), false),
+                new KTableImpl<K, V, R>(builder, joinThisName, joinThis, sourceNodes,
this.queryableStoreName, false),
                 new KTableImpl<K, V1, R>(builder, joinOtherName, joinOther, ((KTableImpl<K,
?, ?>) other).sourceNodes,
-                        ((KTableImpl<K, ?, ?>) other).internalStoreName(), false),
+                        ((KTableImpl<K, ?, ?>) other).queryableStoreName, false),
                 internalQueryableName
         );
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index 7fa39d9..bdc1dca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -20,13 +20,13 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 
 abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessorSupplier<K,
V1, R> {
 
-    protected final KTableImpl<K, ?, V1> table1;
-    protected final KTableImpl<K, ?, V2> table2;
-    protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
-    protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    protected final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
+    private final KTableImpl<K, ?, V1> table1;
+    private final KTableImpl<K, ?, V2> table2;
+    final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+    final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
 
-    protected boolean sendOldValues = false;
+    boolean sendOldValues = false;
 
     KTableKTableAbstractJoin(KTableImpl<K, ?, V1> table1,
                              KTableImpl<K, ?, V2> table2,
@@ -44,5 +44,4 @@ abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessor
         table2.enableSendingOldValues();
         sendOldValues = true;
     }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
similarity index 78%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
index 74c0632..be5a202 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
@@ -18,12 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
 
 import java.util.ArrayList;
 
-public abstract class AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> implements
KTableValueGetterSupplier<K, R> {
-    final protected KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
-    final protected KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+public abstract class KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> implements
KTableValueGetterSupplier<K, R> {
+    final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
 
-    public AbstractKTableKTableJoinValueGetterSupplier(final KTableValueGetterSupplier<K,
V1> valueGetterSupplier1,
-                                                       final KTableValueGetterSupplier<K,
V2> valueGetterSupplier2) {
+    KTableKTableAbstractJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1>
valueGetterSupplier1,
+                                                final KTableValueGetterSupplier<K, V2>
valueGetterSupplier2) {
         this.valueGetterSupplier1 = valueGetterSupplier1;
         this.valueGetterSupplier2 = valueGetterSupplier2;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 1b26a5b..c424f4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -42,12 +42,12 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R, V1,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableKTableJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
+        return new KTableKTableAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
     }
 
-    private class KTableKTableJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K,
R, V1, V2> {
+    private class KTableKTableAbstractJoinValueGetterSupplier extends org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
 
-        public KTableKTableJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1>
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        public KTableKTableAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2)
{
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 82d9c26..d27b8bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -28,9 +28,9 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
V, V> {
     private final String queryableName;
     private boolean sendOldValues = false;
 
-    public KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1,
-                                  final KTableImpl<K, ?, V> parent2,
-                                  final String queryableName) {
+    KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1,
+                           final KTableImpl<K, ?, V> parent2,
+                           final String queryableName) {
         this.parent1 = parent1;
         this.parent2 = parent2;
         this.queryableName = queryableName;
@@ -38,12 +38,42 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
V, V> {
 
     @Override
     public Processor<K, Change<V>> get() {
-        return new KTableKTableJoinMergeProcessor<>();
+        return new KTableKTableJoinMergeProcessor();
     }
 
     @Override
     public KTableValueGetterSupplier<K, V> view() {
-        return parent1.valueGetterSupplier();
+        // if the result KTable is materialized, use the materialized store to return getter
value;
+        // otherwise rely on the parent getter and apply join on-the-fly
+        if (queryableName != null) {
+            return new KTableMaterializedValueGetterSupplier<>(queryableName);
+        } else {
+            return new KTableValueGetterSupplier<K, V>() {
+
+                public KTableValueGetter<K, V> get() {
+                    return parent1.valueGetterSupplier().get();
+                }
+
+                @Override
+                public String[] storeNames() {
+                    // we need to allow the downstream processor to be able to access both
ends of the joining table's value getters
+                    final String[] storeNames1 = parent1.valueGetterSupplier().storeNames();
+                    final String[] storeNames2 = parent2.valueGetterSupplier().storeNames();
+
+                    final String[] stores = new String[storeNames1.length + storeNames2.length];
+                    int i = 0;
+                    for (final String storeName : storeNames1) {
+                        stores[i] = storeName;
+                        i++;
+                    }
+                    for (final String storeName : storeNames2) {
+                        stores[i] = storeName;
+                        i++;
+                    }
+                    return stores;
+                }
+            };
+        }
     }
 
     @Override
@@ -53,14 +83,13 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
V, V> {
         sendOldValues = true;
     }
 
-    private class KTableKTableJoinMergeProcessor<K, V>
-        extends AbstractProcessor<K, Change<V>> {
+    private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>>
{
         private KeyValueStore<K, V> store;
         private TupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
@@ -72,7 +101,6 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
V, V> {
 
         @Override
         public void process(K key, Change<V> value) {
-
             if (queryableName != null) {
                 store.put(key, value.newValue);
                 tupleForwarder.maybeForward(key, value.newValue, value.oldValue);
@@ -81,5 +109,4 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
V, V> {
             }
         }
     }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index c308a0d..33aef02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -34,12 +34,12 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
+        return new KTableKTableLeftAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
     }
 
-    private class KTableKTableLeftJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K,
R, V1, V2> {
+    private class KTableKTableLeftAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
 
-        public KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1>
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        public KTableKTableLeftAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2)
{
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 9cee4f3..d2e1d79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -34,12 +34,12 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
+        return new KTableKTableOuterAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
     }
 
-    private class KTableKTableOuterJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K,
R, V1, V2> {
+    private class KTableKTableOuterAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
 
-        public KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1>
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        public KTableKTableOuterAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2)
{
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index b43efaa..f4c840b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -35,12 +35,12 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
+        return new KTableKTableRightAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
     }
 
-    private class KTableKTableRightJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K,
R, V1, V2> {
+    private class KTableKTableRightAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
 
-        public KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1>
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        public KTableKTableRightAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2)
{
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 03fa3a3..0970b42 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -30,8 +30,9 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K,
V, V1> {
     private final String queryableName;
     private boolean sendOldValues = false;
 
-    public KTableMapValues(final KTableImpl<K, ?, V> parent, final ValueMapperWithKey<?
super K, ? super V, ? extends V1> mapper,
-                           final String queryableName) {
+    KTableMapValues(final KTableImpl<K, ?, V> parent,
+                    final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper,
+                    final String queryableName) {
         this.parent = parent;
         this.mapper = mapper;
         this.queryableName = queryableName;
@@ -44,19 +45,24 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K,
V, V1> {
 
     @Override
     public KTableValueGetterSupplier<K, V1> view() {
-        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
-
-        return new KTableValueGetterSupplier<K, V1>() {
-
-            public KTableValueGetter<K, V1> get() {
-                return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
-            }
-
-            @Override
-            public String[] storeNames() {
-                return parentValueGetterSupplier.storeNames();
-            }
-        };
+        // if the KTable is materialized, use the materialized store to return getter value;
+        // otherwise rely on the parent getter and apply map-values on-the-fly
+        if (queryableName != null) {
+            return new KTableMaterializedValueGetterSupplier<>(queryableName);
+        } else {
+            return new KTableValueGetterSupplier<K, V1>() {
+                final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
+
+                public KTableValueGetter<K, V1> get() {
+                    return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
+                }
+
+                @Override
+                public String[] storeNames() {
+                    return parentValueGetterSupplier.storeNames();
+                }
+            };
+        }
     }
 
     @Override
@@ -75,13 +81,12 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K,
V, V1> {
     }
 
     private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>>
{
-
         private KeyValueStore<K, V1> store;
         private TupleForwarder<K, V1> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
@@ -107,20 +112,19 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K,
V, V1> {
 
         private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableMapValuesValueGetter(KTableValueGetter<K, V> parentGetter) {
+        KTableMapValuesValueGetter(KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             parentGetter.init(context);
         }
 
         @Override
-        public V1 get(K key) {
+        public V1 get(final K key) {
             return computeValue(key, parentGetter.get(key));
         }
-
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
new file mode 100644
index 0000000..4ceccce
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K,
V> {
+
+    private final String storeName;
+
+    KTableMaterializedValueGetterSupplier(final String storeName) {
+        this.storeName = storeName;
+    }
+
+    public KTableValueGetter<K, V> get() {
+        return new KTableMaterializedValueGetter();
+    }
+
+    @Override
+    public String[] storeNames() {
+        return new String[]{storeName};
+    }
+
+    private class KTableMaterializedValueGetter implements KTableValueGetter<K, V>
{
+        private KeyValueStore<K, V> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public V get(final K key) {
+            return store.get(key);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 62484a4..11b7d55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -31,7 +31,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
     private boolean sendOldValues = false;
 
-    public KTableReduce(String storeName, Reducer<V> addReducer, Reducer<V> removeReducer) {
+    KTableReduce(final String storeName, final Reducer<V> addReducer, final Reducer<V> removeReducer) {
         this.storeName = storeName;
         this.addReducer = addReducer;
         this.removeReducer = removeReducer;
@@ -54,7 +54,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<K, V>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
@@ -94,34 +94,6 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
     @Override
     public KTableValueGetterSupplier<K, V> view() {
-
-        return new KTableValueGetterSupplier<K, V>() {
-
-            public KTableValueGetter<K, V> get() {
-                return new KTableAggregateValueGetter();
-            }
-
-            @Override
-            public String[] storeNames() {
-                return new String[]{storeName};
-            }
-        };
-    }
-
-    private class KTableAggregateValueGetter implements KTableValueGetter<K, V> {
-
-        private KeyValueStore<K, V> store;
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(ProcessorContext context) {
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-        }
-
-        @Override
-        public V get(K key) {
-            return store.get(key);
-        }
-
+        return new KTableMaterializedValueGetterSupplier<K, V>(storeName);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 09714a7..5aa3f2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -33,7 +33,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
     private final KTableImpl<K, ?, V> parent;
     private final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> mapper;
 
-    public KTableRepartitionMap(KTableImpl<K, ?, V> parent, KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> mapper) {
+    KTableRepartitionMap(final KTableImpl<K, ?, V> parent, final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> mapper) {
         this.parent = parent;
         this.mapper = mapper;
     }
@@ -101,17 +101,17 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
 
         private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableMapValueGetter(KTableValueGetter<K, V> parentGetter) {
+        KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             parentGetter.init(context);
         }
 
         @Override
-        public KeyValue<K1, V1> get(K key) {
+        public KeyValue<K1, V1> get(final K key) {
             return mapper.apply(key, parentGetter.get(key));
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 4291e34..eba46c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -197,4 +197,14 @@ public class ProcessorTopology {
         return sb.toString();
     }
 
+    // for testing only
+    public Set<String> processorConnectedStateStores(final String processorName) {
+        for (final ProcessorNode<?, ?> node : processorNodes) {
+            if (node.name().equals(processorName)) {
+                return node.stateStores;
+            }
+        }
+
+        return Collections.emptySet();
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 13b5b45..4a496b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockMapper;
@@ -45,6 +46,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -69,6 +71,27 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
 
         driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(1));
+        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty(), is(true));
+    }
+
+    @Test
+    public void shouldAllowJoinMaterializedFilteredKTable() {
+        final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic")
+                .filter(MockPredicate.<Bytes, String>allGoodPredicate(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(2));
+        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
+        assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"), equalTo(Collections.singleton("store")));
     }
 
     @Test
@@ -77,14 +100,72 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
 
         driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(1));
+        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty(), is(true));
+    }
+
+    @Test
+    public void shouldAllowJoinMaterializedMapValuedKTable() {
+        final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic")
+                .mapValues(MockMapper.<String>noOpValueMapper(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(2));
+        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
+        assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"), equalTo(Collections.singleton("store")));
+    }
+
+    @Test
+    public void shouldAllowJoinUnmaterializedJoinedKTable() {
+        final KTable<Bytes, String> table1 = builder.table("table-topic1");
+        final KTable<Bytes, String> table2 = builder.table("table-topic2");
+        builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(2));
+        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name())));
+        assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty(), is(true));
+
+    }
+
+    @Test
+    public void shouldAllowJoinMaterializedJoinedKTable() {
+        final KTable<Bytes, String> table1 = builder.table("table-topic1");
+        final KTable<Bytes, String> table2 = builder.table("table-topic2");
+        builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store")), MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(3));
+        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Collections.singleton("store")));
+        assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"), equalTo(Collections.singleton("store")));
     }
 
     @Test
     public void shouldAllowJoinMaterializedSourceKTable() {
-        final KTable<Bytes, String> table = builder.<Bytes, String>table("table-topic");
+        final KTable<Bytes, String> table = builder.table("table-topic");
         builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
 
         driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(1));
+        assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
     }
 
     @Test
@@ -168,7 +249,7 @@ public class StreamsBuilderTest {
         driver.process(topic, 1L, "value1");
         driver.process(topic, 2L, "value2");
         driver.flushState();
-        final KeyValueStore<Long, String> store = (KeyValueStore) driver.allStateStores().get("store");
+        final KeyValueStore<Long, String> store = (KeyValueStore<Long, String>) driver.allStateStores().get("store");
         assertThat(store.get(1L), equalTo("value1"));
         assertThat(store.get(2L), equalTo("value2"));
         assertThat(results.get(1L), equalTo("value1"));
@@ -186,7 +267,7 @@ public class StreamsBuilderTest {
         driver.process(topic, 1L, "value1");
         driver.process(topic, 2L, "value2");
         driver.flushState();
-        final KeyValueStore<Long, String> store = (KeyValueStore) driver.allStateStores().get("store");
+        final KeyValueStore<Long, String> store = (KeyValueStore<Long, String>) driver.allStateStores().get("store");
         assertThat(store.get(1L), equalTo("value1"));
         assertThat(store.get(2L), equalTo("value2"));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index d70d8b7..01236ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -177,6 +177,7 @@ public class KTableFilterTest {
 
         driver.process(topic1, "A", 2);
         driver.process(topic1, "B", 2);
+        driver.flushState();
 
         assertEquals(2, (int) getter2.get("A"));
         assertEquals(2, (int) getter2.get("B"));
@@ -187,6 +188,7 @@ public class KTableFilterTest {
         assertEquals(1, (int) getter3.get("C"));
 
         driver.process(topic1, "A", 3);
+        driver.flushState();
 
         assertNull(getter2.get("A"));
         assertEquals(2, (int) getter2.get("B"));
@@ -198,6 +200,7 @@ public class KTableFilterTest {
 
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
+        driver.flushState();
 
         assertNull(getter2.get("A"));
         assertNull(getter2.get("B"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index aeb2418..09d4aa0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -46,8 +46,6 @@ public class KTableKTableJoinTest {
 
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
-    final private String storeName1 = "store-name-1";
-    final private String storeName2 = "store-name-2";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <commits@kafka.apache.org>.

Mime
View raw message