kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-3016: phase-2. stream join implementations
Date Wed, 06 Jan 2016 22:34:46 GMT
KAFKA-3016: phase-2. stream join implementations

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #737 from ymatsuda/windowed_join2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5aad4999
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5aad4999
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5aad4999

Branch: refs/heads/trunk
Commit: 5aad4999d1a1d35d61365ff57a9b79a6af3e70d2
Parents: a788c65
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Wed Jan 6 14:34:40 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jan 6 14:34:40 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindowSpec.java   |  91 ++++
 .../apache/kafka/streams/kstream/KStream.java   |  97 +++-
 .../streams/kstream/SlidingWindowSupplier.java  | 266 ----------
 .../apache/kafka/streams/kstream/Window.java    |  36 --
 .../kafka/streams/kstream/WindowSupplier.java   |  25 -
 .../streams/kstream/internals/KStreamImpl.java  | 147 +++++-
 .../streams/kstream/internals/KStreamJoin.java  |  84 ---
 .../kstream/internals/KStreamJoinWindow.java    |  58 +++
 .../kstream/internals/KStreamKStreamJoin.java   |  73 +++
 .../kstream/internals/KStreamWindow.java        |  68 ---
 .../kstream/internals/KStreamWindowedImpl.java  |  67 ---
 .../state/RocksDBWindowStoreSupplier.java       |   2 +-
 .../kstream/internals/KStreamImplTest.java      |  68 +--
 .../kstream/internals/KStreamJoinTest.java      | 195 -------
 .../internals/KStreamKStreamJoinTest.java       | 505 +++++++++++++++++++
 .../internals/KStreamKStreamLeftJoinTest.java   | 289 +++++++++++
 .../kstream/internals/KStreamWindowedTest.java  |  91 ----
 .../apache/kafka/test/UnlimitedWindowDef.java   | 104 ----
 18 files changed, 1266 insertions(+), 1000 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
