kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [5/6] kafka git commit: KIP-28: Add a processor client for Kafka Streaming
Date Sat, 26 Sep 2015 00:24:24 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
new file mode 100644
index 0000000..a1456f6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.Iterator;
+
/**
 * A store of values per key indexed by timestamp, backing windowed stream
 * operations (used by {@code KStreamJoin} via {@code context.getStateStore}).
 * Extends {@link StateStore} so implementations participate in the normal
 * state-store lifecycle.
 *
 * @param <K> the key type
 * @param <V> the stored value type
 */
public interface Window<K, V> extends StateStore {

    /**
     * Initializes this window with the processor context. Called once before
     * any reads or writes.
     */
    void init(ProcessorContext context);

    /**
     * Returns the values stored for {@code key} relevant to {@code timestamp}.
     * NOTE(review): presumably values whose timestamps fall within the window
     * span around {@code timestamp} — no implementation is visible here; confirm.
     */
    Iterator<V> find(K key, long timestamp);

    /**
     * Returns the values stored for {@code key} at or after {@code timestamp}.
     * NOTE(review): exact bound semantics (inclusive/exclusive) not visible — confirm.
     */
    Iterator<V> findAfter(K key, long timestamp);

    /**
     * Returns the values stored for {@code key} at or before {@code timestamp}.
     * NOTE(review): exact bound semantics (inclusive/exclusive) not visible — confirm.
     */
    Iterator<V> findBefore(K key, long timestamp);

    /** Stores {@code value} for {@code key} associated with {@code timestamp}. */
    void put(K key, V value, long timestamp);
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java
new file mode 100644
index 0000000..bbc5979
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
/**
 * A named factory for {@link Window} instances; the definition is shared while
 * {@link #instance()} supplies a fresh window per task/processor.
 *
 * @param <K> the key type of windows produced
 * @param <V> the value type of windows produced
 */
public interface WindowDef<K, V> {

    /** Returns the name of the window (used as the state store name). */
    String name();

    /** Creates a new {@link Window} instance for this definition. */
    Window<K, V> instance();
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
new file mode 100644
index 0000000..54d44f0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.Iterator;
+
/**
 * An iterator that lazily transforms and filters the elements of an underlying
 * iterator. Subclasses implement {@link #filter(Object)} to map each source
 * element of type {@code S} to an element of type {@code T}, or return
 * {@code null} to skip the source element.
 *
 * <p>Because {@code null} is used as the "skip" marker, this iterator can
 * never yield {@code null} values.
 *
 * @param <T> the element type produced by this iterator
 * @param <S> the element type of the underlying iterator
 */
public abstract class FilteredIterator<T, S> implements Iterator<T> {

    private final Iterator<S> inner;
    private T nextValue = null;

    public FilteredIterator(Iterator<S> inner) {
        this.inner = inner;

        findNext();
    }

    @Override
    public boolean hasNext() {
        return nextValue != null;
    }

