kafka-commits mailing list archives

From guozh...@apache.org
Subject [5/5] kafka git commit: KAFKA-4490: Add Global Table support to Kafka Streams
Date Thu, 12 Jan 2017 19:46:09 GMT
KAFKA-4490: Add Global Table support to Kafka Streams

Add Global Tables to KafkaStreams. Global Tables are fully replicated, with one copy per KafkaStreams instance, and a single thread is used to update them. They can be used to join with KStreams, KTables, and other GlobalKTables. When participating in a join, a GlobalKTable is only ever used to perform a lookup, i.e., it will never cause data to be forwarded to downstream processor nodes.
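
For illustration, a minimal sketch of the new API (topic names, store names, and the String types are illustrative, not taken from the patch; the signatures come from the KStream and KStreamBuilder changes below):

    final KStreamBuilder builder = new KStreamBuilder();

    // A fully replicated, read-only table backed by the "customers" topic,
    // materialized in the "customers-store" state store
    final GlobalKTable<String, String> customers =
        builder.globalTable("customers", "customers-store");

    // Stream of orders, keyed by order id with the customer id as value
    final KStream<String, String> orders = builder.stream("orders");

    // Lookup join: map each stream record to a table key, then look it up in
    // the local replica of the global table -- no repartitioning involved
    final KStream<String, String> enriched = orders.join(
        customers,
        new KeyValueMapper<String, String, String>() {
            @Override
            public String apply(final String orderId, final String customerId) {
                return customerId; // derive the table key from the stream record
            }
        },
        new ValueJoiner<String, String, String>() {
            @Override
            public String apply(final String customerId, final String customer) {
                return customerId + ":" + customer; // combine stream and table values
            }
        });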

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang

Closes #2244 from dguy/global-tables


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8079c980
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8079c980
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8079c980

Branch: refs/heads/trunk
Commit: 8079c980ac5c9c00d7577fa05857b4de6864dbb5
Parents: c063172
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Jan 12 11:46:02 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jan 12 11:46:02 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  92 ++++-
 .../kafka/streams/kstream/GlobalKTable.java     |  59 +++
 .../apache/kafka/streams/kstream/KStream.java   |  68 ++-
 .../kafka/streams/kstream/KStreamBuilder.java   |  54 +++
 .../apache/kafka/streams/kstream/KTable.java    |   1 +
 .../kstream/internals/GlobalKTableImpl.java     |  34 ++
 .../internals/KStreamGlobalKTableJoin.java      |  46 +++
 .../streams/kstream/internals/KStreamImpl.java  |  39 +-
 .../kstream/internals/KStreamKTableJoin.java    |  47 +--
 .../internals/KStreamKTableJoinProcessor.java   |  62 +++
 .../streams/kstream/internals/KTableImpl.java   |   4 +-
 .../kstream/internals/KTableKTableJoin.java     |  46 +--
 .../internals/KTableKTableJoinValueGetter.java  |  62 +++
 .../KTableKTableLeftJoinValueGetter.java        |  58 +++
 .../KTableSourceValueGetterSupplier.java        |   6 +-
 .../streams/processor/TopologyBuilder.java      | 185 +++++++--
 .../internals/AbstractProcessorContext.java     | 195 +++++++++
 .../processor/internals/AbstractTask.java       |   1 +
 .../internals/GlobalProcessorContextImpl.java   |  81 ++++
 .../internals/GlobalStateMaintainer.java        |  37 ++
 .../processor/internals/GlobalStateManager.java |  23 ++
 .../internals/GlobalStateManagerImpl.java       | 246 +++++++++++
 .../internals/GlobalStateUpdateTask.java        | 110 +++++
 .../processor/internals/GlobalStreamThread.java | 199 +++++++++
 .../internals/InternalProcessorContext.java     |   6 +
 .../processor/internals/PartitionGroup.java     |   2 +-
 .../internals/ProcessorContextImpl.java         | 222 ++--------
 .../processor/internals/ProcessorNode.java      |   2 +-
 .../internals/ProcessorStateManager.java        |  16 +-
 .../processor/internals/ProcessorTopology.java  |  22 +-
 .../processor/internals/RecordDeserializer.java |  23 ++
 .../processor/internals/RecordQueue.java        |  43 +-
 .../internals/SourceNodeRecordDeserializer.java |  57 +++
 .../processor/internals/StandbyContextImpl.java | 100 +----
 .../processor/internals/StateDirectory.java     |  65 ++-
 .../processor/internals/StateManager.java       |  41 ++
 .../streams/processor/internals/StreamTask.java |  14 +-
 .../processor/internals/StreamThread.java       |  10 +-
 .../internals/StreamsMetadataState.java         |  37 +-
 .../state/internals/CachingKeyValueStore.java   |   3 +
 .../DelegatingPeekingKeyValueIterator.java      |   4 +-
 .../internals/GlobalStateStoreProvider.java     |  46 +++
 .../streams/state/internals/NamedCache.java     |   4 +
 .../state/internals/OffsetCheckpoint.java       |   5 +-
 .../state/internals/QueryableStoreProvider.java |  17 +-
 .../streams/state/internals/ThreadCache.java    |   4 +
 .../apache/kafka/streams/KafkaStreamsTest.java  |  57 +--
 .../GlobalKTableIntegrationTest.java            | 280 +++++++++++++
 .../integration/RegexSourceIntegrationTest.java |   3 +-
 .../streams/kstream/KStreamBuilderTest.java     |  83 ++++
 .../internals/GlobalKTableJoinsTest.java        | 110 +++++
 .../kstream/internals/KStreamImplTest.java      |  44 ++
 .../kstream/internals/KTableKTableJoinTest.java |   1 +
 .../streams/processor/TopologyBuilderTest.java  |   4 +-
 .../processor/internals/AbstractTaskTest.java   |   4 +-
 .../internals/GlobalStateManagerImplTest.java   | 413 +++++++++++++++++++
 .../internals/GlobalStateTaskTest.java          | 144 +++++++
 .../internals/GlobalStreamThreadTest.java       | 116 ++++++
 .../processor/internals/PartitionGroupTest.java |   4 +-
 .../internals/ProcessorTopologyTest.java        |  21 +
 .../processor/internals/RecordQueueTest.java    |  26 +-
 .../SourceNodeRecordDeserializerTest.java       | 109 +++++
 .../processor/internals/StandbyTaskTest.java    |   6 +-
 .../processor/internals/StateConsumerTest.java  | 165 ++++++++
 .../processor/internals/StateDirectoryTest.java |  22 +
 .../internals/StreamPartitionAssignorTest.java  |  52 ++-
 .../processor/internals/StreamTaskTest.java     |  22 +-
 .../processor/internals/StreamThreadTest.java   |  40 +-
 .../internals/StreamsMetadataStateTest.java     |  75 +++-
 .../internals/CachingKeyValueStoreTest.java     |   6 +
 .../CompositeReadOnlyKeyValueStoreTest.java     |   5 +-
 .../CompositeReadOnlySessionStoreTest.java      |   2 +-
 .../CompositeReadOnlyWindowStoreTest.java       |  23 +-
 .../DelegatingPeekingKeyValueIteratorTest.java  |   1 +
 .../internals/GlobalStateStoreProviderTest.java |  61 +++
 .../state/internals/InMemoryKeyValueStore.java  | 151 -------
 ...gedSortedCacheKeyValueStoreIteratorTest.java |   1 +
 .../streams/state/internals/NamedCacheTest.java |   4 +
 .../internals/QueryableStoreProviderTest.java   |  15 +-
 .../state/internals/StateStoreTestUtils.java    |  57 ---
 .../StreamThreadStateStoreProviderTest.java     |   3 +-
 .../state/internals/ThreadCacheTest.java        |   7 +
 .../kafka/test/GlobalStateManagerStub.java      |  83 ++++
 .../kafka/test/InMemoryKeyValueStore.java       | 152 +++++++
 .../apache/kafka/test/KStreamTestDriver.java    |  28 +-
 .../kafka/test/KTableValueGetterStub.java       |  46 +++
 .../apache/kafka/test/MockKeyValueMapper.java   |  14 +
 .../apache/kafka/test/MockProcessorContext.java |   5 +
 .../apache/kafka/test/MockProcessorNode.java    |   8 +
 .../org/apache/kafka/test/MockSourceNode.java   |   8 +
 .../apache/kafka/test/NoOpProcessorContext.java |  84 ++++
 .../apache/kafka/test/NoOpReadOnlyStore.java    |  91 ++++
 .../kafka/test/ProcessorTopologyTestDriver.java | 142 +++++--
 .../org/apache/kafka/test/StreamsTestUtils.java |  13 +
 94 files changed, 4468 insertions(+), 841 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index fc47672..31456b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -31,11 +32,15 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.StreamsMetadata;
+import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
@@ -56,6 +61,9 @@ import java.util.HashSet;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+
 /**
  * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
  * sends output to zero or more output topics.
@@ -99,7 +107,9 @@ public class KafkaStreams {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
     private static final String JMX_PREFIX = "kafka.streams";
-    public static final int DEFAULT_CLOSE_TIMEOUT = 0;
+    private static final int DEFAULT_CLOSE_TIMEOUT = 0;
+    private GlobalStreamThread globalStreamThread;
+
     private final StreamThread[] threads;
     private final Map<Long, StreamThread.State> threadState;
     private final Metrics metrics;
@@ -293,24 +303,62 @@ public class KafkaStreams {
         threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         threadState = new HashMap<>(threads.length);
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
-        streamsMetadataState = new StreamsMetadataState(builder);
+        streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+
+        final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
+
+        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
+            log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
+        }
+
+        final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
+                (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
+
+
+        if (globalTaskTopology != null) {
+            globalStreamThread = new GlobalStreamThread(globalTaskTopology,
+                                                        config,
+                                                        clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
+                                                        new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG)),
+                                                        metrics,
+                                                        time,
+                                                        clientId);
+        }
+
         for (int i = 0; i < threads.length; i++) {
             threads[i] = new StreamThread(builder,
-                config,
-                clientSupplier,
-                applicationId,
-                clientId,
-                processId,
-                metrics,
-                time,
-                streamsMetadataState);
+                                          config,
+                                          clientSupplier,
+                                          applicationId,
+                                          clientId,
+                                          processId,
+                                          metrics,
+                                          time,
+                                          streamsMetadataState,
+                                          cacheSizeBytes);
             threads[i].setStateListener(streamStateListener);
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
-        queryableStoreProvider = new QueryableStoreProvider(storeProviders);
+        final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
+        queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
     }
 
+    private static HostInfo parseHostInfo(final String endPoint) {
+        if (endPoint == null || endPoint.trim().isEmpty()) {
+            return StreamsMetadataState.UNKNOWN_HOST;
+        }
+        final String host = getHost(endPoint);
+        final Integer port = getPort(endPoint);
+
+        if (host == null || port == null) {
+            throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
+        }
+
+        return new HostInfo(host, port);
+    }
+
+
     /**
      * Start the stream instance by starting all its threads.
      *
@@ -322,6 +370,10 @@ public class KafkaStreams {
         if (state == KafkaStreams.State.CREATED) {
             setState(KafkaStreams.State.RUNNING);
 
+            if (globalStreamThread != null) {
+                globalStreamThread.start();
+            }
+
             for (final StreamThread thread : threads) {
                 thread.start();
             }
@@ -368,7 +420,16 @@ public class KafkaStreams {
                             thread.setStateListener(null);
                             thread.close();
                         }
-
+                        if (globalStreamThread != null) {
+                            globalStreamThread.close();
+                            if (!globalStreamThread.stillRunning()) {
+                                try {
+                                    globalStreamThread.join();
+                                } catch (InterruptedException e) {
+                                    Thread.interrupted();
+                                }
+                            }
+                        }
                         for (final StreamThread thread : threads) {
                             try {
                                 if (!thread.stillRunning()) {
@@ -453,8 +514,13 @@ public class KafkaStreams {
      * @param eh the object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler.
      */
     public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
-        for (final StreamThread thread : threads)
+        for (final StreamThread thread : threads) {
             thread.setUncaughtExceptionHandler(eh);
+        }
+
+        if (globalStreamThread != null) {
+            globalStreamThread.setUncaughtExceptionHandler(eh);
+        }
     }
 
 

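For context on the constructor changes above, a hedged configuration sketch (all values illustrative, with builder as in the sketch in the commit message): parseHostInfo() expects StreamsConfig.APPLICATION_SERVER_CONFIG in host:port form, and the configured record cache is divided evenly across the stream threads plus the global thread when a global topology exists.

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    // Advertised endpoint for interactive queries; parsed by parseHostInfo()
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
    // 10 MB total cache shared by 4 stream threads plus the global thread:
    // each gets 10485760 / (4 + 1) = 2097152 bytes
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760L);
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
    final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
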
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
new file mode 100644
index 0000000..6616c90
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.streams.KafkaStreams;
+
+/**
+ * {@link GlobalKTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
+ * Each record in this stream is an update on the primary-keyed table with the record key as the primary key.
+ * <p>
+ * A {@link GlobalKTable} is fully replicated per {@link KafkaStreams} instance. Every partition of the underlying topic
+ * is consumed by each {@link GlobalKTable}, such that the full set of data is available in every {@link KafkaStreams} instance.
+ * This provides the ability to perform joins with {@link KStream} and {@link KTable}
+ * without having to repartition the input streams. All joins with the {@link GlobalKTable} require that a {@link KeyValueMapper}
+ * is provided that can map from the (key, value) of the left-hand side to the key of the right-hand side {@link GlobalKTable}.
+ * <p>
+ * A {@link GlobalKTable} is created via a {@link KStreamBuilder}. For example:
+ * <pre>
+ *     builder.globalTable("topic-name", "queryable-store-name");
+ * </pre>
+ * All {@link GlobalKTable}s are backed by a {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore ReadOnlyKeyValueStore}
+ * and are therefore queryable via the interactive queries API.
+ * For example:
+ * <pre>{@code
+ *     final GlobalKTable globalOne = builder.globalTable("g1", "g1-store");
+ *     final GlobalKTable globalTwo = builder.globalTable("g2", "g2-store");
+ *     ...
+ *     final KafkaStreams streams = ...;
+ *     streams.start();
+ *     ...
+ *     ReadOnlyKeyValueStore view = streams.store("g1-store", QueryableStoreTypes.keyValueStore());
+ *     view.get(key);
+ *}</pre>
+ *
+ *
+ * @param <K> Type of primary keys
+ * @param <V> Type of value changes
+ *
+ * @see KTable
+ */
+@InterfaceStability.Unstable
+public interface GlobalKTable<K, V> {
+}

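A typed variant of the interactive-query example in the Javadoc above, as a hedged sketch (store name, key, and types are illustrative):

    final ReadOnlyKeyValueStore<String, String> view =
        streams.store("g1-store", QueryableStoreTypes.<String, String>keyValueStore());
    final String value = view.get("some-key");
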
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 245925a..1645ee0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -1511,7 +1511,7 @@ public interface KStream<K, V> {
      * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
      * will not produce any result records.
      * <p>
-     * For each {@link KStream} record weather on not it finds an corresponding record in {@link KTable} the provided
+     * For each {@link KStream} record, whether or not it finds a corresponding record in the {@link KTable}, the provided
      * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
      * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}.
      * The key of the result record is the same as for both joining input records.
@@ -1581,7 +1581,7 @@ public interface KStream<K, V> {
      * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
      * will not produce any result records.
      * <p>
-     * For each {@link KStream} record weather on not it finds an corresponding record in {@link KTable} the provided
+     * For each {@link KStream} record, whether or not it finds a corresponding record in the {@link KTable}, the provided
      * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
      * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}.
      * The key of the result record is the same as for both joining input records.
@@ -1646,4 +1646,68 @@ public interface KStream<K, V> {
                                      final Serde<K> keySerde,
                                      final Serde<V> valSerde);
 
+
+    /**
+     * Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.
+     * In contrast to the {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream will produce an
+     * output record (cf. below).
+     * The join is a primary key table lookup join with join attribute {@code keyValueMapper.map(stream.keyValue) == table.key}.
+     * "Table lookup join" means, that results are only computed if {@link KStream} records are processed.
+     * This is done by performing a lookup for matching records in the <em>current</em> internal
+     * {@link GlobalKTable} state.
+     * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable} state and
+     * will not produce any result records.
+     * <p>
+     * For each {@link KStream} record, whether or not it finds a corresponding record in the {@link GlobalKTable}, the provided
+     * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}.
+     * The key of the result record is the same as the key of this {@link KStream} record.
+     * If a {@link KStream} input record key or value is {@code null} the record will not be included in the join
+     * operation and thus no output record will be added to the resulting {@link KStream}.
+     *
+     * @param globalKTable      the {@link GlobalKTable} to be joined with this stream
+     * @param keyValueMapper    instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
+     *                          to the key of the {@link GlobalKTable}
+     * @param valueJoiner       a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <GK>              the key type of {@link GlobalKTable}
+     * @param <GV>              the value type of the {@link GlobalKTable}
+     * @param <RV>               the value type of the resulting {@link KStream}
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one output for each input {@link KStream} record
+     * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
+     */
+    <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
+                                         final KeyValueMapper<K, V, GK> keyValueMapper,
+                                         final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);
+
+    /**
+     * Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
+     * The join is a primary key table lookup join with join attribute {@code keyValueMapper.map(stream.keyValue) == table.key}.
+     * "Table lookup join" means, that results are only computed if {@link KStream} records are processed.
+     * This is done by performing a lookup for matching records in the <em>current</em> internal
+     * {@link GlobalKTable} state.
+     * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable} state and
+     * will not produce any result records.
+     * <p>
+     * For each {@link KStream} record that finds a corresponding record in the {@link GlobalKTable} the provided
+     * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as the key of this {@link KStream} record.
+     * If a {@link KStream} input record key or value is {@code null} the record will not be included in the join
+     * operation and thus no output record will be added to the resulting {@link KStream}.
+     *
+     * @param globalKTable      the {@link GlobalKTable} to be joined with this stream
+     * @param keyValueMapper    instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
+     *                          to the key of the {@link GlobalKTable}
+     * @param joiner            a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <GK>              the key type of {@link GlobalKTable}
+     * @param <GV>              the value type of the {@link GlobalKTable}
+     * @param <RV>               the value type of the resulting {@link KStream}
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one output for each input {@link KStream} record
+     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+     */
+    <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
+                                     final KeyValueMapper<K, V, GK> keyValueMapper,
+                                     final ValueJoiner<? super V, ? super GV, ? extends RV> joiner);
 }

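Continuing the illustrative sketch from the commit message above (keyMapper and joiner stand for the same hypothetical KeyValueMapper and ValueJoiner defined there), the two operations differ only in how lookup misses are handled: join() drops the stream record, while leftJoin() always emits and hands the joiner a null table-side value.

    // Inner join: output only for stream records whose mapped key is found in the table
    final KStream<String, String> inner = orders.join(customers, keyMapper, joiner);

    // Left join: output for every stream record; 'customer' is null on a lookup miss
    final KStream<String, String> left = orders.leftJoin(
        customers,
        keyMapper,
        new ValueJoiner<String, String, String>() {
            @Override
            public String apply(final String customerId, final String customer) {
                return customerId + ":" + (customer == null ? "unknown" : customer);
            }
        });
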
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index b1c1cfb..c1a74e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -17,13 +17,17 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
+import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
 import java.util.Collections;
@@ -273,6 +277,54 @@ public class KStreamBuilder extends TopologyBuilder {
 
 
     /**
+     * Create a new {@link GlobalKTable} instance for the specified topic.
+     * Record keys of the topic should never be null; otherwise an exception will be thrown at runtime.
+     * The resulting {@link GlobalKTable} will be materialized in a local state store with the given store name.
+     * However, no new changelog topic is created in this case since the underlying topic acts as one.
+     * @param keySerde   key serde used to read key-value pairs from the topic;
+     *                   if not specified the default key serde defined in the configuration will be used
+     * @param valSerde   value serde used to read key-value pairs from the topic;
+     *                   if not specified the default value serde defined in the configuration will be used
+     * @param topic      the topic name; cannot be null
+     * @param storeName  the name of the state store in which the {@link GlobalKTable} is materialized
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName) {
+        final String sourceName = newName(KStreamImpl.SOURCE_NAME);
+        final String processorName = newName(KTableImpl.SOURCE_NAME);
+        final KTableSource<K, V> tableSource = new KTableSource<>(storeName);
+
+
+        final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+        final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+
+        final StateStore store = new RocksDBKeyValueStoreSupplier<>(storeName,
+                                                                    keySerde,
+                                                                    valSerde,
+                                                                    false,
+                                                                    Collections.<String, String>emptyMap(),
+                                                                    true).get();
+
+        addGlobalStore(store, sourceName, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeName));
+    }
+
+    /**
+     * Create a new {@link GlobalKTable} instance for the specified topic using the default key and value {@link Serde}s.
+     * Record keys of the topic should never be null; otherwise an exception will be thrown at runtime.
+     * The resulting {@link GlobalKTable} will be materialized in a local state store with the given store name.
+     * However, no new changelog topic is created in this case since the underlying topic acts as one.
+     *
+     * @param topic      the topic name; cannot be null
+     * @param storeName  the name of the state store in which the {@link GlobalKTable} is materialized; cannot be null
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName) {
+        return globalTable(null, null, topic, storeName);
+    }
+
+    /**
      * Create a new instance of {@link KStream} by merging the given streams.
      * <p>
      * There is no ordering guarantee for records from different streams.
@@ -294,4 +346,6 @@ public class KStreamBuilder extends TopologyBuilder {
     public String newName(String prefix) {
         return prefix + String.format("%010d", index.getAndIncrement());
     }
+
+
 }

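As a hedged illustration of the serde overload above (the serdes, topic, and store name are made up for the example):

    final GlobalKTable<Long, String> prices = builder.globalTable(
        Serdes.Long(),    // key serde; passing null falls back to the configured default
        Serdes.String(),  // value serde; passing null falls back to the configured default
        "prices",         // source topic, which also serves as the changelog
        "prices-store");  // name of the backing, queryable state store
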
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 5d6fa6f..d95af0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -351,6 +351,7 @@ public interface KTable<K, V> {
      */
     <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner);
 
+
     /**
      * Group the records of this {@link KTable} using the provided {@link KeyValueMapper}.
      * 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
new file mode 100644
index 0000000..0506b1c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.GlobalKTable;
+
+public class GlobalKTableImpl<K, V> implements GlobalKTable<K, V> {
+
+    private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
+
+    public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier) {
+        this.valueGetterSupplier = valueGetterSupplier;
+    }
+
+    @SuppressWarnings("unchecked")
+    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
+        return valueGetterSupplier;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
new file mode 100644
index 0000000..27c13fe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements ProcessorSupplier<K1, V1> {
+
+    private final KTableValueGetterSupplier<K2, V2> valueGetterSupplier;
+    private final ValueJoiner<V1, V2, R> joiner;
+    private final KeyValueMapper<K1, V1, K2> mapper;
+    private final boolean leftJoin;
+
+    KStreamGlobalKTableJoin(final KTableValueGetterSupplier<K2, V2> valueGetterSupplier,
+                            final ValueJoiner<V1, V2, R> joiner,
+                            final KeyValueMapper<K1, V1, K2> mapper,
+                            final boolean leftJoin) {
+        this.valueGetterSupplier = valueGetterSupplier;
+        this.joiner = joiner;
+        this.mapper = mapper;
+        this.leftJoin = leftJoin;
+    }
+
+    @Override
+    public Processor<K1, V1> get() {
+        return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), mapper, joiner, leftJoin);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index a8e2422..bad4e66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -98,6 +99,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     public static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
 
+
     private final boolean repartitionRequired;
 
     public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes,
@@ -567,6 +569,38 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         }
     }
 
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
+                                              final KeyValueMapper<K, V, K1> keyMapper,
+                                              final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+        return globalTableJoin(globalTable, keyMapper, joiner, true);
+    }
+
+    @Override
+    public <K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> globalTable,
+                                            final KeyValueMapper<K, V, K1> keyMapper,
+                                            final ValueJoiner<? super V, ? super V1, ? extends V2> joiner) {
+        return globalTableJoin(globalTable, keyMapper, joiner, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1, V1> globalTable,
+                                                        final KeyValueMapper<K, V, K1> keyMapper,
+                                                        final ValueJoiner<? super V, ? super V1, ? extends V2> joiner,
+                                                        final boolean leftJoin) {
+        Objects.requireNonNull(globalTable, "globalTable can't be null");
+        Objects.requireNonNull(keyMapper, "keyMapper can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+
+        final KTableValueGetterSupplier valueGetterSupplier = ((GlobalKTableImpl) globalTable).valueGetterSupplier();
+        final String name = topology.newName(LEFTJOIN_NAME);
+        topology.addProcessor(name, new KStreamGlobalKTableJoin(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name);
+        return new KStreamImpl<>(topology, name, sourceNodes, false);
+    }
+
     private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
                                                     final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                                     final boolean leftJoin) {
@@ -576,8 +610,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
         final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
-
-        topology.addProcessor(name, new KStreamKTableJoin<>((KTableImpl<K, ?, V1>) other, joiner, leftJoin), this.name);
+        topology.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
         topology.connectProcessorAndStateStores(name, other.getStoreName());
         topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
 
@@ -638,6 +671,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
 
+
+
     private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows windows,
                                                                      final Serde<K> keySerde,
                                                                      final Serde<V> valueSerde,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
index 3fa9d8f..7b7f8a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -17,59 +17,32 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
+    private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
+        @Override
+        public K apply(final K key, final V1 value) {
+            return key;
+        }
+    };
     private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
     private final ValueJoiner<? super V1, ? super V2, R> joiner;
     private final boolean leftJoin;
 
-    KStreamKTableJoin(final KTableImpl<K, ?, V2> table, final ValueJoiner<? super V1, ? super V2, R> joiner, final boolean leftJoin) {
-        valueGetterSupplier = table.valueGetterSupplier();
+    KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier, final ValueJoiner<? super V1, ? super V2, R> joiner, final boolean leftJoin) {
+        this.valueGetterSupplier = valueGetterSupplier;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
     }
 
     @Override
     public Processor<K, V1> get() {
-        return new KStreamKTableJoinProcessor(valueGetterSupplier.get(), leftJoin);
+        return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin);
     }
 
-    private class KStreamKTableJoinProcessor extends AbstractProcessor<K, V1> {
-
-        private final KTableValueGetter<K, V2> valueGetter;
-        private final boolean leftJoin;
-
-        KStreamKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter, final boolean leftJoin) {
-            this.valueGetter = valueGetter;
-            this.leftJoin = leftJoin;
-        }
-
-        @Override
-        public void init(final ProcessorContext context) {
-            super.init(context);
-            valueGetter.init(context);
-        }
-
-        @Override
-        public void process(final K key, final V1 value) {
-            // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-            //
-            // we also ignore the record if value is null, because in a key-value data model a null-value indicates
-            // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
-            // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
-            // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-            if (key != null && value != null) {
-                final V2 value2 = valueGetter.get(key);
-                if (leftJoin || value2 != null) {
-                    context().forward(key, joiner.apply(value, value2));
-                }
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
new file mode 100644
index 0000000..a9965b9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
+
+    private final KTableValueGetter<K2, V2> valueGetter;
+    private final KeyValueMapper<K1, V1, K2> keyMapper;
+    private final ValueJoiner<? super V1, ? super V2, R> joiner;
+    private final boolean leftJoin;
+
+    KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
+                               final KeyValueMapper<K1, V1, K2> keyMapper,
+                               final ValueJoiner<? super V1, ? super V2, R> joiner,
+                               final boolean leftJoin) {
+        this.valueGetter = valueGetter;
+        this.keyMapper = keyMapper;
+        this.joiner = joiner;
+        this.leftJoin = leftJoin;
+    }
+
+    @Override
+    public void init(final ProcessorContext context) {
+        super.init(context);
+        valueGetter.init(context);
+    }
+
+    @Override
+    public void process(final K1 key, final V1 value) {
+        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
+        //
+        // we also ignore the record if value is null, because in a key-value data model a null-value indicates
+        // an empty message (i.e., there is nothing to be joined) -- this contrasts with SQL NULL semantics
+        // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
+        // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
+        if (key != null && value != null) {
+            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
+            if (leftJoin || value2 != null) {
+                context().forward(key, joiner.apply(value, value2));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 30e2712..e70054b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -67,7 +67,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
-    public final ProcessorSupplier<?, ?> processorSupplier;
+    private final ProcessorSupplier<?, ?> processorSupplier;
 
     private final String storeName;
 
@@ -263,6 +263,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, false, false);
     }
 
+
+
     @Override
     public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
         return doJoin(other, joiner, true, true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 9f86d7e..21a1176 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -25,6 +26,13 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 
 class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
+    private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
+        @Override
+        public K apply(final K key, final V1 value) {
+            return key;
+        }
+    };
+
     KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
         super(table1, table2, joiner);
     }
@@ -46,7 +54,10 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
         }
 
         public KTableValueGetter<K, R> get() {
-            return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+            return new KTableKTableJoinValueGetter<>(valueGetterSupplier1.get(),
+                                                     valueGetterSupplier2.get(),
+                                                     joiner,
+                                                     keyValueMapper);
         }
     }
 
@@ -94,37 +105,4 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
         }
     }
 
-    private class KTableKTableJoinValueGetter implements KTableValueGetter<K, R> {
-
-        private final KTableValueGetter<K, V1> valueGetter1;
-        private final KTableValueGetter<K, V2> valueGetter2;
-
-        public KTableKTableJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
-            this.valueGetter1 = valueGetter1;
-            this.valueGetter2 = valueGetter2;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            valueGetter1.init(context);
-            valueGetter2.init(context);
-        }
-
-        @Override
-        public R get(K key) {
-            R newValue = null;
-            V1 value1 = valueGetter1.get(key);
-
-            if (value1 != null) {
-                V2 value2 = valueGetter2.get(key);
-
-                if (value2 != null)
-                    newValue = joiner.apply(value1, value2);
-            }
-
-            return newValue;
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
new file mode 100644
index 0000000..30bc4ba
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableKTableJoinValueGetter<K1, V1, K2, V2, R> implements KTableValueGetter<K1, R> {
+
+    private final KTableValueGetter<K1, V1> valueGetter1;
+    private final KTableValueGetter<K2, V2> valueGetter2;
+    private final ValueJoiner<? super V1, ? super V2, ? extends R>  joiner;
+    private final KeyValueMapper<K1, V1, K2> keyValueMapper;
+
+    public KTableKTableJoinValueGetter(final KTableValueGetter<K1, V1> valueGetter1,
+                                       final KTableValueGetter<K2, V2> valueGetter2,
+                                       final ValueJoiner<? super V1, ? super V2, ? extends R>  joiner,
+                                       final KeyValueMapper<K1, V1, K2> keyValueMapper) {
+        this.valueGetter1 = valueGetter1;
+        this.valueGetter2 = valueGetter2;
+        this.joiner = joiner;
+        this.keyValueMapper = keyValueMapper;
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        valueGetter1.init(context);
+        valueGetter2.init(context);
+    }
+
+    @Override
+    public R get(K1 key) {
+        R newValue = null;
+        V1 value1 = valueGetter1.get(key);
+
+        if (value1 != null) {
+            V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1));
+
+            if (value2 != null) {
+                newValue = joiner.apply(value1, value2);
+            }
+        }
+
+        return newValue;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinValueGetter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinValueGetter.java
new file mode 100644
index 0000000..57b282c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinValueGetter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableKTableLeftJoinValueGetter<K1, K2, V1, V2, R> implements KTableValueGetter<K1, R> {
+
+    private final KTableValueGetter<K1, V1> valueGetter1;
+    private final KTableValueGetter<K2, V2> valueGetter2;
+    private final ValueJoiner<V1, V2, R> joiner;
+    private final KeyValueMapper<K1, V1, K2> keyMapper;
+
+    KTableKTableLeftJoinValueGetter(final KTableValueGetter<K1, V1> valueGetter1,
+                                    final KTableValueGetter<K2, V2> valueGetter2,
+                                    final ValueJoiner<V1, V2, R> joiner,
+                                    final KeyValueMapper<K1, V1, K2> keyMapper) {
+        this.valueGetter1 = valueGetter1;
+        this.valueGetter2 = valueGetter2;
+        this.joiner = joiner;
+        this.keyMapper = keyMapper;
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        valueGetter1.init(context);
+        valueGetter2.init(context);
+    }
+
+    @Override
+    public R get(K1 key) {
+        V1 value1 = valueGetter1.get(key);
+
+        if (value1 != null) {
+            V2 value2 = valueGetter2.get(keyMapper.apply(key, value1));
+            return joiner.apply(value1, value2);
+        } else {
+            return null;
+        }
+    }
+
+}
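The left-join variant below differs in one way: the joiner is applied as soon as the left value exists, so the right value may be null and the ValueJoiner must tolerate it. A standalone sketch of that behavior, again with illustrative names:

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class LeftJoinLookupSketch {
    public static void main(String[] args) {
        final Map<String, Long> orders = new HashMap<>();
        final Map<Long, String> customers = new HashMap<>();
        orders.put("o-1", 42L);   // customer 42 has no entry in `customers`

        // The joiner must handle a null right-hand value.
        final BiFunction<Long, String, String> joiner =
            (customerId, name) -> customerId + " -> " + (name == null ? "<no match>" : name);

        // Mirrors KTableKTableLeftJoinValueGetter.get(): join whenever the left side exists.
        final Long value1 = orders.get("o-1");
        final String result = value1 == null ? null : joiner.apply(value1, customers.get(value1));
        System.out.println(result); // prints: 42 -> <no match>
    }
}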

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index e59a3eb..a2034a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
 
@@ -39,11 +39,11 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
 
     private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
 
-        KeyValueStore<K, V> store = null;
+        ReadOnlyKeyValueStore<K, V> store = null;
 
         @SuppressWarnings("unchecked")
         public void init(ProcessorContext context) {
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+            store = (ReadOnlyKeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
         public V get(K key) {
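Widening the cast from KeyValueStore to ReadOnlyKeyValueStore reflects a design choice: join lookups only ever read, so the getter can work against any store that supports point lookups, presumably including the read-only views used for global stores. A toy sketch of the idea, using hypothetical interfaces rather than the Kafka ones:

// Hypothetical stand-ins: a read-only view plus a writable extension.
interface ReadOnlyKV<K, V> { V get(K key); }
interface WritableKV<K, V> extends ReadOnlyKV<K, V> { void put(K key, V value); }

class LookupGetter<K, V> {
    // Declaring the read-only type documents that this class never writes.
    private final ReadOnlyKV<K, V> store;
    LookupGetter(final ReadOnlyKV<K, V> store) { this.store = store; }
    V get(final K key) { return store.get(key); }
}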

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 97cb4be..e8f0994 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -59,12 +59,18 @@ import java.util.regex.Pattern;
  * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
  */
 public class TopologyBuilder {
+
+    private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
+
     // node factories in a topological order
     private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
 
     // state factories
     private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
 
+    // global state factories
+    private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>();
+
     // all topics subscribed from source processors (without application-id prefix for internal topics)
     private final Set<String> sourceTopicNames = new HashSet<>();
 
@@ -95,6 +101,9 @@ public class TopologyBuilder {
     // this is used in the extended KStreamBuilder.
     private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>();
 
+    // all global topics
+    private final Set<String> globalTopics = new HashSet<>();
+
     private final Set<String> earliestResetTopics = new HashSet<>();
 
     private final Set<String> latestResetTopics = new HashSet<>();
@@ -115,8 +124,6 @@ public class TopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
-
     private static class StateStoreFactory {
         public final Set<String> users;
 
@@ -432,18 +439,8 @@ public class TopologyBuilder {
 
         for (String topic : topics) {
             Objects.requireNonNull(topic, "topic names cannot be null");
-            if (sourceTopicNames.contains(topic))
-                throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
-
-            for (Pattern pattern : nodeToSourcePatterns.values()) {
-                if (pattern.matcher(topic).matches()) {
-                    throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
-                }
-            }
-
-
+            validateTopicNotAlreadyRegistered(topic);
             maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
-
             sourceTopicNames.add(topic);
         }
 
@@ -455,6 +452,82 @@ public class TopologyBuilder {
     }
 
     /**
+     * Adds a global {@link StateStore} to the topology. The {@link StateStore} sources its data
+     * from all partitions of the provided input topic. There will be exactly one instance of this
+     * {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving
+     * from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will
+     * receive all records forwarded from the {@link SourceNode}. This
+     * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param store                 the instance of {@link StateStore}
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorNode} that maintains the store
+     * @param stateUpdateSupplier   the {@link ProcessorSupplier} used to create the store-updating processor
+     * @return this {@code TopologyBuilder} instance so methods can be chained together
+     */
+    public synchronized TopologyBuilder addGlobalStore(final StateStore store,
+                                                       final String sourceName,
+                                                       final Deserializer keyDeserializer,
+                                                       final Deserializer valueDeserializer,
+                                                       final String topic,
+                                                       final String processorName,
+                                                       final ProcessorSupplier stateUpdateSupplier) {
+        Objects.requireNonNull(store, "store must not be null");
+        Objects.requireNonNull(sourceName, "sourceName must not be null");
+        Objects.requireNonNull(topic, "topic must not be null");
+        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
+        Objects.requireNonNull(processorName, "processorName must not be null");
+        if (nodeFactories.containsKey(sourceName)) {
+            throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
+        }
+        if (nodeFactories.containsKey(processorName)) {
+            throw new TopologyBuilderException("Processor " + processorName + " is already added.");
+        }
+        if (stateFactories.containsKey(store.name()) || globalStateStores.containsKey(store.name())) {
+            throw new TopologyBuilderException("StateStore " + store.name() + " is already added.");
+        }
+
+        validateTopicNotAlreadyRegistered(topic);
+
+        globalTopics.add(topic);
+        final String[] topics = {topic};
+        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, keyDeserializer, valueDeserializer));
+        nodeToSourceTopics.put(sourceName, topics.clone());
+        nodeGrouper.add(sourceName);
+
+        final String[] parents = {sourceName};
+        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, parents, stateUpdateSupplier);
+        nodeFactory.addStateStore(store.name());
+        nodeFactories.put(processorName, nodeFactory);
+        nodeGrouper.add(processorName);
+        nodeGrouper.unite(processorName, parents);
+
+        globalStateStores.put(store.name(), store);
+        connectSourceStoreAndTopic(store.name(), topic);
+        return this;
+    }
+
+    private void validateTopicNotAlreadyRegistered(final String topic) {
+        if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
+            throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
+        }
+
+        for (Pattern pattern : nodeToSourcePatterns.values()) {
+            if (pattern.matcher(topic).matches()) {
+                throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
+            }
+        }
+    }
+
+    /**
      * Add a new source that consumes from topics matching the given pattern
      * and forwards the records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers. The provided
@@ -669,7 +742,6 @@ public class TopologyBuilder {
         nodeGrouper.unite(name, parentNames);
         return this;
     }
-
     /**
      * Adds a state store
      *
@@ -694,7 +766,6 @@ public class TopologyBuilder {
         return this;
     }
 
-
     /**
      * Connects the processor and the state stores
      *
@@ -905,12 +976,49 @@ public class TopologyBuilder {
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
         } else {
-            // when nodeGroup is null, we build the full topology. this is used in some tests.
-            nodeGroup = null;
+            // when topicGroupId is null, we build the full topology minus the global groups
+            final Set<String> globalNodeGroups = globalNodeGroups();
+            final Collection<Set<String>> values = nodeGroups().values();
+            nodeGroup = new HashSet<>();
+            for (Set<String> value : values) {
+                nodeGroup.addAll(value);
+            }
+            nodeGroup.removeAll(globalNodeGroups);
         }
         return build(nodeGroup);
     }
 
+    /**
+     * Builds the topology for any global state stores.
+     * @return the {@link ProcessorTopology} for the global state stores, or {@code null} if there are none
+     */
+    public synchronized ProcessorTopology buildGlobalStateTopology() {
+        final Set<String> globalGroups = globalNodeGroups();
+        if (globalGroups.isEmpty()) {
+            return null;
+        }
+        return build(globalGroups);
+    }
+
+    private Set<String> globalNodeGroups() {
+        final Set<String> globalGroups = new HashSet<>();
+        for (final Map.Entry<Integer, Set<String>> nodeGroup : nodeGroups().entrySet()) {
+            final Set<String> nodes = nodeGroup.getValue();
+            for (String node : nodes) {
+                final NodeFactory nodeFactory = nodeFactories.get(node);
+                if (nodeFactory instanceof SourceNodeFactory) {
+                    final String[] topics = ((SourceNodeFactory) nodeFactory).getTopics();
+                    if (topics != null && topics.length == 1 && globalTopics.contains(topics[0])) {
+                        globalGroups.addAll(nodes);
+                    }
+                }
+            }
+        }
+        return globalGroups;
+    }
+
     @SuppressWarnings("unchecked")
     private ProcessorTopology build(Set<String> nodeGroup) {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
@@ -933,8 +1041,7 @@ public class TopologyBuilder {
                     }
                     for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
                         if (!stateStoreMap.containsKey(stateStoreName)) {
-                            final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
-                            final StateStore stateStore = supplier.get();
+                            final StateStore stateStore = getStateStore(stateStoreName);
                             stateStoreMap.put(stateStoreName, stateStore);
                             storeToProcessorNodeMap.put(stateStore, node);
                         }
@@ -970,7 +1077,29 @@ public class TopologyBuilder {
             }
         }
 
-        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), sourceStoreToSourceTopic, storeToProcessorNodeMap);
+        return new ProcessorTopology(processorNodes,
+                                     topicSourceMap,
+                                     topicSinkMap,
+                                     new ArrayList<>(stateStoreMap.values()),
+                                     sourceStoreToSourceTopic,
+                                     storeToProcessorNodeMap,
+                                     new ArrayList<>(globalStateStores.values()));
+    }
+
+    /**
+     * Get all global {@link StateStore}s that are part of the topology.
+     * @return an unmodifiable map of all global {@link StateStore}s, keyed by store name
+     */
+    public Map<String, StateStore> globalStateStores() {
+        return Collections.unmodifiableMap(globalStateStores);
+    }
+
+    private StateStore getStateStore(final String stateStoreName) {
+        if (stateFactories.containsKey(stateStoreName)) {
+            return stateFactories.get(stateStoreName).supplier.get();
+        }
+        return globalStateStores.get(stateStoreName);
     }
 
     /**
@@ -996,6 +1125,10 @@ public class TopologyBuilder {
                 if (topics != null) {
                     // if some of the topics are internal, add them to the internal topics
                     for (String topic : topics) {
+                        // skip global topics as they don't need partition assignment
+                        if (globalTopics.contains(topic)) {
+                            continue;
+                        }
                         if (this.internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the application id
                             String internalTopic = decorateTopic(topic);
@@ -1030,11 +1163,13 @@ public class TopologyBuilder {
                     }
                 }
             }
-            topicGroups.put(entry.getKey(), new TopicsInfo(
-                    Collections.unmodifiableSet(sinkTopics),
-                    Collections.unmodifiableSet(sourceTopics),
-                    Collections.unmodifiableMap(internalSourceTopics),
-                    Collections.unmodifiableMap(stateChangelogTopics)));
+            if (!sourceTopics.isEmpty()) {
+                topicGroups.put(entry.getKey(), new TopicsInfo(
+                        Collections.unmodifiableSet(sinkTopics),
+                        Collections.unmodifiableSet(sourceTopics),
+                        Collections.unmodifiableMap(internalSourceTopics),
+                        Collections.unmodifiableMap(stateChangelogTopics)));
+            }
         }
 
         return Collections.unmodifiableMap(topicGroups);
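As a usage illustration of the new addGlobalStore API above, the sketch below wires a global store, its source node, and a state-update processor. The store construction (buildCustomersStore), the topic name, and the CustomersUpdater class are placeholders, not part of this commit.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;

public class GlobalStoreWiringSketch {

    public static void main(String[] args) {
        final TopologyBuilder builder = new TopologyBuilder();
        builder.addGlobalStore(
            buildCustomersStore(),              // the global StateStore (placeholder, see below)
            "customers-source",                 // name of the auto-added SourceNode
            Serdes.String().deserializer(),     // key deserializer
            Serdes.String().deserializer(),     // value deserializer
            "customers",                        // the topic read, from all partitions, into the store
            "customers-processor",              // name of the state-maintaining ProcessorNode
            CustomersUpdater::new);             // ProcessorSupplier keeping the store up-to-date
    }

    // Placeholder: a real application would build e.g. a persistent KeyValueStore
    // named "customers-store" here.
    private static StateStore buildCustomersStore() {
        throw new UnsupportedOperationException("store construction omitted from this sketch");
    }

    // Minimal state-update processor: writes every record into the global store.
    // The store name it looks up must match the name of the registered store.
    static class CustomersUpdater implements Processor<String, String> {
        private KeyValueStore<String, String> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            store = (KeyValueStore<String, String>) context.getStateStore("customers-store");
        }

        @Override
        public void process(final String key, final String value) {
            store.put(key, value);
        }

        @Override
        public void punctuate(final long timestamp) { }

        @Override
        public void close() { }
    }
}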

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
new file mode 100644
index 0000000..df9cacb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.io.File;
+import java.util.Map;
+
+
+public abstract class AbstractProcessorContext implements InternalProcessorContext {
+
+    static final String NONEXIST_TOPIC = "__null_topic__";
+    private final TaskId taskId;
+    private final String applicationId;
+    private final StreamsConfig config;
+    private final StreamsMetrics metrics;
+    private final Serde keySerde;
+    private final ThreadCache cache;
+    private final Serde valueSerde;
+    private boolean initialized;
+    private RecordContext recordContext;
+    private ProcessorNode currentNode;
+    final StateManager stateManager;
+
+    public AbstractProcessorContext(final TaskId taskId,
+                                    final String applicationId,
+                                    final StreamsConfig config,
+                                    final StreamsMetrics metrics,
+                                    final StateManager stateManager,
+                                    final ThreadCache cache) {
+        this.taskId = taskId;
+        this.applicationId = applicationId;
+        this.config = config;
+        this.metrics = metrics;
+        this.stateManager = stateManager;
+        valueSerde = config.valueSerde();
+        keySerde = config.keySerde();
+        this.cache = cache;
+    }
+
+    @Override
+    public String applicationId() {
+        return applicationId;
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return keySerde;
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return valueSerde;
+    }
+
+    @Override
+    public File stateDir() {
+        return stateManager.baseDir();
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    @Override
+    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
+        if (initialized) {
+            throw new IllegalStateException("Can only create state stores during initialization.");
+        }
+
+        stateManager.register(store, loggingEnabled, stateRestoreCallback);
+    }
+
+    /**
+     * @throws IllegalStateException if no record is currently being processed (the record context is null)
+     */
+    @Override
+    public String topic() {
+        if (recordContext == null) {
+            throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
+        }
+
+        String topic = recordContext.topic();
+
+        if (topic.equals(NONEXIST_TOPIC)) {
+            return null;
+        }
+
+        return topic;
+    }
+
+    /**
+     * @throws IllegalStateException if no record is currently being processed (the record context is null)
+     */
+    @Override
+    public int partition() {
+        if (recordContext == null) {
+            throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
+        }
+
+        return recordContext.partition();
+    }
+
+    /**
+     * @throws IllegalStateException if no record is currently being processed (the record context is null)
+     */
+    @Override
+    public long offset() {
+        if (recordContext == null) {
+            throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
+        }
+
+        return recordContext.offset();
+    }
+
+    /**
+     * @throws IllegalStateException if no record is currently being processed (the record context is null)
+     */
+    @Override
+    public long timestamp() {
+        if (recordContext == null) {
+            throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
+        }
+
+        return recordContext.timestamp();
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        return config.originals();
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public void setRecordContext(final RecordContext recordContext) {
+        this.recordContext = recordContext;
+    }
+
+    @Override
+    public RecordContext recordContext() {
+        return this.recordContext;
+    }
+
+    @Override
+    public void setCurrentNode(final ProcessorNode currentNode) {
+        this.currentNode = currentNode;
+    }
+
+    @Override
+    public ProcessorNode currentNode() {
+        return currentNode;
+    }
+
+    @Override
+    public ThreadCache getCache() {
+        return cache;
+    }
+
+    @Override
+    public void initialized() {
+        initialized = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index b438b75..bed3311 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -85,6 +85,7 @@ public abstract class AbstractTask {
             log.trace("task [{}] Initializing store {}", id(), store.name());
             store.init(this.processorContext, store);
         }
+
     }
 
     public final TaskId id() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
new file mode 100644
index 0000000..bc9d89d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.util.List;
+
+public class GlobalProcessorContextImpl extends AbstractProcessorContext {
+
+    public GlobalProcessorContextImpl(final StreamsConfig config,
+                                      final StateManager stateMgr,
+                                      final StreamsMetrics metrics,
+                                      final ThreadCache cache) {
+        super(new TaskId(-1, -1), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), config, metrics, stateMgr, cache);
+    }
+
+    /**
+     * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
+     */
+    @Override
+    public StateStore getStateStore(final String name) {
+        return stateManager.getGlobalStore(name);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(K key, V value) {
+        final ProcessorNode previousNode = currentNode();
+        try {
+            for (ProcessorNode child : (List<ProcessorNode>) currentNode().children()) {
+                setCurrentNode(child);
+                child.process(key, value);
+            }
+        } finally {
+            setCurrentNode(previousNode);
+        }
+    }
+
+    @Override
+    public <K, V> void forward(K key, V value, int childIndex) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <K, V> void forward(K key, V value, String childName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void commit() {
+        //no-op
+    }
+
+    @Override
+    public void schedule(long interval) {
+        throw new UnsupportedOperationException();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
new file mode 100644
index 0000000..226fdda
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Interface for maintaining global state stores. See {@link GlobalStateUpdateTask}.
+ */
+interface GlobalStateMaintainer {
+
+    Map<TopicPartition, Long> initialize();
+
+    void flushState();
+
+    void close() throws IOException;
+
+    void update(ConsumerRecord<byte[], byte[]> record);
+}
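To make the contract concrete: a maintainer is initialized (returning the offsets to resume from), fed records, flushed periodically, and closed. Below is a hedged sketch of a driving loop, simplified from what this commit's GlobalStreamThread does; the consumer wiring is assumed, and the class is placed in the same package so it can see the package-private interface.

package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

final class GlobalUpdateLoopSketch {

    // Assumes `consumer` is already assigned the global topics' partitions.
    static void run(final GlobalStateMaintainer maintainer,
                    final Consumer<byte[], byte[]> consumer,
                    final AtomicBoolean running) throws IOException {
        // initialize() restores the stores and returns the offsets to resume from.
        final Map<TopicPartition, Long> checkpointed = maintainer.initialize();
        for (final Map.Entry<TopicPartition, Long> entry : checkpointed.entrySet()) {
            consumer.seek(entry.getKey(), entry.getValue());
        }
        while (running.get()) {
            for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(100)) {
                maintainer.update(record);   // apply the record to its global store
            }
            maintainer.flushState();         // flush stores and write checkpoints
        }
        maintainer.close();
    }
}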

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
new file mode 100644
index 0000000..7670ff0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Set;
+
+public interface GlobalStateManager extends StateManager {
+    Set<String> initialize(InternalProcessorContext processorContext);
+}

