kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-5922: Add SessionWindowedKStream
Date Thu, 21 Sep 2017 08:10:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b12ba240e -> a2da064cb


http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
new file mode 100644
index 0000000..cc6ca05
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements TimeWindowedKStream<K, V> {
+
+    private final Windows<W> windows;
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+    TimeWindowedKStreamImpl(final Windows<W> windows,
+                            final InternalStreamsBuilder builder,
+                            final Set<String> sourceNodes,
+                            final String name,
+                            final Serde<K> keySerde,
+                            final Serde<V> valSerde,
+                            final boolean repartitionRequired) {
+        super(builder, name, sourceNodes);
+        Objects.requireNonNull(windows, "windows can't be null");
+        this.valSerde = valSerde;
+        this.keySerde = keySerde;
+        this.windows = windows;
+        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return doAggregate(
+                aggregateBuilder.countInitializer,
+                aggregateBuilder.countAggregator,
+                Serdes.Long());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
+    }
+
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        return doAggregate(initializer, aggregator, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
+                                                     final Aggregator<? super K, ? super V, VR> aggregator,
+                                                     final Serde<VR> serde) {
+        final String storeName = builder.newStoreName(AGGREGATE_NAME);
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
+                                                                AGGREGATE_NAME,
+                                                                windowStoreBuilder(storeName, serde),
+                                                                false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
+                                                                                             materializedInternal.storeName(),
+                                                                                             initializer,
+                                                                                             aggregator),
+                                                                AGGREGATE_NAME,
+                                                                materialize(materializedInternal),
+                                                                true);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        final String storeName = builder.newStoreName(REDUCE_NAME);
+        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
+                                                               REDUCE_NAME,
+                                                               windowStoreBuilder(storeName, valSerde),
+                                                               true);
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+
+        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer),
+                                                               REDUCE_NAME,
+                                                               materialize(materializedInternal),
+                                                               false);
+    }
+
+    private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            supplier = Stores.persistentWindowStore(materialized.storeName(),
+                                                    windows.maintainMs(),
+                                                    windows.segments,
+                                                    windows.size(),
+                                                    false);
+        }
+        final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
+                                                                                   materialized.keySerde(),
+                                                                                   materialized.valueSerde());
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        if (materialized.cachingEnabled()) {
+            builder.withCachingEnabled();
+        }
+        return builder;
+    }
+
+
+    private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
+        return Stores.windowStoreBuilder(
+                Stores.persistentWindowStore(
+                        storeName,
+                        windows.maintainMs(),
+                        windows.segments,
+                        windows.size(),
+                        false),
+                keySerde,
+                aggValueSerde).withCachingEnabled();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
deleted file mode 100644
index 3992a79..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedKStream;
-import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.streams.state.WindowStore;
-
-import java.util.Objects;
-import java.util.Set;
-
-import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
-import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
-
-public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements WindowedKStream<K, V> {
-
-    private final Windows<W> windows;
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
-    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
-
-    WindowedKStreamImpl(final Windows<W> windows,
-                        final InternalStreamsBuilder builder,
-                        final Set<String> sourceNodes,
-                        final String name,
-                        final Serde<K> keySerde,
-                        final Serde<V> valSerde,
-                        final boolean repartitionRequired) {
-        super(builder, name, sourceNodes);
-        Objects.requireNonNull(windows, "windows can't be null");
-        this.valSerde = valSerde;
-        this.keySerde = keySerde;
-        this.windows = windows;
-        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
-    }
-
-    @Override
-    public KTable<Windowed<K>, Long> count() {
-        return doAggregate(
-                aggregateBuilder.countInitializer,
-                aggregateBuilder.countAggregator,
-                Serdes.Long());
-    }
-
-    @Override
-    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
-        Objects.requireNonNull(materialized, "materialized can't be null");
-        return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
-    }
-
-
-    @Override
-    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                  final Aggregator<? super K, ? super V, VR> aggregator) {
-        Objects.requireNonNull(initializer, "initializer can't be null");
-        Objects.requireNonNull(aggregator, "aggregator can't be null");
-        return doAggregate(initializer, aggregator, null);
-    }
-
-    @SuppressWarnings("unchecked")
-    private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
-                                                     final Aggregator<? super K, ? super V, VR> aggregator,
-                                                     final Serde<VR> serde) {
-        final String storeName = builder.newStoreName(AGGREGATE_NAME);
-        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
-                                                                AGGREGATE_NAME,
-                                                                windowStoreBuilder(storeName, serde),
-                                                                false);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                  final Aggregator<? super K, ? super V, VR> aggregator,
-                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
-        Objects.requireNonNull(initializer, "initializer can't be null");
-        Objects.requireNonNull(aggregator, "aggregator can't be null");
-        Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
-                                                                                             materializedInternal.storeName(),
-                                                                                             initializer,
-                                                                                             aggregator),
-                                                                AGGREGATE_NAME,
-                                                                materialize(materializedInternal),
-                                                                true);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
-        Objects.requireNonNull(reducer, "reducer can't be null");
-        final String storeName = builder.newStoreName(REDUCE_NAME);
-        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
-                                                               REDUCE_NAME,
-                                                               windowStoreBuilder(storeName, valSerde),
-                                                               true);
-
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
-        Objects.requireNonNull(reducer, "reducer can't be null");
-        Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-
-        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer),
-                                                               REDUCE_NAME,
-                                                               materialize(materializedInternal),
-                                                               false);
-    }
-
-    private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
-        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
-        if (supplier == null) {
-            supplier = Stores.persistentWindowStore(materialized.storeName(),
-                                                    windows.maintainMs(),
-                                                    windows.segments,
-                                                    windows.size(),
-                                                    false);
-        }
-        final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
-                                                                                   materialized.keySerde(),
-                                                                                   materialized.valueSerde());
-
-        if (materialized.loggingEnabled()) {
-            builder.withLoggingEnabled(materialized.logConfig());
-        } else {
-            builder.withLoggingDisabled();
-        }
-
-        if (materialized.cachingEnabled()) {
-            builder.withCachingEnabled();
-        }
-        return builder;
-    }
-
-
-    private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
-        return Stores.windowStoreBuilder(
-                Stores.persistentWindowStore(
-                        storeName,
-                        windows.maintainMs(),
-                        windows.segments,
-                        windows.size(),
-                        false),
-                keySerde,
-                aggValueSerde).withCachingEnabled();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 81e8ef7..350facf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -521,7 +521,7 @@ public class KStreamAggregationIntegrationTest {
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-                .count(SessionWindows.with(sessionGap).until(maintainMillis), "UserSessionsStore")
+                .count(SessionWindows.with(sessionGap).until(maintainMillis))
                 .toStream()
                 .foreach(new ForeachAction<Windowed<String>, Long>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index efa027c..c8e011e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -257,7 +257,6 @@ public class KGroupedStreamImplTest {
         });
 
         doAggregateSessionWindows(results);
-        assertNull(table.queryableStoreName());
     }
 
     private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