    @Override
    public T next() {
        // Honor the Iterator contract: fail fast when exhausted. The original
        // code silently returned null here, which masks caller bugs.
        if (nextValue == null)
            throw new NoSuchElementException();

        T value = nextValue;
        findNext();

        return value;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    // Advances nextValue to the next accepted source element, or null if the
    // underlying iterator is exhausted.
    private void findNext() {
        while (inner.hasNext()) {
            S item = inner.next();
            nextValue = filter(item);
            if (nextValue != null) {
                return;
            }
        }
        nextValue = null;
    }

    /**
     * Transforms one source element.
     *
     * @param item the source element
     * @return the transformed element, or {@code null} to filter it out
     */
    protected abstract T filter(S item);
}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
new file mode 100644
index 0000000..6b661b4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.kstream.Predicate;
+
+class KStreamBranch<K, V> implements ProcessorDef {
+
+    private final Predicate<K, V>[] predicates;
+
+    @SuppressWarnings("unchecked")
+    public KStreamBranch(Predicate... predicates) {
+        this.predicates = predicates;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamBranchProcessor();
+    }
+
+    private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            for (int i = 0; i < predicates.length; i++) {
+                if (predicates[i].apply(key, value)) {
+                    // use forward with childIndex here and then break the loop
+                    // so that no record is going to be piped to multiple streams
+                    context().forward(key, value, i);
+                    break;
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
new file mode 100644
index 0000000..5444e70
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+class KStreamFilter<K, V> implements ProcessorDef {
+
+    private final Predicate<K, V> predicate;
+    private final boolean filterOut;
+
+    public KStreamFilter(Predicate<K, V> predicate, boolean filterOut) {
+        this.predicate = predicate;
+        this.filterOut = filterOut;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamFilterProcessor();
+    }
+
+    private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            if (filterOut ^ predicate.apply(key, value)) {
+                context().forward(key, value);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
new file mode 100644
index 0000000..410cfda
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef {
+
+    private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
+
+    KStreamFlatMap(KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamFlatMapProcessor();
+    }
+
+    private class KStreamFlatMapProcessor extends AbstractProcessor<K1, V1> {
+        @Override
+        public void process(K1 key, V1 value) {
+            for (KeyValue<K2, V2> newPair : mapper.apply(key, value)) {
+                context().forward(newPair.key, newPair.value);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
new file mode 100644
index 0000000..edca421
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef {
+
+    private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
+
+    KStreamFlatMapValues(ValueMapper<V1, ? extends Iterable<V2>> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamFlatMapValuesProcessor();
+    }
+
+    private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K1, V1> {
+        @Override
+        public void process(K1 key, V1 value) {
+            Iterable<V2> newValues = mapper.apply(value);
+            for (V2 v : newValues) {
+                context().forward(key, v);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
new file mode 100644
index 0000000..6936648
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.kstream.KStreamWindowed;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.WindowDef;
+
+import java.lang.reflect.Array;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
 * Default implementation of {@link KStream}. The instance is a thin handle on
 * a shared {@link TopologyBuilder}: every transformation registers a new
 * processor node (named with a unique auto-incremented suffix), wires it as a
 * child of this stream's node, and returns a new {@code KStreamImpl} that
 * refers to the freshly added node.
 */
public class KStreamImpl<K, V> implements KStream<K, V> {

    // Name prefixes for generated topology node names; the value of INDEX is
    // appended to make each node name unique.
    private static final String FILTER_NAME = "KAFKA-FILTER-";

    private static final String MAP_NAME = "KAFKA-MAP-";

    private static final String MAPVALUES_NAME = "KAFKA-MAPVALUES-";

    private static final String FLATMAP_NAME = "KAFKA-FLATMAP-";

    private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-";

    private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-";

    private static final String BRANCH_NAME = "KAFKA-BRANCH-";

    private static final String BRANCHCHILD_NAME = "KAFKA-BRANCHCHILD-";

    private static final String WINDOWED_NAME = "KAFKA-WINDOWED-";

    private static final String SINK_NAME = "KAFKA-SINK-";

    public static final String JOINTHIS_NAME = "KAFKA-JOINTHIS-";

    public static final String JOINOTHER_NAME = "KAFKA-JOINOTHER-";

    public static final String JOINMERGE_NAME = "KAFKA-JOINMERGE-";

    public static final String SOURCE_NAME = "KAFKA-SOURCE-";

    // Shared, JVM-wide counter used to generate unique node-name suffixes.
    // NOTE(review): static, so names are unique across all topologies built in
    // this JVM, not per topology — confirm this is intentional.
    public static final AtomicInteger INDEX = new AtomicInteger(1);

    protected final TopologyBuilder topology;   // the builder all nodes are added to
    protected final String name;                // topology node this stream reads from

    public KStreamImpl(TopologyBuilder topology, String name) {
        this.topology = topology;
        this.name = name;
    }

    /** Adds a filter node keeping records that satisfy {@code predicate}. */
    @Override
    public KStream<K, V> filter(Predicate<K, V> predicate) {
        String name = FILTER_NAME + INDEX.getAndIncrement();

        topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);

        return new KStreamImpl<>(topology, name);
    }

    /** Adds a filter node dropping records that satisfy {@code predicate}. */
    @Override
    public KStream<K, V> filterOut(final Predicate<K, V> predicate) {
        String name = FILTER_NAME + INDEX.getAndIncrement();

        topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);

        return new KStreamImpl<>(topology, name);
    }

    /** Adds a node mapping each record to a new key-value pair. */
    @Override
    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
        String name = MAP_NAME + INDEX.getAndIncrement();

        topology.addProcessor(name, new KStreamMap<>(mapper), this.name);

        return new KStreamImpl<>(topology, name);
    }

    /** Adds a node mapping each value (key unchanged). */
    @Override
    public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
        String name = MAPVALUES_NAME + INDEX.getAndIncrement();

        topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);

        return new KStreamImpl<>(topology, name);
    }

    /** Adds a node mapping each record to zero or more key-value pairs. */
    @Override
    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
        String name = FLATMAP_NAME + INDEX.getAndIncrement();

        topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);

        return new KStreamImpl<>(topology, name);
    }

    /** Adds a node mapping each value to zero or more values (key unchanged). */
    @Override
    public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
        String name = FLATMAPVALUES_NAME + INDEX.getAndIncrement();

        topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);

        return new KStreamImpl<>(topology, name);
    }

    /** Adds a windowing node backed by {@code window} and returns a windowed view. */
    @Override
    public KStreamWindowed<K, V> with(WindowDef<K, V> window) {
        String name = WINDOWED_NAME + INDEX.getAndIncrement();

        topology.addProcessor(name, new KStreamWindow<>(window), this.name);

        return new KStreamWindowedImpl<>(topology, name, window);
    }

    /**
     * Splits this stream into one child stream per predicate; each record goes
     * to the first matching predicate's child only (see KStreamBranch).
     */
    @Override
    @SuppressWarnings("unchecked")
    public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
        String branchName = BRANCH_NAME + INDEX.getAndIncrement();

        topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);

        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
        for (int i = 0; i < predicates.length; i++) {
            String childName = BRANCHCHILD_NAME + INDEX.getAndIncrement();

            // pass-through child node at position i receives only records the
            // branch node forwarded with childIndex == i
            topology.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);

            branchChildren[i] = new KStreamImpl<>(topology, childName);
        }

        return branchChildren;
    }

    /**
     * Materializes this stream to {@code topic} and reads it back as a new
     * stream: adds a sink node writing with the given serializers and a source
     * node consuming the same topic with the given deserializers.
     */
    @Override
    public <K1, V1> KStream<K1, V1> through(String topic,
                                            Serializer<K> keySerializer,
                                            Serializer<V> valSerializer,
                                            Deserializer<K1> keyDeserializer,
                                            Deserializer<V1> valDeserializer) {
        String sendName = SINK_NAME + INDEX.getAndIncrement();

        topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);

        String sourceName = SOURCE_NAME + INDEX.getAndIncrement();

        topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);

        return new KStreamImpl<>(topology, sourceName);
    }

    /**
     * Like {@link #through(String, Serializer, Serializer, Deserializer, Deserializer)}
     * with all null (de)serializers — presumably falling back to the configured
     * defaults downstream; confirm against TopologyBuilder's null handling.
     */
    @Override
    public <K1, V1> KStream<K1, V1> through(String topic) {
        return through(topic, (Serializer<K>) null, (Serializer<V>) null, (Deserializer<K1>) null, (Deserializer<V1>) null);
    }

    /** Terminates the stream into {@code topic} using default serializers. */
    @Override
    public void to(String topic) {
        String name = SINK_NAME + INDEX.getAndIncrement();

        topology.addSink(name, topic, this.name);
    }

    /** Terminates the stream into {@code topic} using the given serializers. */
    @Override
    public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
        String name = SINK_NAME + INDEX.getAndIncrement();

        topology.addSink(name, topic, keySerializer, valSerializer, this.name);
    }