new file mode 100644
index 0000000..8f0f839
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * This class is used to specify the behaviour of windowed joins.
+ */
+public class JoinWindowSpec {
+
+    public final String name;
+    public final long before;
+    public final long after;
+    public final long retention;
+    public final int segments;
+
+    private JoinWindowSpec(String name, long before, long after, long retention, int segments) {
+        this.name = name;
+        this.after = after;
+        this.before = before;
+        this.retention = retention;
+        this.segments = segments;
+    }
+
+    public static JoinWindowSpec of(String name) {
+        return new JoinWindowSpec(name, 0L, 0L, 0L, 3);
+    }
+
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within
+     * timeDifference.
+     *
+     * @param timeDifference
+     * @return
+     */
+    public JoinWindowSpec within(long timeDifference) {
+        return new JoinWindowSpec(name, timeDifference, timeDifference, retention, segments);
+    }
+
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within
+     * timeDifference, and if the timestamp of a record from the secondary stream is
+     * earlier than or equal to the timestamp of a record from the first stream.
+     *
+     * @param timeDifference
+     * @return
+     */
+    public JoinWindowSpec before(long timeDifference) {
+        return new JoinWindowSpec(name, timeDifference, 0L, retention, segments);
+    }
+
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within
+     * timeDifference, and if the timestamp of a record from the secondary stream is
+     * later than or equal to the timestamp of a record from the first stream.
+     *
+     * @param timeDifference
+     * @return
+     */
+    public JoinWindowSpec after(long timeDifference) {
+        return new JoinWindowSpec(name, 0L, timeDifference, retention, segments);
+    }
+
+    /**
+     * Specifies the retention period of windows
+     * @param retentionPeriod
+     * @return
+     */
+    public JoinWindowSpec retentionPeriod(long retentionPeriod) {
+        return new JoinWindowSpec(name, before, after, retentionPeriod, segments);
+    }
+
+    public JoinWindowSpec segments(int segments) {
+        return new JoinWindowSpec(name, before, after, retention, segments);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index d3931ef..29115c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -84,14 +84,6 @@ public interface KStream<K, V> {
     <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
 
     /**
-     * Creates a new windowed stream using a specified window instance.
-     *
-     * @param windowDef the instance of Window
-     * @return the windowed stream
-     */
-    KStreamWindowed<K, V> with(WindowSupplier<K, V> windowDef);
-
-    /**
      * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
      * supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to
      * a corresponding stream for the first predicate is evaluated true.
@@ -173,6 +165,95 @@ public interface KStream<K, V> {
     void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
 
     /**
+     * Combines values of this stream with another KStream using Windowed Inner Join.
+     *
+     * @param otherStream the instance of KStream joined with this stream
+     * @param joiner ValueJoiner
+     * @param joinWindowSpec the specification of the join window
+     * @param keySerializer key serializer,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param thisValueSerializer value serializer for this stream,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param otherValueSerializer value serializer for other stream,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param keyDeserializer key deserializer,
+     *                      if not specified the default deserializer defined in the configs will be used
+     * @param thisValueDeserializer value deserializer for this stream,
+     *                      if not specified the default deserializer defined in the configs will be used
+     * @param otherValueDeserializer value deserializer for other stream,
+     *                      if not specified the default deserializer defined in the configs will be used
+     * @param <V1>   the value type of the other stream
+     * @param <V2>   the value type of the new stream
+     */
+    <V1, V2> KStream<K, V2> join(
+            KStream<K, V1> otherStream,
+            ValueJoiner<V, V1, V2> joiner,
+            JoinWindowSpec joinWindowSpec,
+            Serializer<K> keySerializer,
+            Serializer<V> thisValueSerializer,
+            Serializer<V1> otherValueSerializer,
+            Deserializer<K> keyDeserializer,
+            Deserializer<V> thisValueDeserializer,
+            Deserializer<V1> otherValueDeserializer);
+
+    /**
+     * Combines values of this stream with another KStream using Windowed Outer Join.
+     *
+     * @param otherStream the instance of KStream joined with this stream
+     * @param joiner ValueJoiner
+     * @param joinWindowSpec the specification of the join window
+     * @param keySerializer key serializer,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param thisValueSerializer value serializer for this stream,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param otherValueSerializer value serializer for other stream,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param keyDeserializer key deserializer,
+     *                      if not specified the default deserializer defined in the configs will be used
+     * @param thisValueDeserializer value deserializer for this stream,
+     *                      if not specified the default deserializer defined in the configs will be used
+     * @param otherValueDeserializer value deserializer for other stream,
+     *                      if not specified the default deserializer defined in the configs will be used
+     * @param <V1>   the value type of the other stream
+     * @param <V2>   the value type of the new stream
+     */
+    <V1, V2> KStream<K, V2> outerJoin(
+            KStream<K, V1> otherStream,
+            ValueJoiner<V, V1, V2> joiner,
+            JoinWindowSpec joinWindowSpec,
+            Serializer<K> keySerializer,
+            Serializer<V> thisValueSerializer,
+            Serializer<V1> otherValueSerializer,
+            Deserializer<K> keyDeserializer,
+            Deserializer<V> thisValueDeserializer,
+            Deserializer<V1> otherValueDeserializer);
+
+    /**
+     * Combines values of this stream with another KStream using Windowed Left Join.
+     *
+     * @param otherStream the instance of KStream joined with this stream
+     * @param joiner ValueJoiner
+     * @param keySerializer key serializer,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param otherValueSerializer value serializer for other stream,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param keyDeserializer key deserializer,
+     *                      if not specified the default deserializer defined in the configs will be used
+     * @param otherValueDeserializer value deserializer for other stream,
+     *                      if not specified the default deserializer defined in the configs will be used
+     * @param <V1>   the value type of the other stream
+     * @param <V2>   the value type of the new stream
+     */
+    <V1, V2> KStream<K, V2> leftJoin(
+            KStream<K, V1> otherStream,
+            ValueJoiner<V, V1, V2> joiner,
+            JoinWindowSpec joinWindowSpec,
+            Serializer<K> keySerializer,
+            Serializer<V1> otherValueSerializer,
+            Deserializer<K> keyDeserializer,
+            Deserializer<V1> otherValueDeserializer);
+
+    /**
      * Combines values of this stream with KTable using Left Join.
      *
      * @param ktable the instance of KTable joined with this stream

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
deleted file mode 100644
index 80e548f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.internals.FilteredIterator;
-import org.apache.kafka.streams.kstream.internals.WindowSupport;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Stamped;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-
-public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
-    private final String name;
-    private final long duration;
-    private final int maxCount;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-
-    public SlidingWindowSupplier(
-            String name,
-            long duration,
-            int maxCount,
-            Serializer<K> keySerializer,
-            Serializer<V> valueSerializer,
-            Deserializer<K> keyDeseriaizer,
-            Deserializer<V> valueDeserializer) {
-        this.name = name;
-        this.duration = duration;
-        this.maxCount = maxCount;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
-        this.keyDeserializer = keyDeseriaizer;
-        this.valueDeserializer = valueDeserializer;
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public Window<K, V> get() {
-        return new SlidingWindow();
-    }
-
-    public class SlidingWindow extends WindowSupport implements Window<K, V> {
-        private final Object lock = new Object();
-        private ProcessorContext context;
-        private int partition;
-        private int slotNum; // used as a key for Kafka log compaction
-        private LinkedList<K> list = new LinkedList<K>();
-        private HashMap<K, ValueList<V>> map = new HashMap<>();
-
-        @Override
-        public void init(ProcessorContext context) {
-            this.context = context;
-            this.partition = context.id().partition;
-            SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
-            context.register(this, true, restoreFunc);
-
-            for (ValueList<V> valueList : map.values()) {
-                valueList.clearDirtyValues();
-            }
-            this.slotNum = restoreFunc.slotNum;
-        }
-
-        @Override
-        public Iterator<V> findAfter(K key, final long timestamp) {
-            return find(key, timestamp, timestamp + duration);
-        }
-
-        @Override
-        public Iterator<V> findBefore(K key, final long timestamp) {
-            return find(key, timestamp - duration, timestamp);
-        }
-
-        @Override
-        public Iterator<V> find(K key, final long timestamp) {
-            return find(key, timestamp - duration, timestamp + duration);
-        }
-
-        /*
-         * finds items in the window between startTime and endTime (both inclusive)
-         */
-        private Iterator<V> find(K key, final long startTime, final long endTime) {
-            final ValueList<V> values = map.get(key);
-
-            if (values == null) {
-                return Collections.emptyIterator();
-            } else {
-                return new FilteredIterator<V, Value<V>>(values.iterator()) {
-                    @Override
-                    protected V filter(Value<V> item) {
-                        if (startTime <= item.timestamp && item.timestamp <= endTime)
-                            return item.value;
-                        else
-                            return null;
-                    }
-                };
-            }
-        }
-
-        @Override
-        public void put(K key, V value, long timestamp) {
-            synchronized (lock) {
-                slotNum++;
-
-                list.offerLast(key);
-
-                ValueList<V> values = map.get(key);
-                if (values == null) {
-                    values = new ValueList<>();
-                    map.put(key, values);
-                }
-
-                values.add(slotNum, value, timestamp);
-            }
-            evictExcess();
-            evictExpired(timestamp - duration);
-        }
-
-        private void evictExcess() {
-            while (list.size() > maxCount) {
-                K oldestKey = list.pollFirst();
-
-                ValueList<V> values = map.get(oldestKey);
-                values.removeFirst();
-
-                if (values.isEmpty()) map.remove(oldestKey);
-            }
-        }
-
-        private void evictExpired(long cutoffTime) {
-            while (true) {
-                K oldestKey = list.peekFirst();
-
-                ValueList<V> values = map.get(oldestKey);
-                Stamped<V> oldestValue = values.first();
-
-                if (oldestValue.timestamp < cutoffTime) {
-                    list.pollFirst();
-                    values.removeFirst();
-
-                    if (values.isEmpty()) map.remove(oldestKey);
-                } else {
-                    break;
-                }
-            }
-        }
-
-        @Override
-        public String name() {
-            return name;
-        }
-
-        @Override
-        public void flush() {
-            IntegerSerializer intSerializer = new IntegerSerializer();
-            ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
-
-            RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-
-            for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
-                ValueList<V> values = entry.getValue();
-                if (values.hasDirtyValues()) {
-                    K key = entry.getKey();
-
-                    byte[] keyBytes = keySerializer.serialize(name, key);
-
-                    Iterator<Value<V>> iterator = values.dirtyValueIterator();
-                    while (iterator.hasNext()) {
-                        Value<V> dirtyValue = iterator.next();
-                        byte[] slot = intSerializer.serialize("", dirtyValue.slotNum);
-                        byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value);
-
-                        byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length];
-
-                        int offset = 0;
-                        offset += putLong(combined, offset, dirtyValue.timestamp);
-                        offset += puts(combined, offset, keyBytes);
-                        offset += puts(combined, offset, valBytes);
-
-                        if (offset != combined.length)
-                            throw new IllegalStateException("serialized length does not match");
-
-                        collector.send(new ProducerRecord<>(name, partition, slot, combined), byteArraySerializer, byteArraySerializer);
-                    }
-                    values.clearDirtyValues();
-                }
-            }
-        }
-
-        @Override
-        public void close() {
-            // TODO
-        }
-
-        @Override
-        public boolean persistent() {
-            // TODO: should not be persistent, right?
-            return false;
-        }
-
-        private class SlidingWindowRegistryCallback implements StateRestoreCallback {
-
-            final IntegerDeserializer intDeserializer;
-            int slotNum = 0;
-
-            SlidingWindowRegistryCallback() {
-                intDeserializer = new IntegerDeserializer();
-            }
-
-            @Override
-            public void restore(byte[] slot, byte[] bytes) {
-
-                slotNum = intDeserializer.deserialize("", slot);
-
-                int offset = 0;
-                // timestamp
-                long timestamp = getLong(bytes, offset);
-                offset += 8;
-                // key
-                int length = getInt(bytes, offset);
-                offset += 4;
-                K key = deserialize(bytes, offset, length, name, keyDeserializer);
-                offset += length;
-                // value
-                length = getInt(bytes, offset);
-                offset += 4;
-                V value = deserialize(bytes, offset, length, name, valueDeserializer);
-
-                put(key, value, timestamp);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
deleted file mode 100644
index a1456f6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-
-import java.util.Iterator;
-
-public interface Window<K, V> extends StateStore {
-
-    void init(ProcessorContext context);
-
-    Iterator<V> find(K key, long timestamp);
-
-    Iterator<V> findAfter(K key, long timestamp);
-
-    Iterator<V> findBefore(K key, long timestamp);
-
-    void put(K key, V value, long timestamp);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
deleted file mode 100644
index 46a2b9e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface WindowSupplier<K, V> {
-
-    String name();
-
-    Window<K, V> get();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 67a2d27..f47fe0f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.JoinWindowSpec;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValue;
@@ -26,12 +27,12 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.WindowSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
+import org.apache.kafka.streams.state.Serdes;
 
 import java.lang.reflect.Array;
 import java.util.HashSet;
@@ -67,6 +68,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
 
+    public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
+
+    public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
+
     public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
 
     public static final String MERGE_NAME = "KSTREAM-MERGE-";
@@ -132,15 +137,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KStreamWindowed<K, V> with(WindowSupplier<K, V> windowSupplier) {
-        String name = topology.newName(WINDOWED_NAME);
-
-        topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name);
-
-        return new KStreamWindowedImpl<>(topology, name, sourceNodes, windowSupplier);
-    }
-
-    @Override
     @SuppressWarnings("unchecked")
     public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
         String branchName = topology.newName(BRANCH_NAME);
@@ -239,6 +235,135 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         topology.connectProcessorAndStateStores(name, stateStoreNames);
     }
 
+    @Override
+    public <V1, R> KStream<K, R> join(
+            KStream<K, V1> other,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindowSpec joinWindowSpec,
+            Serializer<K> keySerialzier,
+            Serializer<V> thisValueSerialzier,
+            Serializer<V1> otherValueSerialzier,
+            Deserializer<K> keyDeserialier,
+            Deserializer<V> thisValueDeserialzier,
+            Deserializer<V1> otherValueDeserialzier) {
+
+        return join(other, joiner, joinWindowSpec,
+                keySerialzier, thisValueSerialzier, otherValueSerialzier,
+                keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false);
+    }
+
+    @Override
+    public <V1, R> KStream<K, R> outerJoin(
+            KStream<K, V1> other,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindowSpec joinWindowSpec,
+            Serializer<K> keySerialzier,
+            Serializer<V> thisValueSerialzier,
+            Serializer<V1> otherValueSerialzier,
+            Deserializer<K> keyDeserialier,
+            Deserializer<V> thisValueDeserialzier,
+            Deserializer<V1> otherValueDeserialzier) {
+
+        return join(other, joiner, joinWindowSpec,
+                keySerialzier, thisValueSerialzier, otherValueSerialzier,
+                keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <V1, R> KStream<K, R> join(
+            KStream<K, V1> other,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindowSpec joinWindowSpec,
+            Serializer<K> keySerialzier,
+            Serializer<V> thisValueSerialzier,
+            Serializer<V1> otherValueSerialzier,
+            Deserializer<K> keyDeserialier,
+            Deserializer<V> thisValueDeserialzier,
+            Deserializer<V1> otherValueDeserialzier,
+            boolean outer) {
+
+        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+        RocksDBWindowStoreSupplier<K, V> thisWindow =
+                new RocksDBWindowStoreSupplier<>(
+                        joinWindowSpec.name + "-1",
+                        joinWindowSpec.before,
+                        joinWindowSpec.after,
+                        joinWindowSpec.retention,
+                        joinWindowSpec.segments,
+                        new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier),
+                        null);
+
+        RocksDBWindowStoreSupplier<K, V1> otherWindow =
+                new RocksDBWindowStoreSupplier<>(
+                        joinWindowSpec.name + "-2",
+                        joinWindowSpec.after,
+                        joinWindowSpec.before,
+                        joinWindowSpec.retention,
+                        joinWindowSpec.segments,
+                        new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
+                        null);
+
+        KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name());
+        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
+
+        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, outer);
+        KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), reverseJoiner(joiner), outer);
+        KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>();
+
+        String thisWindowStreamName = topology.newName(WINDOWED_NAME);
+        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+        String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
+        String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
+        String joinMergeName = topology.newName(MERGE_NAME);
+
+        topology.addProcessor(thisWindowStreamName, thisWindowedStream, this.name);
+        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
+        topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
+        topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
+        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+        topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName);
+        topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName);
+
+        return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <V1, R> KStream<K, R> leftJoin(
+            KStream<K, V1> other,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindowSpec joinWindowSpec,
+            Serializer<K> keySerialzier,
+            Serializer<V1> otherValueSerialzier,
+            Deserializer<K> keyDeserialier,
+            Deserializer<V1> otherValueDeserialzier) {
+
+        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+        RocksDBWindowStoreSupplier<K, V1> otherWindow =
+                new RocksDBWindowStoreSupplier<>(
+                        joinWindowSpec.name,
+                        joinWindowSpec.after,
+                        joinWindowSpec.before,
+                        joinWindowSpec.retention,
+                        joinWindowSpec.segments,
+                        new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
+                        null);
+
+        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
+        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, true);
+
+        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+        String joinThisName = topology.newName(LEFTJOIN_NAME);
+
+        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
+        topology.addProcessor(joinThisName, joinThis, this.name);
+        topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
+
+        return new KStreamImpl<>(topology, joinThisName, allSourceNodes);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
deleted file mode 100644
index eefb8c9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Iterator;
-
-class KStreamJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
-
-    private static abstract class Finder<K, T> {
-        abstract Iterator<T> find(K key, long timestamp);
-    }
-
-    private final String windowName;
-    private final ValueJoiner<V1, V2, V> joiner;
-
-    KStreamJoin(String windowName, ValueJoiner<V1, V2, V> joiner) {
-        this.windowName = windowName;
-        this.joiner = joiner;
-    }
-
-    @Override
-    public Processor<K, V1> get() {
-        return new KStreamJoinProcessor(windowName);
-    }
-
-    private class KStreamJoinProcessor extends AbstractProcessor<K, V1> {
-
-        private final String windowName;
-        protected Finder<K, V2> finder;
-
-        public KStreamJoinProcessor(String windowName) {
-            this.windowName = windowName;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(ProcessorContext context) {
-            super.init(context);
-
-            final Window<K, V2> window = (Window<K, V2>) context.getStateStore(windowName);
-
-            this.finder = new Finder<K, V2>() {
-                @Override
-                Iterator<V2> find(K key, long timestamp) {
-                    return window.find(key, timestamp);
-                }
-            };
-        }
-
-        @Override
-        public void process(K key, V1 value) {
-            long timestamp = context().timestamp();
-            Iterator<V2> iter = finder.find(key, timestamp);
-            if (iter != null) {
-                while (iter.hasNext()) {
-                    context().forward(key, joiner.apply(value, iter.next()));
-                }
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
new file mode 100644
index 0000000..b122aa1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
+
+    private final String windowName;
+
+    KStreamJoinWindow(String windowName) {
+        this.windowName = windowName;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamJoinWindowProcessor();
+    }
+
+    private class KStreamJoinWindowProcessor extends AbstractProcessor<K, V> {
+
+        private WindowStore<K, V> window;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            window = (WindowStore<K, V>) context.getStateStore(windowName);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            context().forward(key, value);
+            window.put(key, value);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
new file mode 100644
index 0000000..8a9bf6c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.Iterator;
+
+class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
+
+    private final String otherWindowName;
+    private final ValueJoiner<V1, V2, R> joiner;
+    private final boolean outer;
+
+    KStreamKStreamJoin(String otherWindowName, ValueJoiner<V1, V2, R> joiner, boolean outer) {
+        this.otherWindowName = otherWindowName;
+        this.joiner = joiner;
+        this.outer = outer;
+    }
+
+    @Override
+    public Processor<K, V1> get() {
+        return new KStreamKStreamJoinProcessor();
+    }
+
+    private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+
+        private WindowStore<K, V2> otherWindow;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+        }
+
+        @Override
+        public void process(K key, V1 value) {
+            boolean needOuterJoin = KStreamKStreamJoin.this.outer;
+
+            Iterator<V2> iter = otherWindow.fetch(key, context().timestamp());
+            while (iter.hasNext()) {
+                needOuterJoin = false;
+                context().forward(key, joiner.apply(value, iter.next()));
+            }
+
+            if (needOuterJoin)
+                context().forward(key, joiner.apply(value, null));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
deleted file mode 100644
index 2923936..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamWindow<K, V> implements ProcessorSupplier<K, V> {
-
-    private final WindowSupplier<K, V> windowSupplier;
-
-    KStreamWindow(WindowSupplier<K, V> windowSupplier) {
-        this.windowSupplier = windowSupplier;
-    }
-
-    public WindowSupplier<K, V> window() {
-        return windowSupplier;
-    }
-
-    @Override
-    public Processor<K, V> get() {
-        return new KStreamWindowProcessor();
-    }
-
-    private class KStreamWindowProcessor extends AbstractProcessor<K, V> {
-
-        private Window<K, V> window;
-
-        @Override
-        public void init(ProcessorContext context) {
-            super.init(context);
-            this.window = windowSupplier.get();
-            this.window.init(context);
-        }
-
-        @Override
-        public void process(K key, V value) {
-            synchronized (this) {
-                window.put(key, value, context().timestamp());
-                context().forward(key, value);
-            }
-        }
-
-        @Override
-        public void close() {
-            window.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
deleted file mode 100644
index c71c11b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implements KStreamWindowed<K, V> {
-
-    private final WindowSupplier<K, V> windowSupplier;
-
-    public KStreamWindowedImpl(KStreamBuilder topology, String name, Set<String> sourceNodes, WindowSupplier<K, V> windowSupplier) {
-        super(topology, name, sourceNodes);
-        this.windowSupplier = windowSupplier;
-    }
-
-    @Override
-    public <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> valueJoiner) {
-        String thisWindowName = this.windowSupplier.name();
-        String otherWindowName = ((KStreamWindowedImpl<K, V1>) other).windowSupplier.name();
-        Set<String> thisSourceNodes = this.sourceNodes;
-        Set<String> otherSourceNodes = ((KStreamWindowedImpl<K, V1>) other).sourceNodes;
-
-        if (thisSourceNodes == null || otherSourceNodes == null)
-            throw new KafkaException("not joinable");
-
-        Set<String> allSourceNodes = new HashSet<>(sourceNodes);
-        allSourceNodes.addAll(((KStreamWindowedImpl<K, V1>) other).sourceNodes);
-
-        KStreamJoin<K, V2, V, V1> joinThis = new KStreamJoin<>(otherWindowName, valueJoiner);
-        KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, reverseJoiner(valueJoiner));
-        KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>();
-
-        String joinThisName = topology.newName(JOINTHIS_NAME);
-        String joinOtherName = topology.newName(JOINOTHER_NAME);
-        String joinMergeName = topology.newName(MERGE_NAME);
-
-        topology.addProcessor(joinThisName, joinThis, this.name);
-        topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);
-        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.copartitionSources(allSourceNodes);
-
-        return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
index 41c725d..73814ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
@@ -39,7 +39,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     private final Serdes serdes;
     private final Time time;
 
-    protected RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) {
+    public RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) {
         this.name = name;
         this.windowBefore = windowBefore;
         this.windowAfter = windowAfter;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 1e775b8..108bf3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -22,10 +22,8 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.UnlimitedWindowDef;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -72,54 +70,38 @@ public class KStreamImplTest {
         });
 
         KStream<String, Integer>[] streams2 = stream2.branch(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return (value % 2) == 0;
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                },
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return true;
+                    }
                 }
-            },
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return true;
-                }
-            }
         );
 
         KStream<String, Integer>[] streams3 = stream3.branch(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return (value % 2) == 0;
-                }
-            },
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return true;
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                },
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return true;
+                    }
                 }