new file mode 100644
index 0000000..042f0f1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.SessionWindowedKStream;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class SessionWindowedKStreamImplTest {
+
+    private static final String TOPIC = "input";
+    private final StreamsBuilder builder = new StreamsBuilder();
+
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final Merger<String, String> sessionMerger = new Merger<String, String>() {
+        @Override
+        public String apply(final String aggKey, final String aggOne, final String aggTwo) {
+            return aggOne + "+" + aggTwo;
+        }
+    };
+    private SessionWindowedKStream<String, String> stream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        this.stream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SessionWindows.with(500));
+    }
+
+    @Test
+    public void shouldCountSessionWindowed() {
+        final Map<Windowed<String>, Long> results = new HashMap<>();
+        stream.count()
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L));
+        assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L));
+    }
+
+    @Test
+    public void shouldReduceWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        stream.reduce(MockReducer.STRING_ADDER)
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2"));
+        assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1"));
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3"));
+    }
+
+    @Test
+    public void shouldAggregateSessionWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         MockAggregator.TOSTRING_ADDER,
+                         sessionMerger)
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+        processData();
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("0+0+1+2"));
+        assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1"));
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeCount() {
+        stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store")
+                             .withKeySerde(Serdes.String())
+                             .withValueSerde(Serdes.Long()));
+
+        processData();
+        final SessionStore<String, Long> store = (SessionStore<String, Long>) driver.allStateStores().get("count-store");
+        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeReduced() {
+        stream.reduce(MockReducer.STRING_ADDER,
+                      Materialized.<String, String, SessionStore<Bytes, byte[]>>as("reduced")
+                              .withKeySerde(Serdes.String())
+                              .withValueSerde(Serdes.String()));
+
+        processData();
+        final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("reduced");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeAggregated() {
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         MockAggregator.TOSTRING_ADDER,
+                         sessionMerger,
+                         Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated")
+                                 .withKeySerde(Serdes.String())
+                                 .withValueSerde(Serdes.String()));
+
+        processData();
+        final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("aggregated");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+        stream.aggregate(null, MockAggregator.TOSTRING_ADDER, sessionMerger);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT, null, sessionMerger);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfMergerIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+        stream.reduce(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+        stream.aggregate(null,
+                         MockAggregator.TOSTRING_ADDER,
+                         sessionMerger,
+                         Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         null,
+                         sessionMerger,
+                         Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         MockAggregator.TOSTRING_ADDER,
+                         null,
+                         Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         MockAggregator.TOSTRING_ADDER,
+                         sessionMerger,
+                         (Materialized) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+        stream.reduce(null,
+                      Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+        stream.reduce(MockReducer.STRING_ADDER,
+                      null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+        stream.count(null);
+    }
+
+    private void processData() {
+        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+        driver.setTime(10);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(15);
+        driver.process(TOPIC, "1", "2");
+        driver.setTime(600);
+        driver.process(TOPIC, "1", "3");
+        driver.process(TOPIC, "2", "1");
+        driver.flushState();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
new file mode 100644
index 0000000..93bcf33
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the time-windowed KStream implementation: windowed count/reduce/aggregate
+ * results, materialization into a named {@link WindowStore}, and eager null-argument
+ * checks on every overload.
+ *
+ * The assertions below show the records fed by {@link #processData()} land in two
+ * adjacent, non-overlapping 500ms windows: [0, 500) holds key "1" at t=10 and t=15,
+ * while [500, 1000) holds one record each for keys "1" and "2" at t=500.
+ */
+public class TimeWindowedKStreamImplTest {
+
+    private static final String TOPIC = "input";
+    private final StreamsBuilder builder = new StreamsBuilder();
+
+    // JUnit @Rule: the driver's lifecycle is managed per test method.
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private TimeWindowedKStream<String, String> windowedStream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        // 500ms windows; the window boundaries asserted below ([0,500) then [500,1000)) confirm tumbling behavior.
+        windowedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+                .windowedBy(TimeWindows.of(500L));
+    }
+
+    // count(): "1" appears twice in [0,500) and once in [500,1000); "2" once in [500,1000).
+    @Test
+    public void shouldCountWindowed() {
+        final Map<Windowed<String>, Long> results = new HashMap<>();
+        windowedStream.count()
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
+        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
+    }
+
+
+    // reduce(): values within a window are joined by the mock adder ("1" + "2" -> "1+2").
+    @Test
+    public void shouldReduceWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        windowedStream.reduce(MockReducer.STRING_ADDER)
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
+        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
+    }
+
+    // aggregate(): like reduce, but with an explicit initializer -- the asserted "0+" prefix
+    // shows each window's aggregate starts from the mock initializer's value.
+    @Test
+    public void shouldAggregateWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 MockAggregator.TOSTRING_ADDER
+        )
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+        processData();
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
+        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
+    }
+
+    // Materializing count() makes the per-window results queryable from the named store.
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeCount() {
+        windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                                     .withKeySerde(Serdes.String())
+                                     .withValueSerde(Serdes.Long()));
+
+        processData();
+        final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
+        // Range fetch across keys "1".."2" and timestamps 0..1000 returns every window written above.
+        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+    }
+
+    // Same as shouldReduceWindowed, but verified through the materialized store instead of toStream().
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeReduced() {
+        windowedStream.reduce(MockReducer.STRING_ADDER,
+                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+                                      .withKeySerde(Serdes.String())
+                                      .withValueSerde(Serdes.String()));
+
+        processData();
+        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+    }
+
+    // Same as shouldAggregateWindowed, but verified through the materialized store.
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeAggregated() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+                                         .withKeySerde(Serdes.String())
+                                         .withValueSerde(Serdes.String()));
+
+        processData();
+        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+    }
+
+    // The remaining tests verify that every overload rejects null arguments eagerly.
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+        windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+        windowedStream.reduce(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+        windowedStream.aggregate(null,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 null,
+                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+    }
+
+    // The raw (Materialized) cast disambiguates the overload when passing null; hence the suppression.
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 (Materialized) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+        windowedStream.reduce(null,
+                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+        windowedStream.reduce(MockReducer.STRING_ADDER,
+                              null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+        windowedStream.count(null);
+    }
+
+    // Drives four records: t=10 and t=15 for key "1" (window [0,500)); t=500 for "1" and "2"
+    // (window [500,1000)). flushState() presumably forces cached aggregates downstream --
+    // verify against KStreamTestDriver.
+    private void processData() {
+        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+        driver.setTime(10);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(15);
+        driver.process(TOPIC, "1", "2");
+        driver.setTime(500);
+        driver.process(TOPIC, "1", "3");
+        driver.process(TOPIC, "2", "1");
+        driver.flushState();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
deleted file mode 100644
index f5de96f..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.Consumed;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedKStream;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockAggregator;
-import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-public class WindowedKStreamImplTest {
-
-    private static final String TOPIC = "input";
-    private final StreamsBuilder builder = new StreamsBuilder();
-
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private WindowedKStream<String, String> windowedStream;
-
-    @Before
-    public void before() {
-        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
-        windowedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-                .windowedBy(TimeWindows.of(500L));
-    }
-
-    @Test
-    public void shouldCountWindowed() {
-        final Map<Windowed<String>, Long> results = new HashMap<>();
-        windowedStream.count()
-                .toStream()
-                .foreach(new ForeachAction<Windowed<String>, Long>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final Long value) {
-                        results.put(key, value);
-                    }
-                });
-
-        processData();
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
-    }
-
-
-    @Test
-    public void shouldReduceWindowed() {
-        final Map<Windowed<String>, String> results = new HashMap<>();
-        windowedStream.reduce(MockReducer.STRING_ADDER)
-                .toStream()
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value) {
-                        results.put(key, value);
-                    }
-                });
-
-        processData();
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
-        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
-    }
-
-    @Test
-    public void shouldAggregateWindowed() {
-        final Map<Windowed<String>, String> results = new HashMap<>();
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 MockAggregator.TOSTRING_ADDER
-        )
-                .toStream()
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value) {
-                        results.put(key, value);
-                    }
-                });
-        processData();
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
-        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldMaterializeCount() {
-        windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
-                                     .withKeySerde(Serdes.String())
-                                     .withValueSerde(Serdes.Long()));
-
-        processData();
-        final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
-        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldMaterializeReduced() {
-        windowedStream.reduce(MockReducer.STRING_ADDER,
-                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
-                                      .withKeySerde(Serdes.String())
-                                      .withValueSerde(Serdes.String()));
-
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldMaterializeAggregated() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
-                                         .withKeySerde(Serdes.String())
-                                         .withValueSerde(Serdes.String()));
-
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
-        windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT, null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
-        windowedStream.reduce(null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
-        windowedStream.aggregate(null,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 null,
-                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 (Materialized) null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
-        windowedStream.reduce(null,
-                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
-        windowedStream.reduce(MockReducer.STRING_ADDER,
-                              null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
-        windowedStream.count(null);
-    }
-
-    private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "1", "2");
-        driver.setTime(500);
-        driver.process(TOPIC, "1", "3");
-        driver.process(TOPIC, "2", "1");
-        driver.flushState();
-    }
-
-}
\ No newline at end of file


Mime
View raw message