    /** Adds an arbitrary user-supplied processor node to the stream. */
    @Override
    public <K1, V1> KStream<K1, V1> process(final ProcessorDef processorDef) {
        String name = PROCESSOR_NAME + INDEX.getAndIncrement();

        topology.addProcessor(name, processorDef, this.name);

        return new KStreamImpl<>(topology, name);
    }
}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
new file mode 100644
index 0000000..4003d29
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+import java.util.Iterator;
+
/**
 * Processor definition for joining this stream (values of type {@code V1})
 * against the windowed state of another stream (values of type {@code V2}),
 * producing joined values of type {@code V} via a {@link ValueJoiner}.
 *
 * @param <K>  key type shared by both streams
 * @param <V>  joined result type
 * @param <V1> value type of the stream this processor consumes
 * @param <V2> value type held in the other stream's window store
 */
class KStreamJoin<K, V, V1, V2> implements ProcessorDef {

    // Indirection over the window lookup; a protected field so the lookup
    // strategy can be swapped (e.g. findAfter/findBefore for asymmetric
    // joins — presumably by subclasses; none are visible here).
    private static abstract class Finder<K, T> {
        abstract Iterator<T> find(K key, long timestamp);
    }

    private final String windowName;                 // state store name of the other stream's window
    private final ValueJoiner<V1, V2, V> joiner;

    KStreamJoin(String windowName, ValueJoiner<V1, V2, V> joiner) {
        this.windowName = windowName;
        this.joiner = joiner;
    }

    @Override
    public Processor instance() {
        return new KStreamJoinProcessor(windowName);
    }

    private class KStreamJoinProcessor extends AbstractProcessor<K, V1> {

        private final String windowName;
        protected Finder<K, V2> finder;

        public KStreamJoinProcessor(String windowName) {
            this.windowName = windowName;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            super.init(context);

            // check if these two streams are joinable
            // NOTE(review): what joinable() verifies (copartitioning?) is not
            // visible here — confirm against ProcessorContext.
            if (!context.joinable())
                throw new IllegalStateException("Streams are not joinable.");

            final Window<K, V2> window = (Window<K, V2>) context.getStateStore(windowName);

            // default lookup: values for the key at the record's timestamp
            this.finder = new Finder<K, V2>() {
                Iterator<V2> find(K key, long timestamp) {
                    return window.find(key, timestamp);
                }
            };
        }

        @Override
        public void process(K key, V1 value) {
            // join the incoming record against every matching value from the
            // other stream's window at the current record timestamp
            long timestamp = context().timestamp();
            Iterator<V2> iter = finder.find(key, timestamp);
            if (iter != null) {
                while (iter.hasNext()) {
                    context().forward(key, joiner.apply(value, iter.next()));
                }
            }
        }
    }