-            }
         );
 
-        KStream<String, Integer> stream4 = streams2[0].with(new UnlimitedWindowDef<String, Integer>("window"))
-            .join(streams3[0].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() {
-                @Override
-                public Integer apply(Integer value1, Integer value2) {
-                    return value1 + value2;
-                }
-            });
-
-        KStream<String, Integer> stream5 = streams2[1].with(new UnlimitedWindowDef<String, Integer>("window"))
-            .join(streams3[1].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() {
-                @Override
-                public Integer apply(Integer value1, Integer value2) {
-                    return value1 + value2;
-                }
-            });
-
-        stream4.to("topic-5");
+        streams2[0].to("topic-5");
 
-        stream5.through("topic-6").process(new MockProcessorSupplier<String, Integer>());
+        streams2[1].through("topic-6").process(new MockProcessorSupplier<String, Integer>());
 
         assertEquals(2 + // sources
             2 + // stream1
@@ -127,8 +109,6 @@ public class KStreamImplTest {
             1 + // stream3
             1 + 2 + // streams2
             1 + 2 + // streams3
-            2 + 3 + // stream4
-            2 + 3 + // stream5
             1 + // to
             2 + // through
             1, // process

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
deleted file mode 100644
index 12bed17..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.UnlimitedWindowDef;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamJoinTest {
-
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
-    private String dummyTopic = "dummyTopic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
-
-    private ValueMapper<String, String> valueMapper = new ValueMapper<String, String>() {
-        @Override
-        public String apply(String value) {
-            return "#" + value;
-        }
-    };
-
-    private ValueMapper<String, Iterable<String>> valueMapper2 = new ValueMapper<String, Iterable<String>>() {
-        @Override
-        public Iterable<String> apply(String value) {
-            return (Iterable<String>) Utils.mkSet(value);
-        }
-    };
-
-    private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
-        new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-            @Override
-            public KeyValue<Integer, String> apply(Integer key, String value) {
-                return KeyValue.pair(key, value);
-            }
-        };
-
-    KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>> keyValueMapper2 =
-        new KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>>() {
-            @Override
-            public KeyValue<Integer, Iterable<String>> apply(Integer key, String value) {
-                return KeyValue.pair(key, (Iterable<String>) Utils.mkSet(value));
-            }
-        };
-
-
-    @Test
-    public void testJoin() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-        KStream<Integer, String> stream1;
-        KStream<Integer, String> stream2;
-        KStream<Integer, String> dummyStream;
-        KStreamWindowed<Integer, String> windowed1;
-        KStreamWindowed<Integer, String> windowed2;
-        MockProcessorSupplier<Integer, String> processor;
-        String[] expected;
-
-        processor = new MockProcessorSupplier<>();
-        stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
-        stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
-        dummyStream = builder.from(keyDeserializer, valDeserializer, dummyTopic);
-        windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1"));
-        windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2"));
-
-        windowed1.join(windowed2, joiner).process(processor);
-
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
-
-        assertEquals(1, copartitionGroups.size());
-        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        driver.setTime(0L);
-
-        // push two items to the main stream. the other stream's window is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-
-        assertEquals(0, processor.processed.size());
-
-        // push two items to the other stream. the main stream's window has two items
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-
-        assertEquals(2, processor.processed.size());
-
-        expected = new String[]{"0:X0+Y0", "1:X1+Y1"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-
-        processor.processed.clear();
-
-        // push all items to the main stream. this should produce two items.
-
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-
-        assertEquals(2, processor.processed.size());
-
-        expected = new String[]{"0:X0+Y0", "1:X1+Y1"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-
-        processor.processed.clear();
-
-        // there will be previous two items + all items in the main stream's window, thus two are duplicates.
-
-        // push all items to the other stream. this should produce 6 items
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-
-        assertEquals(6, processor.processed.size());
-
-        expected = new String[]{"0:X0+Y0", "0:X0+Y0", "1:X1+Y1", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-    }
-
-    @Test(expected = KafkaException.class)
-    public void testNotJoinable() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        KStream<Integer, String> stream1;
-        KStream<Integer, String> stream2;
-        KStreamWindowed<Integer, String> windowed1;
-        KStreamWindowed<Integer, String> windowed2;
-        MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream1 = builder.from(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
-        stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
-        windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1"));
-        windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2"));
-
-        windowed1.join(windowed2, joiner).process(processor);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
new file mode 100644
index 0000000..5a937af
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.JoinWindowSpec;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamKStreamJoinTest {
+
+    private String topic1 = "topic1";
+    private String topic2 = "topic2";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+
+    @Test
+    public void testJoin() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KStream<Integer, String> stream1;
+            KStream<Integer, String> stream2;
+            KStream<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> processor;
+
+            processor = new MockProcessorSupplier<>();
+            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
+            joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100),
+                    keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+            joined.process(processor);
+
+            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+            assertEquals(1, copartitionGroups.size());
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            // push two items to the primary stream. the other window is empty
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = {}
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+
+            // push all four items to the primary stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+
+            // push all items to the other stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+            // push all four items to the primary stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+
+            // push two items to the other stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testOuterJoin() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KStream<Integer, String> stream1;
+            KStream<Integer, String> stream2;
+            KStream<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> processor;
+
+            processor = new MockProcessorSupplier<>();
+            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
+            joined = stream1.outerJoin(stream2, joiner, JoinWindowSpec.of("test").within(100),
+                    keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+            joined.process(processor);
+
+            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+            assertEquals(1, copartitionGroups.size());
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            // push two items to the primary stream. the other window is empty. this should produce two items.
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = {}
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+
+            // push all four items to the primary stream. this should produce four items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+
+            // push all items to the other stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+            // push all four items to the primary stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+
+            // push two items to the other stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testWindowing() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            long time = 0L;
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KStream<Integer, String> stream1;
+            KStream<Integer, String> stream2;
+            KStream<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> processor;
+
+            processor = new MockProcessorSupplier<>();
+            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
+            joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100),
+                    keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+            joined.process(processor);
+
+            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+            assertEquals(1, copartitionGroups.size());
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(time);
+
+            // push two items to the primary stream. the other window is empty. this should produce no items.
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = {}
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+
+            // clear logically
+            time = 1000L;
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.setTime(time + i);
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+            processor.checkAndClearResult();
+
+            // gradually expires items in w1
+            // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
+
+            time = 1000 + 100L;
+            driver.setTime(time);
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("2:X2+YY2", "3:X3+YY3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("3:X3+YY3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // go back to the time before expiration
+
+            time = 1000L - 100L - 1L;
+            driver.setTime(time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+            // clear (logically)
+            time = 2000L;
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.setTime(time + i);
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // gradually expires items in w2
+            // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+
+            time = 2000L + 100L;
+            driver.setTime(time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("2:XX2+Y2", "3:XX3+Y3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("3:XX3+Y3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // go back to the time before expiration
+
+            time = 2000L - 100L - 1L;
+            driver.setTime(time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+}


Mime
View raw message