    /**
     * Returns a joiner with its argument order swapped, so the same join logic
     * can be applied from the other stream's side.
     */
    public static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
        return new ValueJoiner<T2, T1, R>() {
            @Override
            public R apply(T2 value2, T1 value1) {
                return joiner.apply(value1, value2);
            }
        };
    }

}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
new file mode 100644
index 0000000..4d31348
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+class KStreamMap<K1, V1, K2, V2> implements ProcessorDef {
+
+    private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+
+    public KStreamMap(KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamMapProcessor();
+    }
+
+    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+        @Override
+        public void process(K1 key, V1 value) {
+            KeyValue<K2, V2> newPair = mapper.apply(key, value);
+            context().forward(newPair.key, newPair.value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
new file mode 100644
index 0000000..dac6550
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+class KStreamMapValues<K1, V1, V2> implements ProcessorDef {
+
+    private final ValueMapper<V1, V2> mapper;
+
+    public KStreamMapValues(ValueMapper<V1, V2> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamMapProcessor();
+    }
+
+    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+        @Override
+        public void process(K1 key, V1 value) {
+            V2 newValue = mapper.apply(value);
+            context().forward(key, newValue);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
new file mode 100644
index 0000000..ea39550
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+/**
+ * A {@link ProcessorDef} whose processors forward every record unchanged;
+ * used as a merge/no-op node in the topology (e.g. downstream of a join).
+ */
+class KStreamPassThrough<K, V> implements ProcessorDef {
+
+    @Override
+    public Processor instance() {
+        return new KStreamPassThroughProcessor();
+    }
+
+    // The inner class deliberately declares no type parameters of its own:
+    // re-declaring <K, V> here would shadow the enclosing class's parameters
+    // (compiler warning) and silently decouple the two sets of types.
+    public class KStreamPassThroughProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            context().forward(key, value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
new file mode 100644
index 0000000..6ebcbe1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.kstream.WindowDef;
+
+/**
+ * A {@link ProcessorDef} that materializes a stream into a window state store:
+ * each record is inserted into the window at the record's stream timestamp and
+ * then forwarded downstream unchanged.
+ */
+public class KStreamWindow<K, V> implements ProcessorDef {
+
+    private final WindowDef<K, V> windowDef;
+
+    KStreamWindow(WindowDef<K, V> windowDef) {
+        this.windowDef = windowDef;
+    }
+
+    /** Returns the window definition backing the processors created by this def. */
+    public WindowDef<K, V> window() {
+        return windowDef;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamWindowProcessor();
+    }
+
+    private class KStreamWindowProcessor extends AbstractProcessor<K, V> {
+
+        // Window store owned by this processor; created and initialized in init()
+        // so every processor instance gets its own store bound to its own context.
+        private Window<K, V> window;
+
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            this.window = windowDef.instance();
+            this.window.init(context);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            // NOTE(review): the synchronized block suggests the window is read
+            // concurrently by another processor (e.g. the joining side) -- TODO
+            // confirm the locking protocol with the corresponding reader.
+            synchronized (this) {
+                window.put(key, value, context().timestamp());
+                context().forward(key, value);
+            }
+        }
+
+        @Override
+        public void close() {
+            // NOTE(review): closes the store it created in init(); assumes the
+            // framework does not also close registered stores -- TODO confirm.
+            window.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
new file mode 100644
index 0000000..a208af6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamWindowed;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.WindowDef;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+
+/**
+ * A windowed key-value stream: a {@link KStreamImpl} that additionally carries
+ * the {@link WindowDef} its records are materialized into, enabling windowed joins.
+ */
+public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implements KStreamWindowed<K, V> {
+
+    private final WindowDef<K, V> windowDef;
+
+    public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowDef<K, V> windowDef) {
+        super(topology, name);
+        this.windowDef = windowDef;
+    }
+
+    /**
+     * Joins this windowed stream with another windowed stream on matching keys.
+     * Wires three processors into the topology: one looks this stream's records up
+     * in the other stream's window, one performs the reverse lookup with the joiner's
+     * arguments swapped (reverseJoiner), and a pass-through node merges the two
+     * result streams into the single returned stream.
+     */
+    @Override
+    public <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> valueJoiner) {
+        String thisWindowName = this.windowDef.name();
+        String otherWindowName = ((KStreamWindowedImpl<K, V1>) other).windowDef.name();
+
+        // Each side of the join reads from the opposite side's window store.
+        KStreamJoin<K, V2, V, V1> joinThis = new KStreamJoin<>(otherWindowName, valueJoiner);
+        KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner));
+        KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>();
+
+        // Unique processor names; INDEX is presumably a shared counter inherited
+        // from KStreamImpl -- TODO confirm against the superclass.
+        String joinThisName = JOINTHIS_NAME + INDEX.getAndIncrement();
+        String joinOtherName = JOINOTHER_NAME + INDEX.getAndIncrement();
+        String joinMergeName = JOINMERGE_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(joinThisName, joinThis, this.name);
+        topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);
+        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+
+        return new KStreamImpl<>(topology, joinMergeName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java
new file mode 100644
index 0000000..b54bcc9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.processor.internals.Stamped;
+
+import java.util.Iterator;
+
+/**
+ * Helper structures and byte-level (de)serialization utilities shared by
+ * window store implementations.
+ */
+public class WindowSupport {
+
+    /**
+     * A singly-linked list of timestamped values kept in insertion order.
+     * It also tracks a "dirty" pointer: the first element appended since the
+     * last {@link #clearDirtyValues()}, so not-yet-flushed entries can be
+     * iterated separately.
+     */
+    public static class ValueList<V> {
+        Value<V> head = null;
+        Value<V> tail = null;
+        // First element added since the dirty marker was last cleared; null when clean.
+        Value<V> dirty = null;
+
+        /** Appends a new value with its slot number and timestamp at the tail. */
+        public void add(int slotNum, V value, long timestamp) {
+            Value<V> v = new Value<>(slotNum, value, timestamp);
+            if (tail != null) {
+                tail.next = v;
+            } else {
+                head = v;
+            }
+            tail = v;
+            if (dirty == null) dirty = v;
+        }
+
+        public Value<V> first() {
+            return head;
+        }
+
+        /** Unlinks the head element, if any. */
+        public void removeFirst() {
+            // NOTE(review): if the removed element is the current dirty marker,
+            // the marker still points at the unlinked node -- TODO confirm intended.
+            if (head != null) {
+                if (head == tail) tail = null;
+                head = head.next;
+            }
+        }
+
+        public boolean isEmpty() {
+            return head == null;
+        }
+
+        public boolean hasDirtyValues() {
+            return dirty != null;
+        }
+
+        public void clearDirtyValues() {
+            dirty = null;
+        }
+
+        public Iterator<Value<V>> iterator() {
+            return new ValueListIterator<V>(head);
+        }
+
+        /** Iterates only the elements appended since the last clearDirtyValues(). */
+        public Iterator<Value<V>> dirtyValueIterator() {
+            return new ValueListIterator<V>(dirty);
+        }
+
+    }
+
+    /** Read-only iterator over a chain of {@link Value} nodes; remove() is unsupported. */
+    private static class ValueListIterator<V> implements Iterator<Value<V>> {
+
+        Value<V> ptr;
+
+        ValueListIterator(Value<V> start) {
+            ptr = start;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return ptr != null;
+        }
+
+        @Override
+        public Value<V> next() {
+            Value<V> value = ptr;
+            if (value != null) ptr = value.next;
+            return value;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+
+    /** A timestamped value that also remembers which window slot it was stored in. */
+    public static class Value<V> extends Stamped<V> {
+        public final int slotNum;
+        private Value<V> next = null;
+
+        Value(int slotNum, V value, long timestamp) {
+            super(value, timestamp);
+            this.slotNum = slotNum;
+        }
+    }
+
+
+    /** Reads a big-endian long from {@code bytes} starting at {@code offset}. */
+    public static long getLong(byte[] bytes, int offset) {
+        long value = 0;
+        for (int i = 0; i < 8; i++) {
+            // Mask with 0xFF: Java bytes are signed, and without the mask a negative
+            // byte sign-extends and corrupts all higher bits accumulated so far.
+            value = (value << 8) | (bytes[offset + i] & 0xFF);
+        }
+        return value;
+    }
+
+    /** Reads a big-endian int from {@code bytes} starting at {@code offset}. */
+    public static int getInt(byte[] bytes, int offset) {
+        int value = 0;
+        for (int i = 0; i < 4; i++) {
+            // Mask with 0xFF to avoid sign extension of negative bytes (see getLong).
+            value = (value << 8) | (bytes[offset + i] & 0xFF);
+        }
+        return value;
+    }
+
+    /**
+     * Writes {@code value} into {@code bytes} at {@code offset} in big-endian order.
+     * @return the number of bytes written (8)
+     */
+    public static int putLong(byte[] bytes, int offset, long value) {
+        for (int i = 7; i >= 0; i--) {
+            bytes[offset + i] = (byte) (value & 0xFF);
+            value = value >> 8;
+        }
+        return 8;
+    }
+
+    /**
+     * Writes {@code value} into {@code bytes} at {@code offset} in big-endian order.
+     * @return the number of bytes written (4)
+     */
+    public static int putInt(byte[] bytes, int offset, int value) {
+        for (int i = 3; i >= 0; i--) {
+            bytes[offset + i] = (byte) (value & 0xFF);
+            value = value >> 8;
+        }
+        return 4;
+    }
+
+    /**
+     * Writes a length-prefixed byte array into {@code bytes} at {@code offset}:
+     * a 4-byte big-endian length followed by the payload itself.
+     * @return the total number of bytes written (4 + value.length)
+     */
+    public static int puts(byte[] bytes, int offset, byte[] value) {
+        offset += putInt(bytes, offset, value.length);
+        // Copy the payload INTO the destination buffer. The original call had the
+        // System.arraycopy source/destination arguments reversed, which copied the
+        // (still empty) destination region over the payload instead of writing it.
+        System.arraycopy(value, 0, bytes, offset, value.length);
+        return 4 + value.length;
+    }
+
+
+    /** Deserializes {@code length} bytes at {@code offset} using the given deserializer. */
+    public static <T> T deserialize(byte[] bytes, int offset, int length, String topic, Deserializer<T> deserializer) {
+        byte[] buf = new byte[length];
+        System.arraycopy(bytes, offset, buf, 0, length);
+        return deserializer.deserialize(topic, buf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
new file mode 100644
index 0000000..01d0024
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op
+ * implementations of {@link #punctuate(long)} and {@link #close()}.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
+
+    private ProcessorContext context;
+
+    protected AbstractProcessor() {
+    }
+
+    /**
+     * Save the context for later retrieval via {@link #context()}. Subclasses that override
+     * this method should call {@code super.init(context)}.
+     *
+     * @param context the context; may not be null
+     * @throws NullPointerException if {@code context} is null
+     */
+    @Override
+    public void init(ProcessorContext context) {
+        // Fail fast: the Processor contract documents the context as non-null, and a
+        // null context would otherwise surface later as an obscure NPE inside process().
+        if (context == null) {
+            throw new NullPointerException("context must not be null");
+        }
+        this.context = context;
+    }
+
+    /**
+     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) scheduled itself} with the
+     * context during {@link #init(ProcessorContext) initialization}.
+     * <p>
+     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
+     * </p>
+     *
+     * @param streamTime the stream time when this method is being called
+     */
+    @Override
+    public void punctuate(long streamTime) {
+        // do nothing
+    }
+
+    /**
+     * Close this processor and clean up any resources.
+     * <p>
+     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
+     * </p>
+     */
+    @Override
+    public void close() {
+        // do nothing
+    }
+
+    /**
+     * Get the processor's context set during {@link #init(ProcessorContext) initialization}.
+     *
+     * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
+     */
+    protected final ProcessorContext context() {
+        return this.context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
new file mode 100644
index 0000000..3cade3a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+/**
+ * A processor of key-value messages.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public interface Processor<K, V> {
+
+    /**
+     * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
+     * that contains it is initialized.
+     * <p>
+     * If this processor is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+     * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
+     *
+     * @param context the context; may not be null
+     */
+    void init(ProcessorContext context);
+
+    /**
+     * Process the message with the given key and value.
+     *
+     * @param key the key for the message
+     * @param value the value for the message
+     */
+    void process(K key, V value);
+
+    /**
+     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) scheduled itself} with the
+     * context during {@link #init(ProcessorContext) initialization}.
+     *
+     * @param timestamp the stream time when this method is being called
+     */
+    void punctuate(long timestamp);
+
+    /**
+     * Close this processor and clean up any resources.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
new file mode 100644
index 0000000..6b32b83
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.File;
+
+/**
+ * The context passed to a {@link Processor} at {@link Processor#init(ProcessorContext) initialization},
+ * giving it access to serdes, state stores, scheduling, record metadata, and downstream forwarding.
+ */
+public interface ProcessorContext {
+
+    /**
+     * Returns the partition group id
+     *
+     * @return partition group id
+     */
+    int id();
+
+    /**
+     * Returns the key serializer
+     *
+     * @return the key serializer
+     */
+    Serializer<?> keySerializer();
+
+    /**
+     * Returns the value serializer
+     *
+     * @return the value serializer
+     */
+    Serializer<?> valueSerializer();
+
+    /**
+     * Returns the key deserializer
+     *
+     * @return the key deserializer
+     */
+    Deserializer<?> keyDeserializer();
+
+    /**
+     * Returns the value deserializer
+     *
+     * @return the value deserializer
+     */
+    Deserializer<?> valueDeserializer();
+
+    /**
+     * Returns the state directory for the partition.
+     *
+     * @return the state directory
+     */
+    File stateDir();
+
+    /**
+     * Returns Metrics instance
+     *
+     * @return Metrics
+     */
+    Metrics metrics();
+
+    /**
+     * Checks whether this processor's incoming streams are joinable.
+     *
+     * @return true if the incoming streams can be joined
+     */
+    boolean joinable();
+
+    /**
+     * Registers and possibly restores the specified storage engine.
+     *
+     * @param store the storage engine
+     * @param restoreFunc the restoration logic applied, one record at a time, when replaying the store's changelog
+     */
+    void register(StateStore store, RestoreFunc restoreFunc);
+
+    // Looks up a previously registered state store by name.
+    StateStore getStateStore(String name);
+
+    // Schedules periodic punctuate() calls for the current processor at the given interval.
+    // NOTE(review): interval unit is presumably milliseconds -- TODO confirm with the implementation.
+    void schedule(long interval);
+
+    // Forwards a key-value pair to all downstream processors.
+    <K, V> void forward(K key, V value);
+
+    // Forwards a key-value pair to one downstream processor, selected by child index.
+    <K, V> void forward(K key, V value, int childIndex);
+
+    // Requests a commit of the current processing state.
+    void commit();
+
+    // Metadata of the record currently being processed: source topic, partition,
+    // offset, and extracted timestamp.
+    String topic();
+
+    int partition();
+
+    long offset();
+
+    long timestamp();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
new file mode 100644
index 0000000..99f0299
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+/**
+ * A factory for {@link Processor} instances: the topology holds one definition per
+ * node and calls {@link #instance()} whenever a new processor object is needed.
+ */
+public interface ProcessorDef {
+
+    /**
+     * Creates a new processor instance.
+     *
+     * @return a fresh {@link Processor}
+     */
+    Processor instance();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java b/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java
new file mode 100644
index 0000000..883147e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+/**
+ * Restoration logic for log-backed state stores upon restart,
+ * it takes one record at a time from the logs to apply to the restoring state.
+ */
+public interface RestoreFunc {
+
+    /**
+     * Applies one changelog record to the restoring state.
+     *
+     * @param key the serialized key of the changelog record
+     * @param value the serialized value of the changelog record
+     */
+    void apply(byte[] key, byte[] value);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
new file mode 100644
index 0000000..38afe9b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+/**
+ * A storage engine for managing state maintained by a stream processor.
+ *
+ * <p>
+ * This interface does not specify any query capabilities, which, of course,
+ * would be query engine specific. Instead it just specifies the minimum
+ * functionality required to reload a storage engine from its changelog as well
+ * as basic lifecycle management.
+ * </p>
+ */
+public interface StateStore {
+
+    /**
+     * The name of this store.
+     * @return the storage name
+     */
+    String name();
+
+    /**
+     * Flush any cached data to the underlying storage.
+     */
+    void flush();
+
+    /**
+     * Close the storage engine and release its resources.
+     */
+    void close();
+
+    /**
+     * Indicates whether the storage survives process restarts.
+     *
+     * @return true if the storage is persistent, false if it is in-memory only
+     */
+    boolean persistent();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
new file mode 100644
index 0000000..62098f2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * An interface that allows the KStream framework to extract a timestamp from a consumed record.
+ */
+public interface TimestampExtractor {
+
+    /**
+     * Extracts a timestamp from a message.
+     *
+     * @param record the consumer record to extract the timestamp from
+     * @return the timestamp; NOTE(review): presumably epoch milliseconds -- confirm against callers
+     */
+    long extract(ConsumerRecord<Object, Object> record);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
new file mode 100644
index 0000000..a254c13
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.SinkNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
+ * and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
+ * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
+ * processes that message, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
+ * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
+ * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link KafkaStreaming} instance
+ * that will then {@link KafkaStreaming#start() begin consuming, processing, and producing messages}.
+ */
+public class TopologyBuilder {
+
+    // empty parent list used when callers pass a null parent array
+    private static final String[] NO_PARENTS = new String[0];
+
+    // list of node factories in a topological order; build() relies on this ordering so that
+    // every parent node has already been constructed when its children are wired up
+    private ArrayList<NodeFactory> nodeFactories = new ArrayList<>();
+
+    // names of all nodes added so far, used to reject duplicate names and unknown parents
+    private Set<String> nodeNames = new HashSet<>();
+    // topics claimed by source nodes; each topic may be consumed by at most one source
+    private Set<String> sourceTopicNames = new HashSet<>();
+
+    // deferred construction: a factory captures the wiring information at add-time and
+    // creates the actual ProcessorNode only when build() is invoked
+    private interface NodeFactory {
+        ProcessorNode build();
+    }
+
+    private class ProcessorNodeFactory implements NodeFactory {
+        public final String[] parents;
+        private final String name;
+        private final ProcessorDef definition;
+
+        public ProcessorNodeFactory(String name, String[] parents, ProcessorDef definition) {
+            this.name = name;
+            // tolerate a null parent array the same way addProcessor() does
+            this.parents = parents == null ? NO_PARENTS : parents.clone();
+            this.definition = definition;
+        }
+
+        @Override
+        public ProcessorNode build() {
+            Processor processor = definition.instance();
+            return new ProcessorNode(name, processor);
+        }
+    }
+
+    private class SourceNodeFactory implements NodeFactory {
+        public final String[] topics;
+        private final String name;
+        private Deserializer keyDeserializer;
+        private Deserializer valDeserializer;
+
+        private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
+            this.name = name;
+            this.topics = topics.clone();
+            this.keyDeserializer = keyDeserializer;
+            this.valDeserializer = valDeserializer;
+        }
+
+        @Override
+        public ProcessorNode build() {
+            return new SourceNode(name, keyDeserializer, valDeserializer);
+        }
+    }
+
+    private class SinkNodeFactory implements NodeFactory {
+        public final String[] parents;
+        public final String topic;
+        private final String name;
+        private Serializer keySerializer;
+        private Serializer valSerializer;
+
+        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer) {
+            this.name = name;
+            // tolerate a null parent array the same way addSink() does
+            this.parents = parents == null ? NO_PARENTS : parents.clone();
+            this.topic = topic;
+            this.keySerializer = keySerializer;
+            this.valSerializer = valSerializer;
+        }
+
+        @Override
+        public ProcessorNode build() {
+            return new SinkNode(name, topic, keySerializer, valSerializer);
+        }
+    }
+
+    /**
+     * Create a new builder.
+     */
+    public TopologyBuilder() {}
+
+    /**
+     * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
+     * The source will use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
+     * {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamingConfig streaming configuration}.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}.
+     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addSource(String name, String... topics) {
+        return addSource(name, (Deserializer) null, (Deserializer) null, topics);
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}.
+     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
+     * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
+     * should use the {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     * @throws TopologyException if the name is already in use, or a topic is already consumed by another source
+     */
+    public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+        if (nodeNames.contains(name))
+            throw new TopologyException("Processor " + name + " is already added.");
+
+        for (String topic : topics) {
+            if (sourceTopicNames.contains(topic))
+                throw new TopologyException("Topic " + topic + " has already been registered by another source.");
+
+            sourceTopicNames.add(topic);
+        }
+
+        nodeNames.add(name);
+        nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+     * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamingConfig streaming configuration}.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its messages
+     * @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
+     * and write to its topic
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
+        return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
+    }
+
+    /**
+     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the specified key and value serializers.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its messages
+     * @param keySerializer the {@link Serializer key serializer} used when sending messages; may be null if the sink
+     * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param valSerializer the {@link Serializer value serializer} used when sending messages; may be null if the sink
+     * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
+     * and write to its topic
+     * @return this builder instance so methods can be chained together; never null
+     * @throws TopologyException if the name is already in use, or a parent name is unknown or is the sink itself
+     */
+    public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
+        if (nodeNames.contains(name))
+            throw new TopologyException("Processor " + name + " is already added.");
+
+        validateParents(name, parentNames);
+
+        nodeNames.add(name);
+        nodeFactories.add(new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
+        return this;
+    }
+
+    /**
+     * Add a new processor node that receives and processes messages output by one or more parent source or processor node.
+     * Any new messages output by this processor will be forwarded to its child processor or sink nodes.
+     * @param name the unique name of the processor node
+     * @param definition the supplier used to obtain this node's {@link Processor} instance
+     * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
+     * and process
+     * @return this builder instance so methods can be chained together; never null
+     * @throws TopologyException if the name is already in use, or a parent name is unknown or is the processor itself
+     */
+    public final TopologyBuilder addProcessor(String name, ProcessorDef definition, String... parentNames) {
+        if (nodeNames.contains(name))
+            throw new TopologyException("Processor " + name + " is already added.");
+
+        validateParents(name, parentNames);
+
+        nodeNames.add(name);
+        nodeFactories.add(new ProcessorNodeFactory(name, parentNames, definition));
+        return this;
+    }
+
+    /**
+     * Ensure that every parent name refers to an already-added node and that no node is declared
+     * to be its own parent. A null parent array is treated as "no parents".
+     */
+    private void validateParents(String name, String[] parentNames) {
+        if (parentNames != null) {
+            for (String parent : parentNames) {
+                if (parent.equals(name)) {
+                    throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
+                }
+                if (!nodeNames.contains(parent)) {
+                    throw new TopologyException("Parent processor " + parent + " is not added yet.");
+                }
+            }
+        }
+    }
+
+    /**
+     * Build the topology. This is typically called automatically when passing this builder into the
+     * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
+     *
+     * @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
+     */
+    @SuppressWarnings("unchecked")
+    public ProcessorTopology build() {
+        List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
+        Map<String, ProcessorNode> processorMap = new HashMap<>();
+        Map<String, SourceNode> topicSourceMap = new HashMap<>();
+
+        try {
+            // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
+            for (NodeFactory factory : nodeFactories) {
+                ProcessorNode node = factory.build();
+                processorNodes.add(node);
+                processorMap.put(node.name(), node);
+
+                if (factory instanceof ProcessorNodeFactory) {
+                    for (String parent : ((ProcessorNodeFactory) factory).parents) {
+                        processorMap.get(parent).addChild(node);
+                    }
+                } else if (factory instanceof SourceNodeFactory) {
+                    for (String topic : ((SourceNodeFactory) factory).topics) {
+                        topicSourceMap.put(topic, (SourceNode) node);
+                    }
+                } else if (factory instanceof SinkNodeFactory) {
+                    for (String parent : ((SinkNodeFactory) factory).parents) {
+                        processorMap.get(parent).addChild(node);
+                    }
+                } else {
+                    throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
+                }
+            }
+        } catch (Exception e) {
+            // chain the cause so the original failure is not lost to the caller
+            throw new KafkaException("ProcessorNode construction failed: this should not happen.", e);
+        }
+
+        return new ProcessorTopology(processorNodes, topicSourceMap);
+    }
+
+    /**
+     * Get the names of topics that are to be consumed by the source nodes created by this builder.
+     * @return an unmodifiable view of the topic names used by source nodes; it reflects sources
+     * added after this call, and is never null
+     */
+    public Set<String> sourceTopics() {
+        return Collections.unmodifiableSet(sourceTopicNames);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
new file mode 100644
index 0000000..99d1405
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Indicates an error while building the processor topology, e.g. adding a node whose name is
+ * already in use, or referencing a parent node that has not been added yet.
+ */
+public class TopologyException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Create an exception with the given message.
+     */
+    public TopologyException(String message) {
+        super(message);
+    }
+
+    /**
+     * Create an exception identifying the offending topology element and its value.
+     */
+    public TopologyException(String name, Object value) {
+        this(name, value, null);
+    }
+
+    /**
+     * Create an exception identifying the offending topology element, its value, and an
+     * optional detail message.
+     */
+    public TopologyException(String name, Object value, String message) {
+        // include name and value in the message; previously they were silently dropped
+        super("Invalid topology building: " + name + " = " + value + (message == null ? "" : ": " + message));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
new file mode 100644
index 0000000..717df2c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.LinkedList;
+
+/**
+ * MinTimestampTracker implements {@link TimestampTracker} that maintains the min
+ * timestamp of the maintained stamped elements.
+ */
+public class MinTimestampTracker<E> implements TimestampTracker<E> {
+
+    // monotone subsequence of the buffered elements: strictly ascending by timestamp from head
+    // to tail (despite the field name), so the head is always the element with the min timestamp
+    private final LinkedList<Stamped<E>> descendingSubsequence = new LinkedList<>();
+
+    // in the case that incoming traffic is very small, the records maybe put and polled
+    // within a single iteration, in this case we need to remember the last polled
+    // record's timestamp
+    private long lastKnownTime = NOT_KNOWN;
+
+    /**
+     * Add an element, evicting any buffered elements whose timestamp is greater than or
+     * equal to the new element's so the list stays strictly ascending.
+     *
+     * @throws NullPointerException if {@code elem} is null
+     */
+    public void addElement(Stamped<E> elem) {
+        if (elem == null) throw new NullPointerException();
+
+        Stamped<E> minElem = descendingSubsequence.peekLast();
+        while (minElem != null && minElem.timestamp >= elem.timestamp) {
+            descendingSubsequence.removeLast();
+            minElem = descendingSubsequence.peekLast();
+        }
+        descendingSubsequence.offerLast(elem);
+    }
+
+    /**
+     * Remove the given element if it is the current minimum (the head of the list); other
+     * elements were already evicted by {@link #addElement}. A null argument is a no-op.
+     */
+    public void removeElement(Stamped<E> elem) {
+        if (elem == null)
+            return; // nothing to remove; also avoids an NPE below when the list is empty
+
+        if (descendingSubsequence.peekFirst() == elem)
+            descendingSubsequence.removeFirst();
+
+        // remember the last removed timestamp so get() can answer while the list is empty
+        if (descendingSubsequence.isEmpty())
+            lastKnownTime = elem.timestamp;
+    }
+
+    /**
+     * Number of elements currently tracked (the retained monotone subsequence only).
+     */
+    public int size() {
+        return descendingSubsequence.size();
+    }
+
+    /**
+     * Return the minimum timestamp among tracked elements, or the last known timestamp
+     * (NOT_KNOWN if nothing was ever removed) when no elements are tracked.
+     */
+    public long get() {
+        Stamped<E> stamped = descendingSubsequence.peekFirst();
+
+        if (stamped == null)
+            return lastKnownTime;
+        else
+            return stamped.timestamp;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
new file mode 100644
index 0000000..44a6c5c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * A PartitionGroup is composed from a set of partitions. It also maintains the timestamp of this
+ * group, hence the associated task as the min timestamp across all partitions in the group.
+ */
+public class PartitionGroup {
+
+    // one record queue per assigned partition
+    private final Map<TopicPartition, RecordQueue> partitionQueues;
+
+    // non-empty queues only, ordered by ascending queue timestamp
+    private final PriorityQueue<RecordQueue> queuesByTime;
+
+    private final TimestampExtractor timestampExtractor;
+
+    /**
+     * Out-parameter of {@link #nextRecord(RecordInfo)}: identifies the queue the returned
+     * record was taken from, giving access to its source node and partition.
+     */
+    public static class RecordInfo {
+        public RecordQueue queue;
+
+        public ProcessorNode node() {
+            return queue.source();
+        }
+
+        public TopicPartition partition() {
+            return queue.partition();
+        }
+    }
+
+    // since task is thread-safe, we do not need to synchronize on local variables
+    private int totalBuffered;
+
+    public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues, TimestampExtractor timestampExtractor) {
+        // PriorityQueue rejects an initial capacity of 0, so guard against an empty partition map
+        this.queuesByTime = new PriorityQueue<>(Math.max(1, partitionQueues.size()), new Comparator<RecordQueue>() {
+
+            @Override
+            public int compare(RecordQueue queue1, RecordQueue queue2) {
+                // order queues by ascending timestamp
+                return Long.compare(queue1.timestamp(), queue2.timestamp());
+            }
+        });
+
+        this.partitionQueues = partitionQueues;
+
+        this.timestampExtractor = timestampExtractor;
+
+        this.totalBuffered = 0;
+    }
+
+    /**
+     * Get the next record and queue.
+     *
+     * @param info out-parameter: its {@code queue} field is set to the queue that was polled,
+     *             or null if no queue had buffered records
+     * @return the next buffered record from the queue with the smallest timestamp,
+     *         or null if nothing is buffered
+     */
+    public StampedRecord nextRecord(RecordInfo info) {
+        StampedRecord record = null;
+
+        RecordQueue queue = queuesByTime.poll();
+        if (queue != null) {
+            // get the first record from this queue.
+            record = queue.poll();
+
+            // re-enqueue the queue if it still holds records so it competes for the next poll
+            if (queue.size() > 0) {
+                queuesByTime.offer(queue);
+            }
+        }
+        info.queue = queue;
+
+        if (record != null) totalBuffered--;
+
+        return record;
+    }
+
+    /**
+     * Adds raw records to this partition group
+     *
+     * @param partition the partition
+     * @param rawRecords  the raw records
+     * @return the queue size for the partition
+     * @throws KafkaException if the partition does not belong to this group
+     */
+    public int addRawRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
+        RecordQueue recordQueue = partitionQueues.get(partition);
+
+        // fail with a descriptive error (consistent with numBuffered) instead of an NPE
+        if (recordQueue == null)
+            throw new KafkaException("Record's partition does not belong to this partition-group.");
+
+        int oldSize = recordQueue.size();
+        int newSize = recordQueue.addRawRecords(rawRecords, timestampExtractor);
+
+        // add this record queue to be considered for processing in the future if it was empty before
+        if (oldSize == 0 && newSize > 0) {
+            queuesByTime.offer(recordQueue);
+        }
+
+        totalBuffered += newSize - oldSize;
+
+        return newSize;
+    }
+
+    /**
+     * Return an unmodifiable view of the partitions in this group.
+     */
+    public Set<TopicPartition> partitions() {
+        return Collections.unmodifiableSet(partitionQueues.keySet());
+    }
+
+    /**
+     * Return the timestamp of this partition group as the smallest
+     * partition timestamp among all its partitions
+     */
+    public long timestamp() {
+        if (queuesByTime.isEmpty()) {
+            // if there is no data in all partitions, return the smallest of their last known times
+            long timestamp = Long.MAX_VALUE;
+            for (RecordQueue queue : partitionQueues.values()) {
+                if (timestamp > queue.timestamp())
+                    timestamp = queue.timestamp();
+            }
+            return timestamp;
+        } else {
+            return queuesByTime.peek().timestamp();
+        }
+    }
+
+    /**
+     * Return the number of buffered records for the given partition.
+     *
+     * @throws KafkaException if the partition does not belong to this group
+     */
+    public int numBuffered(TopicPartition partition) {
+        RecordQueue recordQueue = partitionQueues.get(partition);
+
+        if (recordQueue == null)
+            throw new KafkaException("Record's partition does not belong to this partition-group.");
+
+        return recordQueue.size();
+    }
+
+    /**
+     * Return the total number of buffered records across all partitions.
+     */
+    public int numBuffered() {
+        return totalBuffered;
+    }
+
+    public void close() {
+        queuesByTime.clear();
+        partitionQueues.clear();
+    }
+}


Mime
View raw message