kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4831) Extract WindowedSerde to public APIs
Date Fri, 09 Mar 2018 19:09:03 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393388#comment-16393388 ] 

ASF GitHub Bot commented on KAFKA-4831:
---------------------------------------

guozhangwang closed pull request #3307: KAFKA-4831: Extract WindowedSerde to public APIs
URL: https://github.com/apache/kafka/pull/3307
 
 
   

This pull request was merged from a forked repository (a "foreign" pull request).
Because GitHub hides the original diff of such a pull request once it is merged,
and it would not otherwise be shown, the diff is reproduced below for the sake
of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index d6b4d2da611..7825ad4e9c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -26,7 +26,7 @@
  */
 public class Serdes {
 
-    static protected class WrapperSerde<T> implements Serde<T> {
+    static public class WrapperSerde<T> implements Serde<T> {
         final private Serializer<T> serializer;
         final private Deserializer<T> deserializer;
 
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index ea81dd66268..c5eb5f9ec7d 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -29,8 +29,7 @@
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
-import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
 
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -115,9 +114,7 @@ public boolean test(Windowed<String> key, String value) {
                 }
             });
 
-        WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
-        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
-        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
+        Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
 
         // need to override key serde to Windowed<String> type
         max.to("iot-temperature-max", Produced.with(windowedSerde, Serdes.String()));
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 47becfc239b..ecfcad80e81 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -222,18 +222,32 @@
     private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
     private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
 
+    /**
+     * {@code default.windowed.key.serde.inner}
+     */
+    public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
+
+    /**
+     * {@code default.windowed.value.serde.inner}
+     */
+    public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
+
     /** {@code default key.serde} */
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
-    private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
+    private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
+            + "Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
+            + DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well";
+
+    /** {@code default.value.serde} */
+    public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
+    private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
+            + "Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
+            + DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well";
 
     /** {@code default.timestamp.extractor} */
     public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
     private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
 
-    /** {@code default.value.serde} */
-    public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
-    private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
-
     /**
      * {@code key.serde}
      * @deprecated Use {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG} instead.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 6a68f0c793f..1436e250aa1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -25,7 +25,6 @@
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index c1288f17054..55555a51592 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -24,7 +24,6 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
new file mode 100644
index 00000000000..e2e0400b110
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.internals.SessionKeySchema;
+
+import java.util.Map;
+
+/**
+ *  The inner serde class can be specified by setting the property
+ *  {@link StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS} or
+ *  {@link StreamsConfig#DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS}
+ *  if the no-arg constructor is called and hence it is not passed during initialization.
+ */
+public class SessionWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
+
+    private Deserializer<T> inner;
+
+    // Default constructor needed by Kafka
+    public SessionWindowedDeserializer() {}
+
+    public SessionWindowedDeserializer(final Deserializer<T> inner) {
+        this.inner = inner;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        if (inner == null) {
+            final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
+            final String value = (String) configs.get(propertyName);
+            try {
+                inner = Serde.class.cast(Utils.newInstance(value, Serde.class)).deserializer();
+                inner.configure(configs, isKey);
+            } catch (final ClassNotFoundException e) {
+                throw new ConfigException(propertyName, value, "Serde class " + value + " could not be found.");
+            }
+        }
+    }
+
+    @Override
+    public Windowed<T> deserialize(final String topic, final byte[] data) {
+        if (data == null || data.length == 0) {
+            return null;
+        }
+
+        // for either key or value, their schema is the same hence we will just use session key schema
+        return SessionKeySchema.from(data, inner, topic);
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    // Only for testing
+    Deserializer<T> innerDeserializer() {
+        return inner;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
new file mode 100644
index 00000000000..484b3afde75
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
+import org.apache.kafka.streams.state.internals.SessionKeySchema;
+
+import java.util.Map;
+
+/**
+ *  The inner serde class can be specified by setting the property
+ *  {@link StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS} or
+ *  {@link StreamsConfig#DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS}
+ *  if the no-arg constructor is called and hence it is not passed during initialization.
+ */
+public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {
+
+    private Serializer<T> inner;
+
+    // Default constructor needed by Kafka
+    public SessionWindowedSerializer() {}
+
+    public SessionWindowedSerializer(final Serializer<T> inner) {
+        this.inner = inner;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        if (inner == null) {
+            String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
+            String value = (String) configs.get(propertyName);
+            try {
+                inner = Serde.class.cast(Utils.newInstance(value, Serde.class)).serializer();
+                inner.configure(configs, isKey);
+            } catch (final ClassNotFoundException e) {
+                throw new ConfigException(propertyName, value, "Serde class " + value + " could not be found.");
+            }
+        }
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final Windowed<T> data) {
+        if (data == null) {
+            return null;
+        }
+
+        // for either key or value, their schema is the same hence we will just use session key schema
+        return SessionKeySchema.toBinary(data, inner, topic);
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public byte[] serializeBaseKey(final String topic, final Windowed<T> data) {
+        return inner.serialize(topic, data.key());
+    }
+
+    // Only for testing
+    Serializer<T> innerSerializer() {
+        return inner;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
new file mode 100644
index 00000000000..cb9c506912c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.internals.WindowKeySchema;
+
+import java.util.Map;
+
+/**
+ *  The inner serde class can be specified by setting the property
+ *  {@link StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS} or
+ *  {@link StreamsConfig#DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS}
+ *  if the no-arg constructor is called and hence it is not passed during initialization.
+ */
+public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
+
+    private final Long windowSize;
+    
+    private Deserializer<T> inner;
+    
+    // Default constructor needed by Kafka
+    public TimeWindowedDeserializer() {
+        this(null, Long.MAX_VALUE);
+    }
+
+    // TODO: fix this part as last bits of KAFKA-4468
+    public TimeWindowedDeserializer(final Deserializer<T> inner) {
+        this(inner, Long.MAX_VALUE);
+    }
+
+    public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize) {
+        this.inner = inner;
+        this.windowSize = windowSize;
+    }
+
+    public Long getWindowSize() {
+        return this.windowSize;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        if (inner == null) {
+            final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
+            final String value = (String) configs.get(propertyName);
+            try {
+                inner = Serde.class.cast(Utils.newInstance(value, Serde.class)).deserializer();
+                inner.configure(configs, isKey);
+            } catch (final ClassNotFoundException e) {
+                throw new ConfigException(propertyName, value, "Serde class " + value + " could not be found.");
+            }
+        }
+    }
+
+    @Override
+    public Windowed<T> deserialize(final String topic, final byte[] data) {
+        if (data == null || data.length == 0) {
+            return null;
+        }
+
+        return WindowKeySchema.from(data, windowSize, inner, topic);
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+    
+    // Only for testing
+    Deserializer<T> innerDeserializer() {
+        return inner;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
new file mode 100644
index 00000000000..6b754191412
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
+import org.apache.kafka.streams.state.internals.WindowKeySchema;
+
+import java.util.Map;
+
+/**
+ *  The inner serde class can be specified by setting the property
+ *  {@link StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS} or
+ *  {@link StreamsConfig#DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS}
+ *  if the no-arg constructor is called and hence it is not passed during initialization.
+ */
+public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
+
+    private Serializer<T> inner;
+
+    // Default constructor needed by Kafka
+    public TimeWindowedSerializer() {}
+
+    public TimeWindowedSerializer(final Serializer<T> inner) {
+        this.inner = inner;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        if (inner == null) {
+            final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
+            final String value = (String) configs.get(propertyName);
+            try {
+                inner = Serde.class.cast(Utils.newInstance(value, Serde.class)).serializer();
+                inner.configure(configs, isKey);
+            } catch (final ClassNotFoundException e) {
+                throw new ConfigException(propertyName, value, "Serde class " + value + " could not be found.");
+            }
+        }
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final Windowed<T> data) {
+        if (data == null)
+            return null;
+
+        return WindowKeySchema.toBinary(data, inner, topic);
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public byte[] serializeBaseKey(final String topic, final Windowed<T> data) {
+        return inner.serialize(topic, data.key());
+    }
+
+    // Only for testing
+    Serializer<T> innerSerializer() {
+        return inner;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
new file mode 100644
index 00000000000..d0381c787c0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+
+public class WindowedSerdes {
+
+    static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
+        // Default constructor needed for reflection object creation
+        public TimeWindowedSerde() {
+            super(new TimeWindowedSerializer<T>(), new TimeWindowedDeserializer<T>());
+        }
+
+        public TimeWindowedSerde(final Serde<T> inner) {
+            super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer()));
+        }
+    }
+
+    static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
+        // Default constructor needed for reflection object creation
+        public SessionWindowedSerde() {
+            super(new SessionWindowedSerializer<T>(), new SessionWindowedDeserializer<T>());
+        }
+
+        public SessionWindowedSerde(final Serde<T> inner) {
+            super(new SessionWindowedSerializer<>(inner.serializer()), new SessionWindowedDeserializer<>(inner.deserializer()));
+        }
+    }
+
+    /**
+     * Construct a {@code TimeWindowedSerde} object for the specified inner class type.
+     */
+    static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type) {
+        return new TimeWindowedSerde<>(Serdes.serdeFrom(type));
+    }
+
+    /**
+     * Construct a {@code SessionWindowedSerde} object for the specified inner class type.
+     */
+    static public <T> Serde<Windowed<T>> sessionWindowedSerdeFrom(final Class<T> type) {
+        return new TimeWindowedSerde<>(Serdes.serdeFrom(type));
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 141bbbb6c55..07bc67d952c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -500,8 +500,7 @@ private void to(final String topic, final ProducedInternal<K, V> produced) {
         final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner();
 
         if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
-            final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
-            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
+            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, (WindowedSerializer) keySerializer);
             builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, windowedPartitioner, this.name);
         } else {
             builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
deleted file mode 100644
index 2910561eead..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-/**
- * Serde for a {@link Windowed} key when working with {@link org.apache.kafka.streams.kstream.SessionWindows}
- *
- * @param <K> sessionId type
- */
-public class SessionKeySerde<K> implements Serde<Windowed<K>> {
-    private static final int TIMESTAMP_SIZE = 8;
-
-    private final Serde<K> keySerde;
-
-    public SessionKeySerde(final Serde<K> keySerde) {
-        this.keySerde = keySerde;
-    }
-
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public Serializer<Windowed<K>> serializer() {
-        return new SessionKeySerializer(keySerde.serializer());
-    }
-
-    @Override
-    public Deserializer<Windowed<K>> deserializer() {
-        return new SessionKeyDeserializer(keySerde.deserializer());
-    }
-
-    private class SessionKeySerializer implements Serializer<Windowed<K>> {
-
-        private final Serializer<K> keySerializer;
-
-        SessionKeySerializer(final Serializer<K> keySerializer) {
-            this.keySerializer = keySerializer;
-        }
-
-        @Override
-        public void configure(final Map<String, ?> configs, final boolean isKey) {
-
-        }
-
-        @Override
-        public byte[] serialize(final String topic, final Windowed<K> data) {
-            if (data == null) {
-                return null;
-            }
-            return toBinary(data, keySerializer, topic).get();
-        }
-
-        @Override
-        public void close() {
-
-        }
-    }
-
-    private class SessionKeyDeserializer implements Deserializer<Windowed<K>> {
-        private final Deserializer<K> deserializer;
-
-        SessionKeyDeserializer(final Deserializer<K> deserializer) {
-            this.deserializer = deserializer;
-        }
-
-        @Override
-        public void configure(final Map<String, ?> configs, final boolean isKey) {
-        }
-
-        @Override
-        public Windowed<K> deserialize(final String topic, final byte[] data) {
-            if (data == null || data.length == 0) {
-                return null;
-            }
-            return from(data, deserializer, topic);
-        }
-
-
-        @Override
-        public void close() {
-
-        }
-    }
-
-    public static long extractEnd(final byte[] binaryKey) {
-        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
-    }
-
-    public static long extractStart(final byte[] binaryKey) {
-        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
-    }
-
-    public static Window extractWindow(final byte[] binaryKey) {
-        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
-        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
-        final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
-        return new SessionWindow(start, end);
-    }
-
-    public static byte[] extractKeyBytes(final byte[] binaryKey) {
-        final byte[] bytes = new byte[binaryKey.length - 2 * TIMESTAMP_SIZE];
-        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
-        return bytes;
-    }
-
-    public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer, final String topic) {
-        final K key = extractKey(binaryKey, keyDeserializer, topic);
-        final Window window = extractWindow(binaryKey);
-        return new Windowed<>(key, window);
-    }
-
-    public static Windowed<Bytes> fromBytes(Bytes bytesKey) {
-        final byte[] binaryKey = bytesKey.get();
-        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
-        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
-        final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
-        return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), new SessionWindow(start, end));
-    }
-
-    private static <K> K extractKey(final byte[] binaryKey, final Deserializer<K> deserializer, final String topic) {
-        return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
-    }
-
-    public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer, final String topic) {
-        final byte[] bytes = serializer.serialize(topic, sessionKey.key());
-        ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
-        buf.put(bytes);
-        buf.putLong(sessionKey.window().end());
-        buf.putLong(sessionKey.window().start());
-        return new Bytes(buf.array());
-    }
-
-    public static Bytes bytesToBinary(final Windowed<Bytes> sessionKey) {
-        final byte[] bytes = sessionKey.key().get();
-        ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
-        buf.put(bytes);
-        buf.putLong(sessionKey.window().end());
-        buf.putLong(sessionKey.window().start());
-        return new Bytes(buf.array());
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
deleted file mode 100644
index 67fee49bd37..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.state.internals.WindowStoreUtils;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-/**
- *  The inner deserializer class can be specified by setting the property key.deserializer.inner.class,
- *  value.deserializer.inner.class or deserializer.inner.class,
- *  if the no-arg constructor is called and hence it is not passed during initialization.
- *  Note that the first two take precedence over the last.
- */
-public class WindowedDeserializer<T> implements Deserializer<Windowed<T>> {
-
-    private static final int TIMESTAMP_SIZE = 8;
-    private final Long windowSize;
-    
-    private Deserializer<T> inner;
-    
-    // Default constructor needed by Kafka
-    public WindowedDeserializer() {
-        this(null, Long.MAX_VALUE);
-    }
-    
-    public WindowedDeserializer(final Long windowSize) {
-       this(null, windowSize);
-    }
-    
-    public WindowedDeserializer(final Deserializer<T> inner) {
-        this(inner, Long.MAX_VALUE);
-    }
-
-    public WindowedDeserializer(final Deserializer<T> inner,
-                                final long windowSize) {
-        this.inner = inner;
-        this.windowSize = windowSize;
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        if (inner == null) {
-            String propertyName = isKey ? "key.deserializer.inner.class" : "value.deserializer.inner.class";
-            Object innerDeserializerClass = configs.get(propertyName);
-            propertyName = (innerDeserializerClass == null) ? "deserializer.inner.class" : propertyName;
-            String value = null;
-            try {
-                value = (String) configs.get(propertyName);
-                inner = Deserializer.class.cast(Utils.newInstance(value, Deserializer.class));
-                inner.configure(configs, isKey);
-            } catch (ClassNotFoundException e) {
-                throw new ConfigException(propertyName, value, "Class " + value + " could not be found.");
-            }
-        }
-    }
-
-    @Override
-    public Windowed<T> deserialize(String topic, byte[] data) {
-
-        byte[] bytes = new byte[data.length - TIMESTAMP_SIZE];
-
-        System.arraycopy(data, 0, bytes, 0, bytes.length);
-        
-        long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
-        
-        Window timeWindow = windowSize != Long.MAX_VALUE ? WindowStoreUtils.timeWindowForSize(start, windowSize) : new UnlimitedWindow(start);
-        return new Windowed<T>(inner.deserialize(topic, bytes), timeWindow);
-    }
-
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-    
-    // Only for testing
-    public Deserializer<T> innerDeserializer() {
-        return inner;
-    }
-    
-    public Long getWindowSize() {
-        return this.windowSize;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index b4e5d442aee..09185b2ab2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -16,73 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Windowed;
 
-import java.nio.ByteBuffer;
-import java.util.Map;
+public interface WindowedSerializer<T> extends Serializer<Windowed<T>> {
 
-/**
- *  The inner serializer class can be specified by setting the property key.serializer.inner.class,
- *  value.serializer.inner.class or serializer.inner.class,
- *  if the no-arg constructor is called and hence it is not passed during initialization.
- *  Note that the first two take precedence over the last.
- */
-public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
-
-    private static final int TIMESTAMP_SIZE = 8;
-
-    private Serializer<T> inner;
-
-    public WindowedSerializer(Serializer<T> inner) {
-        this.inner = inner;
-    }
-
-    // Default constructor needed by Kafka
-    public WindowedSerializer() {}
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        if (inner == null) {
-            String propertyName = isKey ? "key.serializer.inner.class" : "value.serializer.inner.class";
-            Object innerSerializerClass = configs.get(propertyName);
-            propertyName = (innerSerializerClass == null) ? "serializer.inner.class" : propertyName;
-            String value = null;
-            try {
-                value = (String) configs.get(propertyName);
-                inner = Serializer.class.cast(Utils.newInstance(value, Serializer.class));
-                inner.configure(configs, isKey);
-            } catch (ClassNotFoundException e) {
-                throw new ConfigException(propertyName, value, "Class " + value + " could not be found.");
-            }
-        }
-    }
-
-    @Override
-    public byte[] serialize(String topic, Windowed<T> data) {
-        byte[] serializedKey = inner.serialize(topic, data.key());
-
-        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
-        buf.put(serializedKey);
-        buf.putLong(data.window().start());
-
-        return buf.array();
-    }
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-
-    byte[] serializeBaseKey(String topic, Windowed<T> data) {
-        return inner.serialize(topic, data.key());
-    }
-
-    // Only for testing
-    Serializer<T> innerSerializer() {
-        return inner;
-    }
+    byte[] serializeBaseKey(String topic, Windowed<T> data);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index fa1ceae39ec..7b04f6a3ff6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -48,6 +48,4 @@ public Integer partition(final Windowed<K> windowedKey, final V value, final int
         // hash the keyBytes to choose a partition
         return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     }
-
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 31b9d75ef8d..068ac88ffba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -21,7 +21,6 @@
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -141,7 +140,7 @@ public void remove(final Windowed<Bytes> sessionKey) {
     @Override
     public void put(final Windowed<Bytes> key, byte[] value) {
         validateStoreOpen();
-        final Bytes binaryKey = SessionKeySerde.bytesToBinary(key);
+        final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key));
         final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
                                                       key.window().end(), context.partition(), context.topic());
         cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
@@ -165,7 +164,7 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final Intern
         final RecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {
-            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), serdes.keyDeserializer(), topic);
+            final Windowed<K> key = SessionKeySchema.from(binaryKey.get(), serdes.keyDeserializer(), topic);
             final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
             if (flushListener != null) {
                 final AGG newValue = serdes.valueFrom(entry.newValue());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index e3d0f629306..9ef41ce20c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -35,7 +35,6 @@
 
 class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, V> {
 
-
     private final WindowStore<Bytes, byte[]> underlying;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
@@ -92,11 +91,10 @@ private void initInternal(final ProcessorContext context) {
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
                 for (ThreadCache.DirtyEntry entry : entries) {
                     final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
-                    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryWindowKey);
+                    final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
 
-                    final Windowed<K> windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryWindowKey, serdes),
-                            WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
-                    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryWindowKey);
+                    final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey, windowSize, serdes);
+                    final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
                     maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
                     underlying.put(key, entry.newValue(), timestamp);
                 }
@@ -151,7 +149,7 @@ public synchronized void put(final Bytes key, final byte[] value, final long tim
         // if store is open outside as well.
         validateStoreOpen();
         
-        final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key.get(), timestamp, 0);
+        final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
         final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
                                                       timestamp, context.partition(), context.topic());
         cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
@@ -160,7 +158,7 @@ public synchronized void put(final Bytes key, final byte[] value, final long tim
     @Override
     public byte[] fetch(final Bytes key, final long timestamp) {
         validateStoreOpen();
-        final Bytes bytesKey = WindowStoreUtils.toBinaryKey(key.get(), timestamp, 0);
+        final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
         final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
         final LRUCacheEntry entry = cache.get(name, cacheKey);
         if (entry == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 0fdd3e0e658..1fcf60e35e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -23,6 +24,7 @@
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
 
@@ -38,13 +40,8 @@
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         inner.init(context, root);
-        this.changeLogger = new StoreChangeLogger<>(
-            inner.name(),
-            context,
-            WindowStoreUtils.getInnerStateSerde(
-                ProcessorStateManager.storeChangelogTopic(
-                    context.applicationId(),
-                    inner.name())));
+        final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name());
+        this.changeLogger = new StoreChangeLogger<>(inner.name(), context, new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
 
         // if the inner store is an LRU cache, add the eviction listener to log removed record
         if (inner instanceof MemoryLRUCache) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 12c0bc95ab5..57401aece0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -34,8 +34,6 @@
 
     private final SessionStore<Bytes, byte[]> bytesStore;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
-    private StateSerdes<Bytes, byte[]> innerStateSerde;
-    private String topic;
 
     ChangeLoggingSessionBytesStore(final SessionStore<Bytes, byte[]> bytesStore) {
         super(bytesStore);
@@ -45,15 +43,13 @@
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         bytesStore.init(context, root);
-        topic = ProcessorStateManager.storeChangelogTopic(
+        final String topic = ProcessorStateManager.storeChangelogTopic(
                 context.applicationId(),
                 bytesStore.name());
-        innerStateSerde = WindowStoreUtils.getInnerStateSerde(
-                topic);
         changeLogger = new StoreChangeLogger<>(
-            name(),
-            context,
-            innerStateSerde);
+                name(),
+                context,
+                new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
     }
 
 
@@ -70,13 +66,13 @@ public void init(final ProcessorContext context, final StateStore root) {
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
         bytesStore.remove(sessionKey);
-        changeLogger.logChange(SessionKeySerde.toBinary(sessionKey, innerStateSerde.keySerializer(), topic), null);
+        changeLogger.logChange(Bytes.wrap(SessionKeySchema.toBinary(sessionKey)), null);
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
         bytesStore.put(sessionKey, aggregate);
-        changeLogger.logChange(SessionKeySerde.bytesToBinary(sessionKey), aggregate);
+        changeLogger.logChange(Bytes.wrap(SessionKeySchema.toBinary(sessionKey)), aggregate);
 
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index e69a320e510..89d7260fb4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -78,19 +79,18 @@ public void put(final Bytes key, final byte[] value) {
     @Override
     public void put(final Bytes key, final byte[] value, final long timestamp) {
         bytesStore.put(key, value, timestamp);
-        changeLogger.logChange(WindowStoreUtils.toBinaryKey(key.get(), timestamp, maybeUpdateSeqnumForDups()), value);
+        changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, maybeUpdateSeqnumForDups()), value);
     }
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
         bytesStore.init(context, root);
-
-        final StateSerdes<Bytes, byte[]> bytesSerde = WindowStoreUtils.getInnerStateSerde(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()));
+        final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name());
         changeLogger = new StoreChangeLogger<>(
             name(),
             context,
-            bytesSerde);
+            new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
     }
 
     private int maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
index ce894bc41cc..01b577cf95f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
@@ -20,7 +20,6 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 /**
@@ -46,8 +45,8 @@
     @Override
     Windowed<Bytes> deserializeCacheKey(final Bytes cacheKey) {
         final byte[] binaryKey = cacheFunction.key(cacheKey).get();
-        final byte[] keyBytes = SessionKeySerde.extractKeyBytes(binaryKey);
-        final Window window = SessionKeySerde.extractWindow(binaryKey);
+        final byte[] keyBytes = SessionKeySchema.extractKeyBytes(binaryKey);
+        final Window window = SessionKeySchema.extractWindow(binaryKey);
         return new Windowed<>(Bytes.wrap(keyBytes), window);
     }
 
@@ -64,7 +63,7 @@
 
     @Override
     public int compare(final Bytes cacheKey, final Windowed<Bytes> storeKey) {
-        Bytes storeKeyBytes = SessionKeySerde.bytesToBinary(storeKey);
+        final Bytes storeKeyBytes = Bytes.wrap(SessionKeySchema.toBinary(storeKey));
         return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
index 6eadded899b..b08cf851e1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
@@ -43,7 +43,7 @@
     @Override
     Long deserializeCacheKey(final Bytes cacheKey) {
         byte[] binaryKey = bytesFromCacheKey(cacheKey);
-        return WindowStoreUtils.timestampFromBinaryKey(binaryKey);
+        return WindowKeySchema.extractStoreTimestamp(binaryKey);
     }
 
     @Override
@@ -60,7 +60,7 @@ public Long deserializeStoreKey(final Long key) {
     public int compare(final Bytes cacheKey, final Long storeKey) {
         byte[] binaryKey = bytesFromCacheKey(cacheKey);
 
-        final Long cacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
+        final Long cacheTimestamp = WindowKeySchema.extractStoreTimestamp(binaryKey);
         return cacheTimestamp.compareTo(storeKey);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
index 7b1fe17020e..ef0a44e3128 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
@@ -55,11 +55,8 @@
 
     @Override
     Windowed<Bytes> deserializeCacheKey(final Bytes cacheKey) {
-        byte[] binaryKey = cacheFunction.key(cacheKey).get();
-
-        final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
-        final Bytes key = WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes);
-        return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
+        final byte[] binaryKey = cacheFunction.key(cacheKey).get();
+        return WindowKeySchema.fromStoreKey(binaryKey, windowSize, serdes);
     }
 
     @Override
@@ -69,7 +66,7 @@
 
     @Override
     int compare(final Bytes cacheKey, final Windowed<Bytes> storeKey) {
-        Bytes storeKeyBytes = WindowStoreUtils.toBinaryKey(storeKey.key().get(), storeKey.window().start(), 0);
+        final Bytes storeKeyBytes = WindowKeySchema.toStoreKeyBinary(storeKey.key(), storeKey.window().start(), 0);
         return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
index ace24872533..c0d1c3b02b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
@@ -57,7 +57,7 @@ static Bytes lowerRange(Bytes key, byte[] minSuffix) {
 
         // unless there is a maximum key length, you can keep appending more zero bytes
         // to keyFrom to create a key that will match the range, yet that would precede
-        // WindowStoreUtils.toBinaryKey(keyFrom, from, 0) in byte order
+        // WindowKeySchema.toStoreKeyBinary(keyFrom, from, 0) in byte order
         return Bytes.wrap(
             rangeStart
                 .put(bytes)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index c9267dc8fda..f902cba35fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -32,10 +31,10 @@
 
     private final Serde<K> keySerde;
     private final Serde<AGG> aggSerde;
-    protected final SegmentedBytesStore bytesStore;
+    private final SegmentedBytesStore bytesStore;
 
-    protected StateSerdes<K, AGG> serdes;
-    protected String topic;
+    private StateSerdes<K, AGG> serdes;
+    private String topic;
 
     RocksDBSessionStore(final SegmentedBytesStore bytesStore,
                         final Serde<K> keySerde,
@@ -86,11 +85,11 @@ public void init(final ProcessorContext context, final StateStore root) {
 
     @Override
     public void remove(final Windowed<K> key) {
-        bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer(), topic));
+        bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic)));
     }
 
     @Override
     public void put(final Windowed<K> sessionKey, final AGG aggregate) {
-        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), serdes.rawValue(aggregate));
+        bytesStore.put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, serdes.keySerializer(), topic)), serdes.rawValue(aggregate));
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 732f3d62a42..a9af36d3679 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -30,45 +29,15 @@
 
 public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
 
-    // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
-    private static class RocksDBWindowBytesStore extends RocksDBWindowStore<Bytes, byte[]> {
-        RocksDBWindowBytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates, final long windowSize) {
-            super(inner, Serdes.Bytes(), Serdes.ByteArray(), retainDuplicates, windowSize);
-        }
-
-        @Override
-        public void put(Bytes key, byte[] value, long timestamp) {
-            maybeUpdateSeqnumForDups();
-
-            bytesStore.put(WindowStoreUtils.toBinaryKey(key.get(), timestamp, seqnum), value);
-        }
-
-        @Override
-        public WindowStoreIterator<byte[]> fetch(Bytes key, long timeFrom, long timeTo) {
-            final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(key, timeFrom, timeTo);
-            return WindowStoreIteratorWrapper.bytesIterator(bytesIterator, serdes, windowSize).valuesIterator();
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes from, Bytes to, long timeFrom, long timeTo) {
-            final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(from, to, timeFrom, timeTo);
-            return WindowStoreIteratorWrapper.bytesIterator(bytesIterator, serdes, windowSize).keyValueIterator();
-        }
-    }
-
-    static RocksDBWindowStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates, final long windowSize) {
-        return new RocksDBWindowBytesStore(inner, retainDuplicates, windowSize);
-    }
-
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final boolean retainDuplicates;
-    protected final long windowSize;
-    protected final SegmentedBytesStore bytesStore;
+    private final long windowSize;
+    private final SegmentedBytesStore bytesStore;
 
     private ProcessorContext context;
-    protected StateSerdes<K, V> serdes;
-    protected int seqnum = 0;
+    private StateSerdes<K, V> serdes;
+    private int seqnum = 0;
 
     RocksDBWindowStore(final SegmentedBytesStore bytesStore,
                        final Serde<K> keySerde,
@@ -104,12 +73,12 @@ public void put(final K key, final V value) {
     public void put(final K key, final V value, final long timestamp) {
         maybeUpdateSeqnumForDups();
 
-        bytesStore.put(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes), serdes.rawValue(value));
+        bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes), serdes.rawValue(value));
     }
 
     @Override
     public V fetch(final K key, final long timestamp) {
-        final byte[] bytesValue = bytesStore.get(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes));
+        final byte[] bytesValue = bytesStore.get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes));
         if (bytesValue == null) {
             return null;
         }
@@ -127,7 +96,7 @@ public V fetch(final K key, final long timestamp) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
-    
+
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
@@ -140,7 +109,7 @@ public V fetch(final K key, final long timestamp) {
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
 
-    void maybeUpdateSeqnumForDups() {
+    private void maybeUpdateSeqnumForDups() {
         if (retainDuplicates) {
             seqnum = (seqnum + 1) & 0x7FFFFFFF;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index e873435f1ca..e1521f8438c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
@@ -56,9 +57,11 @@ public String name() {
                 segments,
                 new WindowKeySchema()
         );
-        return RocksDBWindowStore.bytesStore(segmentedBytesStore,
-                                             retainDuplicates,
-                                             windowSize);
+        return new RocksDBWindowStore<>(segmentedBytesStore,
+                Serdes.Bytes(),
+                Serdes.ByteArray(),
+                retainDuplicates,
+                windowSize);
 
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index e3dd5530999..181d409f9b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
@@ -27,12 +30,14 @@
 import java.util.List;
 
 
-class SessionKeySchema implements SegmentedBytesStore.KeySchema {
+public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
 
-    private static final int SUFFIX_SIZE = 2 * WindowStoreUtils.TIMESTAMP_SIZE;
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int SUFFIX_SIZE = 2 * TIMESTAMP_SIZE;
     private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
 
     private String topic;
+    private final Serde<Bytes> bytesSerdes = Serdes.Bytes();
 
     @Override
     public void init(final String topic) {
@@ -42,13 +47,13 @@ public void init(final String topic) {
     @Override
     public Bytes upperRangeFixedSize(final Bytes key, final long to) {
         final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE));
-        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
+        return Bytes.wrap(SessionKeySchema.toBinary(sessionKey, bytesSerdes.serializer(), topic));
     }
 
     @Override
     public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
         final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from)));
-        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
+        return Bytes.wrap(SessionKeySchema.toBinary(sessionKey, bytesSerdes.serializer(), topic));
     }
 
     @Override
@@ -68,7 +73,7 @@ public Bytes lowerRange(Bytes key, long from) {
 
     @Override
     public long segmentTimestamp(final Bytes key) {
-        return SessionKeySerde.extractEnd(key.get());
+        return SessionKeySchema.extractEndTimestamp(key.get());
     }
 
     @Override
@@ -78,7 +83,7 @@ public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes
             public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
                 while (iterator.hasNext()) {
                     final Bytes bytes = iterator.peekNextKey();
-                    final Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes);
+                    final Windowed<Bytes> windowedKey = SessionKeySchema.from(bytes);
                     if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
                         && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
                         && windowedKey.window().end() >= from
@@ -93,7 +98,70 @@ public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
     }
 
     @Override
-    public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
+    public List<Segment> segmentsToSearch(final Segments segments,
+                                          final long from,
+                                          final long to) {
         return segments.segments(from, Long.MAX_VALUE);
     }
+
+    private static <K> K extractKey(final byte[] binaryKey,
+                                    final Deserializer<K> deserializer,
+                                    final String topic) {
+        return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
+    }
+
+    public static byte[] extractKeyBytes(final byte[] binaryKey) {
+        final byte[] bytes = new byte[binaryKey.length - 2 * TIMESTAMP_SIZE];
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+        return bytes;
+    }
+
+    public static long extractEndTimestamp(final byte[] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
+    }
+
+    public static long extractStartTimestamp(final byte[] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
+    }
+
+    public static Window extractWindow(final byte[] binaryKey) {
+        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
+        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
+        final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
+        return new SessionWindow(start, end);
+    }
+
+    public static <K> Windowed<K> from(final byte[] binaryKey,
+                                       final Deserializer<K> keyDeserializer,
+                                       final String topic) {
+        final K key = extractKey(binaryKey, keyDeserializer, topic);
+        final Window window = extractWindow(binaryKey);
+        return new Windowed<>(key, window);
+    }
+
+    public static Windowed<Bytes> from(final Bytes bytesKey) {
+        final byte[] binaryKey = bytesKey.get();
+        final Window window = extractWindow(binaryKey);
+        return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), window);
+    }
+
+    public static <K> byte[] toBinary(final Windowed<K> sessionKey,
+                                      final Serializer<K> serializer,
+                                      final String topic) {
+        final byte[] bytes = serializer.serialize(topic, sessionKey.key());
+        final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
+        buf.put(bytes);
+        buf.putLong(sessionKey.window().end());
+        buf.putLong(sessionKey.window().start());
+        return buf.array();
+    }
+
+    public static byte[] toBinary(final Windowed<Bytes> sessionKey) {
+        final byte[] bytes = sessionKey.key().get();
+        final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
+        buf.put(bytes);
+        buf.putLong(sessionKey.window().end());
+        buf.putLong(sessionKey.window().start());
+        return buf.array();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 1e217fa4370..1055df56534 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -39,7 +39,6 @@
     private final ProcessorContext context;
     private final RecordCollector collector;
 
-
     StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
         this(storeName, context, context.taskId().partition, serialization);
     }
@@ -59,5 +58,4 @@ void logChange(final K key, final V value) {
             collector.send(this.topic, key, value, this.partition, context.timestamp(), keySerializer, valueSerializer);
         }
     }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index e432baad691..3c59cd6863f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -16,15 +16,23 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 
-class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
 
-    private static final int SUFFIX_SIZE = WindowStoreUtils.TIMESTAMP_SIZE + WindowStoreUtils.SEQNUM_SIZE;
+    private static final int SEQNUM_SIZE = 4;
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
     private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
 
     @Override
@@ -49,17 +57,17 @@ public Bytes lowerRange(final Bytes key, final long from) {
 
     @Override
     public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
-        return WindowStoreUtils.toBinaryKey(key.get(), Math.max(0, from), 0);
+        return WindowKeySchema.toStoreKeyBinary(key, Math.max(0, from), 0);
     }
 
     @Override
     public Bytes upperRangeFixedSize(final Bytes key, final long to) {
-        return WindowStoreUtils.toBinaryKey(key.get(), to, Integer.MAX_VALUE);
+        return WindowKeySchema.toStoreKeyBinary(key, to, Integer.MAX_VALUE);
     }
 
     @Override
     public long segmentTimestamp(final Bytes key) {
-        return WindowStoreUtils.timestampFromBinaryKey(key.get());
+        return WindowKeySchema.extractStoreTimestamp(key.get());
     }
 
     @Override
@@ -69,8 +77,8 @@ public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes
             public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
                 while (iterator.hasNext()) {
                     final Bytes bytes = iterator.peekNextKey();
-                    final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
-                    final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
+                    final Bytes keyBytes = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(bytes.get()));
+                    final long time = WindowKeySchema.extractStoreTimestamp(bytes.get());
                     if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
                         && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
                         && time >= from
@@ -89,4 +97,129 @@ public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
         return segments.segments(from, to);
     }
 
+    /**
+     * Safely construct a time window of the given size,
+     * taking care of bounding endMs to Long.MAX_VALUE if necessary
+     */
+    public static TimeWindow timeWindowForSize(final long startMs,
+                                               final long windowSize) {
+        final long endMs = startMs + windowSize;
+        return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
+    }
+
+    // for pipe serdes
+
+    public static <K> byte[] toBinary(final Windowed<K> timeKey,
+                                      final Serializer<K> serializer,
+                                      final String topic) {
+        final byte[] bytes = serializer.serialize(topic, timeKey.key());
+        final ByteBuffer buf = ByteBuffer.allocate(bytes.length + TIMESTAMP_SIZE);
+        buf.put(bytes);
+        buf.putLong(timeKey.window().start());
+
+        return buf.array();
+    }
+
+    public static <K> Windowed<K> from(final byte[] binaryKey,
+                                       final long windowSize,
+                                       final Deserializer<K> deserializer,
+                                       final String topic) {
+        final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+        final K key = deserializer.deserialize(topic, bytes);
+        final Window window = extractWindow(binaryKey, windowSize);
+        return new Windowed<>(key, window);
+    }
+
+    private static Window extractWindow(final byte[] binaryKey,
+                                        final long windowSize) {
+        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
+        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
+        return timeWindowForSize(start, windowSize);
+    }
+
+    // for store serdes
+
+    public static Bytes toStoreKeyBinary(final Bytes key,
+                                         final long timestamp,
+                                         final int seqnum) {
+        final byte[] serializedKey = key.get();
+        return toStoreKeyBinary(serializedKey, timestamp, seqnum);
+    }
+
+    public static <K> Bytes toStoreKeyBinary(final K key,
+                                             final long timestamp,
+                                             final int seqnum,
+                                             final StateSerdes<K, ?> serdes) {
+        final byte[] serializedKey = serdes.rawKey(key);
+        return toStoreKeyBinary(serializedKey, timestamp, seqnum);
+    }
+
+    public static Bytes toStoreKeyBinary(final Windowed<Bytes> timeKey,
+                                         final int seqnum) {
+        final byte[] bytes = timeKey.key().get();
+        return toStoreKeyBinary(bytes, timeKey.window().start(), seqnum);
+    }
+
+    public static <K> Bytes toStoreKeyBinary(final Windowed<K> timeKey,
+                                             final int seqnum,
+                                             final StateSerdes<K, ?> serdes) {
+        final byte[] serializedKey = serdes.rawKey(timeKey.key());
+        return toStoreKeyBinary(serializedKey, timeKey.window().start(), seqnum);
+    }
+
+    // package private for testing
+    static Bytes toStoreKeyBinary(final byte[] serializedKey,
+                                  final long timestamp,
+                                  final int seqnum) {
+        final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
+        buf.put(serializedKey);
+        buf.putLong(timestamp);
+        buf.putInt(seqnum);
+
+        return Bytes.wrap(buf.array());
+    }
+
+    public static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
+        final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+        return bytes;
+    }
+
+    public static <K> K extractStoreKey(final byte[] binaryKey,
+                                        final StateSerdes<K, ?> serdes) {
+        final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+        return serdes.keyFrom(bytes);
+    }
+
+    public static long extractStoreTimestamp(final byte[] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
+    }
+
+    public static int extractStoreSequence(final byte[] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
+    }
+
+    public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
+                                               final long windowSize,
+                                               final StateSerdes<K, ?> serdes) {
+        final K key = serdes.keyDeserializer().deserialize(serdes.topic(), extractStoreKeyBytes(binaryKey));
+        final Window window = extractStoreWindow(binaryKey, windowSize);
+        return new Windowed<>(key, window);
+    }
+
+    public static Windowed<Bytes> fromStoreKey(final byte[] binaryKey,
+                                               final long windowSize) {
+        final Bytes key = Bytes.wrap(extractStoreKeyBytes(binaryKey));
+        final Window window = extractStoreWindow(binaryKey, windowSize);
+        return new Windowed<>(key, window);
+    }
+
+    public static Window extractStoreWindow(final byte[] binaryKey,
+                                            final long windowSize) {
+        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
+        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
+        return timeWindowForSize(start, windowSize);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
index 4cb85d6d760..e83e6e61ae7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
@@ -23,75 +23,15 @@
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import java.util.NoSuchElementException;
-
 class WindowStoreIteratorWrapper<K, V> {
 
-    // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs
-    private static class WrappedWindowStoreBytesIterator extends WindowStoreIteratorWrapper<Bytes, byte[]> {
-        WrappedWindowStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
-                                        final StateSerdes<Bytes, byte[]> serdes,
-                                        final long windowSize) {
-            super(underlying, serdes, windowSize);
-        }
-
-        @Override
-        public WindowStoreIterator<byte[]> valuesIterator() {
-            return new WrappedWindowStoreIterator<byte[]>(bytesIterator, serdes) {
-                @Override
-                public KeyValue<Long, byte[]> next() {
-                    final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-                    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
-                    return KeyValue.pair(timestamp, next.value);
-                }
-            };
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator() {
-            return new WrappedKeyValueIterator<Bytes, byte[]>(bytesIterator, serdes, windowSize) {
-                @Override
-                public Windowed<Bytes> peekNextKey() {
-                    final Bytes next = bytesIterator.peekNextKey();
-                    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.get());
-                    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.get());
-                    return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
-                }
-
-                @Override
-                public KeyValue<Windowed<Bytes>, byte[]> next() {
-                    if (!bytesIterator.hasNext()) {
-                        throw new NoSuchElementException();
-                    }
-
-                    final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-                    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
-                    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.key.get());
-                    return KeyValue.pair(
-                        new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)),
-                        next.value
-                    );
-                }
-            };
-        }
-    }
-
-    static WindowStoreIteratorWrapper<Bytes, byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
-                                                                   final StateSerdes<Bytes, byte[]> serdes,
-                                                                   final long windowSize) {
-        return new WrappedWindowStoreBytesIterator(underlying, serdes, windowSize);
-    }
-
-
-    protected final KeyValueIterator<Bytes, byte[]> bytesIterator;
-    protected final StateSerdes<K, V> serdes;
-    protected final long windowSize;
+    private final KeyValueIterator<Bytes, byte[]> bytesIterator;
+    private final StateSerdes<K, V> serdes;
+    private final long windowSize;
 
-    WindowStoreIteratorWrapper(
-        final KeyValueIterator<Bytes, byte[]> bytesIterator,
-        final StateSerdes<K, V> serdes,
-        final long windowSize
-    ) {
+    WindowStoreIteratorWrapper(final KeyValueIterator<Bytes, byte[]> bytesIterator,
+                               final StateSerdes<K, V> serdes,
+                               final long windowSize) {
         this.bytesIterator = bytesIterator;
         this.serdes = serdes;
         this.windowSize = windowSize;
@@ -117,7 +57,7 @@
 
         @Override
         public Long peekNextKey() {
-            return WindowStoreUtils.timestampFromBinaryKey(bytesIterator.peekNextKey().get());
+            return WindowKeySchema.extractStoreTimestamp(bytesIterator.peekNextKey().get());
         }
 
         @Override
@@ -128,7 +68,7 @@ public boolean hasNext() {
         @Override
         public KeyValue<Long, V> next() {
             final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+            final long timestamp = WindowKeySchema.extractStoreTimestamp(next.key.get());
             final V value = serdes.valueFrom(next.value);
             return KeyValue.pair(timestamp, value);
         }
@@ -160,9 +100,9 @@ public void close() {
         @Override
         public Windowed<K> peekNextKey() {
             final byte[] nextKey = bytesIterator.peekNextKey().get();
-            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(nextKey);
-            final K key = WindowStoreUtils.keyFromBinaryKey(nextKey, serdes);
-            return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
+            final long timestamp = WindowKeySchema.extractStoreTimestamp(nextKey);
+            final K key = WindowKeySchema.extractStoreKey(nextKey, serdes);
+            return new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize));
         }
 
         @Override
@@ -173,11 +113,11 @@ public boolean hasNext() {
         @Override
         public KeyValue<Windowed<K>, V> next() {
             final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
-            final K key = WindowStoreUtils.keyFromBinaryKey(next.key.get(), serdes);
+            final long timestamp = WindowKeySchema.extractStoreTimestamp(next.key.get());
+            final K key = WindowKeySchema.extractStoreKey(next.key.get(), serdes);
             final V value = serdes.valueFrom(next.value);
             return KeyValue.pair(
-                new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)),
+                new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)),
                 value
             );
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
deleted file mode 100644
index 317ce227105..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.nio.ByteBuffer;
-
-public class WindowStoreUtils {
-
-    static final int SEQNUM_SIZE = 4;
-    static final int TIMESTAMP_SIZE = 8;
-
-    /** Inner byte array serde used for segments */
-    static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
-    static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
-
-    static StateSerdes<Bytes, byte[]> getInnerStateSerde(final String topic) {
-        return new StateSerdes<>(topic, INNER_KEY_SERDE, INNER_VALUE_SERDE);
-    }
-
-    static <K> Bytes toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
-        byte[] serializedKey = serdes.rawKey(key);
-        return toBinaryKey(serializedKey, timestamp, seqnum);
-    }
-
-    static Bytes toBinaryKey(byte[] serializedKey, final long timestamp, final int seqnum) {
-        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
-        buf.put(serializedKey);
-        buf.putLong(timestamp);
-        buf.putInt(seqnum);
-
-        return Bytes.wrap(buf.array());
-    }
-
-    static <K> K keyFromBinaryKey(byte[] binaryKey, StateSerdes<K, ?> serdes) {
-        byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
-
-        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
-
-        return serdes.keyFrom(bytes);
-    }
-
-    static Bytes bytesKeyFromBinaryKey(byte[] binaryKey) {
-        byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
-
-        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
-
-        return Bytes.wrap(bytes);
-    }
-
-    static long timestampFromBinaryKey(byte[] binaryKey) {
-        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
-    }
-
-    static int sequenceNumberFromBinaryKey(byte[] binaryKey) {
-        return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
-    }
-
-    /**
-     * Safely construct a time window of the given size,
-     * taking care of bounding endMs to Long.MAX_VALUE if necessary
-     */
-    public static TimeWindow timeWindowForSize(final long startMs, final long windowSize) {
-        final long endMs = startMs + windowSize;
-        return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
index c5ea70bac80..47496a4b766 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 
@@ -37,14 +36,13 @@
         @Override
         public Windowed<Bytes> peekNextKey() {
             final Bytes key = bytesIterator.peekNextKey();
-
-            return SessionKeySerde.fromBytes(key);
+            return SessionKeySchema.from(key);
         }
 
         @Override
         public KeyValue<Windowed<Bytes>, byte[]> next() {
             final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-            return KeyValue.pair(SessionKeySerde.fromBytes(next.key), next.value);
+            return KeyValue.pair(SessionKeySchema.from(next.key), next.value);
         }
     }
 
@@ -66,7 +64,7 @@ public void close() {
     @Override
     public Windowed<K> peekNextKey() {
         final Bytes bytes = bytesIterator.peekNextKey();
-        return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer(), serdes.topic());
+        return SessionKeySchema.from(bytes.get(), serdes.keyDeserializer(), serdes.topic());
     }
 
     @Override
@@ -77,8 +75,7 @@ public boolean hasNext() {
     @Override
     public KeyValue<Windowed<K>, V> next() {
         final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-        return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()),
-                             serdes.valueFrom(next.value));
+        return KeyValue.pair(SessionKeySchema.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value));
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
new file mode 100644
index 00000000000..ec3cc833953
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SessionWindowedDeserializerTest {
+    private final SessionWindowedDeserializer<?> sessionWindowedDeserializer = new SessionWindowedDeserializer<>();
+    private final Map<String, String> props = new HashMap<>();
+
+    @Before
+    public void setUp() {
+        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class.getName());
+        props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, Serdes.ByteArraySerde.class.getName());
+    }
+
+    @Test
+    public void testWindowedKeyDeserializerNoArgConstructors() {
+        sessionWindowedDeserializer.configure(props, true);
+        Deserializer<?> inner = sessionWindowedDeserializer.innerDeserializer();
+        assertNotNull("Inner deserializer should be not null", inner);
+        assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);
+    }
+
+    @Test
+    public void testWindowedValueDeserializerNoArgConstructors() {
+        sessionWindowedDeserializer.configure(props, false);
+        Deserializer<?> inner = sessionWindowedDeserializer.innerDeserializer();
+        assertNotNull("Inner deserializer should be not null", inner);
+        assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner instanceof ByteArrayDeserializer);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
new file mode 100644
index 00000000000..e7266dba9c9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SessionWindowedSerializerTest {
+    private final SessionWindowedSerializer<?> sessionWindowedSerializer = new SessionWindowedSerializer<>();
+    private final Map<String, String> props = new HashMap<>();
+
+    @Before
+    public void setUp() {
+        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class.getName());
+        props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, Serdes.ByteArraySerde.class.getName());
+    }
+
+    @Test
+    public void testWindowedKeySerializerNoArgConstructors() {
+        sessionWindowedSerializer.configure(props, true);
+        Serializer<?> inner = sessionWindowedSerializer.innerSerializer();
+        assertNotNull("Inner serializer should be not null", inner);
+        assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer);
+    }
+
+    @Test
+    public void testWindowedValueSerializerNoArgConstructors() {
+        sessionWindowedSerializer.configure(props, false);
+        Serializer<?> inner = sessionWindowedSerializer.innerSerializer();
+        assertNotNull("Inner serializer should be not null", inner);
+        assertTrue("Inner serializer type should be ByteArraySerializer", inner instanceof ByteArraySerializer);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
new file mode 100644
index 00000000000..660a530fed6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TimeWindowedDeserializerTest {
+    private final long windowSize = 5000000;
+    private final TimeWindowedDeserializer<?> timeWindowedDeserializer = new TimeWindowedDeserializer<>(null, windowSize);
+    private final Map<String, String> props = new HashMap<>();
+
+    @Before
+    public void setUp() {
+        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class.getName());
+        props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, Serdes.ByteArraySerde.class.getName());
+    }
+
+    @Test
+    public void testWindowedKeyDeserializerNoArgConstructors() {
+        timeWindowedDeserializer.configure(props, true);
+        Deserializer<?> inner = timeWindowedDeserializer.innerDeserializer();
+        assertNotNull("Inner deserializer should be not null", inner);
+        assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);
+    }
+
+    @Test
+    public void testWindowedValueDeserializerNoArgConstructors() {
+        timeWindowedDeserializer.configure(props, false);
+        Deserializer<?> inner = timeWindowedDeserializer.innerDeserializer();
+        assertNotNull("Inner deserializer should be not null", inner);
+        assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner instanceof ByteArrayDeserializer);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
new file mode 100644
index 00000000000..cd019c80cd1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TimeWindowedSerializerTest {
+    private final TimeWindowedSerializer<?> timeWindowedSerializer = new TimeWindowedSerializer<>();
+    private final Map<String, String> props = new HashMap<>();
+
+    @Before
+    public void setUp() {
+        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class.getName());
+        props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, Serdes.ByteArraySerde.class.getName());
+    }
+
+    @Test
+    public void testWindowedKeySerializerNoArgConstructors() {
+        timeWindowedSerializer.configure(props, true);
+        Serializer<?> inner = timeWindowedSerializer.innerSerializer();
+        assertNotNull("Inner serializer should be not null", inner);
+        assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer);
+    }
+
+    @Test
+    public void testWindowedValueSerializerNoArgConstructors() {
+        timeWindowedSerializer.configure(props, false);
+        Serializer<?> inner = timeWindowedSerializer.innerSerializer();
+        assertNotNull("Inner serializer should be not null", inner);
+        assertTrue("Inner serializer type should be ByteArraySerializer", inner instanceof ByteArraySerializer);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
deleted file mode 100644
index 59371d5325b..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class SessionKeySerdeTest {
-
-    final private String topic = "topic";
-    final private String key = "key";
-    final private long startTime = 50L;
-    final private long endTime = 100L;
-    final private Window window = new SessionWindow(startTime, endTime);
-    final private Windowed<String> windowedKey = new Windowed<>(key, window);
-    final private Serde<String> serde = Serdes.String();
-    final private SessionKeySerde<String> sessionKeySerde = new SessionKeySerde<>(serde);
-
-    @Test
-    public void shouldSerializeDeserialize() {
-        final byte[] bytes = sessionKeySerde.serializer().serialize(topic, windowedKey);
-        final Windowed<String> result = sessionKeySerde.deserializer().deserialize(topic, bytes);
-        assertEquals(windowedKey, result);
-    }
-
-    @Test
-    public void shouldSerializeNullToNull() {
-        assertNull(sessionKeySerde.serializer().serialize(topic, null));
-    }
-
-    @Test
-    public void shouldDeSerializeEmtpyByteArrayToNull() {
-        assertNull(sessionKeySerde.deserializer().deserialize(topic, new byte[0]));
-    }
-
-    @Test
-    public void shouldDeSerializeNullToNull() {
-        assertNull(sessionKeySerde.deserializer().deserialize(topic, null));
-    }
-
-    @Test
-    public void shouldConvertToBinaryAndBack() {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
-        final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer(), "dummy");
-        assertEquals(windowedKey, result);
-    }
-
-    @Test
-    public void shouldExtractEndTimeFromBinary() {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
-        assertEquals(endTime, SessionKeySerde.extractEnd(serialized.get()));
-    }
-
-    @Test
-    public void shouldExtractStartTimeFromBinary() {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
-        assertEquals(startTime, SessionKeySerde.extractStart(serialized.get()));
-    }
-
-    @Test
-    public void shouldExtractWindowFromBindary() {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
-        assertEquals(window, SessionKeySerde.extractWindow(serialized.get()));
-    }
-
-    @Test
-    public void shouldExtractKeyBytesFromBinary() {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
-        assertArrayEquals(key.getBytes(), SessionKeySerde.extractKeyBytes(serialized.get()));
-    }
-
-    @Test
-    public void shouldExtractKeyFromBinary() {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
-        assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer(), "dummy"));
-    }
-
-    @Test
-    public void shouldExtractBytesKeyFromBinary() {
-        final Bytes bytesKey = Bytes.wrap(key.getBytes());
-        final Windowed<Bytes> windowedBytesKey = new Windowed<>(bytesKey, window);
-        final Bytes serialized = SessionKeySerde.bytesToBinary(windowedBytesKey);
-        assertEquals(windowedBytesKey, SessionKeySerde.fromBytes(serialized));
-    }
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index d3510a0897b..3aafa33e2e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -20,14 +20,9 @@
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.junit.Test;
 
@@ -35,12 +30,8 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import java.util.Map;
-import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class WindowedStreamPartitionerTest {
 
@@ -63,13 +54,10 @@
 
     @Test
     public void testCopartitioning() {
-
-        Random rand = new Random();
-
-        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
-
-        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
-        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, windowedSerializer);
+        final Random rand = new Random();
+        final DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
+        final WindowedSerializer<Integer> timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer);
+        final WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, timeWindowedSerializer);
 
         for (int k = 0; k < 10; k++) {
             Integer key = rand.nextInt();
@@ -92,70 +80,4 @@ public void testCopartitioning() {
 
         defaultPartitioner.close();
     }
-    
-    @Test
-    public void testWindowedSerializerNoArgConstructors() {
-        Map<String, String> props = new HashMap<>();
-        // test key[value].serializer.inner.class takes precedence over serializer.inner.class
-        WindowedSerializer<StringSerializer> windowedSerializer = new WindowedSerializer<>();
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.put("key.serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
-        windowedSerializer.configure(props, true);
-        Serializer<?> inner = windowedSerializer.innerSerializer();
-        assertNotNull("Inner serializer should be not null", inner);
-        assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer);
-        // test serializer.inner.class
-        props.put("serializer.inner.class", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        props.remove("key.serializer.inner.class");
-        props.remove("value.serializer.inner.class");
-        WindowedSerializer<?> windowedSerializer1 = new WindowedSerializer<>();
-        windowedSerializer1.configure(props, false);
-        Serializer<?> inner1 = windowedSerializer1.innerSerializer();
-        assertNotNull("Inner serializer should be not null", inner1);
-        assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer);
-        windowedSerializer.close();
-        windowedSerializer1.close();
-    }
-    
-    @Test
-    public void testWindowedDeserializerNoArgConstructors() {
-        Map<String, String> props = new HashMap<>();
-        WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>();
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
-        windowedDeserializer.configure(props, true);
-        Deserializer<?> inner = windowedDeserializer.innerDeserializer();
-        assertNotNull("Inner deserializer should be not null", inner);
-        assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);
-        // test deserializer.inner.class
-        props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        props.remove("key.deserializer.inner.class");
-        props.remove("value.deserializer.inner.class");
-        WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>();
-        windowedDeserializer1.configure(props, false);
-        final Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
-        assertNotNull("Inner deserializer should be not null", inner1);
-        assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
-        windowedDeserializer.close();
-        windowedDeserializer1.close();
-    }
-    
-    @Test
-    public void testWindowDeserializeExpectedWindowSize() {
-        final long randomLong = 5000000;
-        final Map<String, String> props = new HashMap<>();
-        final WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>(randomLong);
-        props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
-        windowedDeserializer.configure(props, true);
-        //test for deserializer expected window end time
-        final byte[] byteValues = stringSerializer.serialize(topicName, "dummy string"); //dummy string, serves no real purpose
-        final Windowed<?> windowed = windowedDeserializer.deserialize(topicName, byteValues);
-        final long actualSize = windowed.window().end() - windowed.window().start(); //find actual window time
-        assertEquals(randomLong, actualSize); //testing if window size matches up with expected one
-        windowedDeserializer.close();
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 5f934b83454..bbf9bef03f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -259,7 +259,7 @@ public void shouldIterateAcrossWindows() {
     @Test
     public void shouldIterateCacheAndStore() {
         final Bytes key = Bytes.wrap("1" .getBytes());
-        underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")), "a".getBytes());
+        underlying.put(WindowKeySchema.toStoreKeyBinary(key, DEFAULT_TIMESTAMP, 0), "a".getBytes());
         cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);
         final WindowStoreIterator<byte[]> fetch = cachingStore.fetch(bytesKey("1"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
         verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
@@ -270,7 +270,7 @@ public void shouldIterateCacheAndStore() {
     @Test
     public void shouldIterateCacheAndStoreKeyRange() {
         final Bytes key = Bytes.wrap("1" .getBytes());
-        underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")), "a".getBytes());
+        underlying.put(WindowKeySchema.toStoreKeyBinary(key, DEFAULT_TIMESTAMP, 0), "a".getBytes());
         cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
@@ -366,7 +366,7 @@ public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
         return KeyValue.pair(new Windowed<>(bytesKey(key), new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), bytesValue(value));
     }
 
-    private int addItemsToCache() throws IOException {
+    private int addItemsToCache() {
         int cachedSize = 0;
         int i = 0;
         while (cachedSize < MAX_CACHE_SIZE_BYTES) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 98224e9c611..a658186931f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -44,7 +43,7 @@
 public class ChangeLoggingSessionBytesStoreTest {
 
     private final TaskId taskId = new TaskId(0, 0);
-    private final Map sent = new HashMap<>();
+    private final Map<Object, Object> sent = new HashMap<>();
     private final NoOpRecordCollector collector = new NoOpRecordCollector() {
         @Override
         public <K, V> void send(final String topic,
@@ -84,7 +83,7 @@ private void init() {
     }
 
     @Test
-    public void shouldLogPuts() throws Exception {
+    public void shouldLogPuts() {
         inner.put(key1, value1);
         EasyMock.expectLastCall();
 
@@ -92,26 +91,26 @@ public void shouldLogPuts() throws Exception {
 
         store.put(key1, value1);
 
-        assertArrayEquals(value1, (byte[]) sent.get(SessionKeySerde.bytesToBinary(key1)));
+        assertArrayEquals(value1, (byte[]) sent.get(Bytes.wrap(SessionKeySchema.toBinary(key1))));
         EasyMock.verify(inner);
     }
 
     @Test
-    public void shouldLogRemoves() throws Exception {
+    public void shouldLogRemoves() {
         inner.remove(key1);
         EasyMock.expectLastCall();
 
         init();
         store.remove(key1);
 
-        final Bytes binaryKey = SessionKeySerde.bytesToBinary(key1);
+        final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key1));
         assertTrue(sent.containsKey(binaryKey));
         assertNull(sent.get(binaryKey));
         EasyMock.verify(inner);
     }
 
     @Test
-    public void shouldDelegateToUnderlyingStoreWhenFetching() throws Exception {
+    public void shouldDelegateToUnderlyingStoreWhenFetching() {
         EasyMock.expect(inner.findSessions(bytesKey, 0, Long.MAX_VALUE)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
 
         init();
@@ -121,7 +120,7 @@ public void shouldDelegateToUnderlyingStoreWhenFetching() throws Exception {
     }
 
     @Test
-    public void shouldDelegateToUnderlyingStoreWhenFetchingRange() throws Exception {
+    public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
         EasyMock.expect(inner.findSessions(bytesKey, bytesKey, 0, Long.MAX_VALUE)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
 
         init();
@@ -131,7 +130,7 @@ public void shouldDelegateToUnderlyingStoreWhenFetchingRange() throws Exception
     }
 
     @Test
-    public void shouldDelegateToUnderlyingStoreWhenFindingSessions() throws Exception {
+    public void shouldDelegateToUnderlyingStoreWhenFindingSessions() {
         EasyMock.expect(inner.findSessions(bytesKey, 0, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
 
         init();
@@ -141,7 +140,7 @@ public void shouldDelegateToUnderlyingStoreWhenFindingSessions() throws Exceptio
     }
 
     @Test
-    public void shouldDelegateToUnderlyingStoreWhenFindingSessionRange() throws Exception {
+    public void shouldDelegateToUnderlyingStoreWhenFindingSessionRange() {
         EasyMock.expect(inner.findSessions(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
 
         init();
@@ -151,7 +150,7 @@ public void shouldDelegateToUnderlyingStoreWhenFindingSessionRange() throws Exce
     }
 
     @Test
-    public void shouldFlushUnderlyingStore() throws Exception {
+    public void shouldFlushUnderlyingStore() {
         inner.flush();
         EasyMock.expectLastCall();
 
@@ -162,7 +161,7 @@ public void shouldFlushUnderlyingStore() throws Exception {
     }
 
     @Test
-    public void shouldCloseUnderlyingStore() throws Exception {
+    public void shouldCloseUnderlyingStore() {
         inner.close();
         EasyMock.expectLastCall();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index cd859a31e02..956172ebd46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -66,7 +66,7 @@
 
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         store = new ChangeLoggingWindowBytesStore(inner, false);
     }
 
@@ -81,7 +81,7 @@ private void init() {
     }
 
     @Test
-    public void shouldLogPuts() throws Exception {
+    public void shouldLogPuts() {
         inner.put(bytesKey, value1, 0);
         EasyMock.expectLastCall();
 
@@ -89,12 +89,12 @@ public void shouldLogPuts() throws Exception {
 
         store.put(bytesKey, value1);
 
-        assertArrayEquals(value1, (byte[]) sent.get(WindowStoreUtils.toBinaryKey(bytesKey.get(), 0, 0)));
+        assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)));
         EasyMock.verify(inner);
     }
 
     @Test
-    public void shouldDelegateToUnderlyingStoreWhenFetching() throws Exception {
+    public void shouldDelegateToUnderlyingStoreWhenFetching() {
         EasyMock.expect(inner.fetch(bytesKey, 0, 10)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
 
         init();
@@ -104,7 +104,7 @@ public void shouldDelegateToUnderlyingStoreWhenFetching() throws Exception {
     }
 
     @Test
-    public void shouldDelegateToUnderlyingStoreWhenFetchingRange() throws Exception {
+    public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
         EasyMock.expect(inner.fetch(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
 
         init();
@@ -123,8 +123,8 @@ public void shouldRetainDuplicatesWhenSet() {
         store.put(bytesKey, value1);
         store.put(bytesKey, value1);
 
-        assertArrayEquals(value1, (byte[]) sent.get(WindowStoreUtils.toBinaryKey(bytesKey.get(), 0, 1)));
-        assertArrayEquals(value1, (byte[]) sent.get(WindowStoreUtils.toBinaryKey(bytesKey.get(), 0, 2)));
+        assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)));
+        assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)));
 
         EasyMock.verify(inner);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
index 13e88eba9f6..ea31a041bb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.test.KeyValueIteratorStub;
 import org.junit.Test;
@@ -50,9 +49,9 @@ public long segmentId(Bytes key) {
     private final SessionWindow cacheWindow = new SessionWindow(10, 20);
     private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(
         KeyValue.pair(
-            SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(
-                SessionKeySerde.bytesToBinary(new Windowed<>(cacheKey, cacheWindow))
-            ),
+            SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(Bytes.wrap(
+                    SessionKeySchema.toBinary(new Windowed<>(cacheKey, cacheWindow))
+            )),
             new LRUCacheEntry(cacheKey.get())
         )).iterator();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index a32094c60f1..91351365c12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -57,14 +57,14 @@ public void shouldIterateOverValueFromBothIterators() {
             final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, v1Bytes);
             windowStoreKvPairs.add(v1);
             expectedKvPairs.add(KeyValue.pair(t, v1Bytes));
-            final Bytes keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes);
+            final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary("a", t + 10, 0, stateSerdes);
             final byte[] valBytes = String.valueOf(t + 10).getBytes();
             expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
             cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(keyBytes), new LRUCacheEntry(valBytes));
         }
 
-        Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
-        Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
+        final Bytes fromBytes = WindowKeySchema.toStoreKeyBinary("a", 0, 0, stateSerdes);
+        final Bytes toBytes = WindowKeySchema.toStoreKeyBinary("a", 100, 0, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
 
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(
@@ -87,9 +87,9 @@ public void shouldIterateOverValueFromBothIterators() {
     @Test
     public void shouldPeekNextStoreKey() {
         windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
-        cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
-        Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
-        Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
+        cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowKeySchema.toStoreKeyBinary("a", 0, 0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+        final Bytes fromBytes = WindowKeySchema.toStoreKeyBinary("a", 0, 0, stateSerdes);
+        final Bytes toBytes = WindowKeySchema.toStoreKeyBinary("a", 100, 0, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(
             namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)
@@ -106,9 +106,9 @@ public void shouldPeekNextStoreKey() {
     @Test
     public void shouldPeekNextCacheKey() {
         windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
-        cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
-        Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
-        Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
+        cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowKeySchema.toStoreKeyBinary("a", 10L, 0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+        final Bytes fromBytes = WindowKeySchema.toStoreKeyBinary("a", 0, 0, stateSerdes);
+        final Bytes toBytes = WindowKeySchema.toStoreKeyBinary("a", 100, 0, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes));
         final MergedSortedCacheWindowStoreIterator iterator = new MergedSortedCacheWindowStoreIterator(cacheIterator, storeIterator);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
index f1c46fb8e91..1e5f62c9ce7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
@@ -53,11 +53,8 @@ public long segmentId(Bytes key) {
     private final TimeWindow cacheWindow = new TimeWindow(10, 20);
     private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(
         KeyValue.pair(
-            SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(
-                WindowStoreUtils.toBinaryKey(
-                    cacheKey, cacheWindow.start(), 0,
-                    new StateSerdes<>("dummy", Serdes.String(), Serdes.String())
-                )
+            SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowKeySchema.toStoreKeyBinary(
+                    new Windowed<>(cacheKey, cacheWindow), 0, new StateSerdes<>("dummy", Serdes.String(), Serdes.ByteArray()))
             ),
             new LRUCacheEntry(cacheKey.getBytes())
         )).iterator();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index df50b2c14a1..e34d3cccc67 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -23,7 +23,6 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -52,6 +51,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+// TODO: this test does not cover time window serdes
 public class RocksDBSegmentedBytesStoreTest {
 
     private final long retention = 60000L;
@@ -279,15 +279,17 @@ public void shouldBeAbleToWriteToReInitializedStore() {
     }
 
     private Bytes serializeKey(final Windowed<String> key) {
-        return SessionKeySerde.toBinary(key, Serdes.String().serializer(), "dummy");
+        return Bytes.wrap(SessionKeySchema.toBinary(key, Serdes.String().serializer(), "dummy"));
     }
 
     private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
         while (iterator.hasNext()) {
             final KeyValue<Bytes, byte[]> next = iterator.next();
-            final KeyValue<Windowed<String>, Long> deserialized
-                    = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy"), Serdes.Long().deserializer().deserialize("", next.value));
+            final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
+                    SessionKeySchema.from(next.key.get(), Serdes.String().deserializer(), "dummy"),
+                    Serdes.Long().deserializer().deserialize("dummy", next.value)
+            );
             results.add(deserialized);
         }
         return results;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index a7a978a8d75..c745e702b09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -888,9 +888,10 @@ private void putSecondBatch(final WindowStore<Integer, String> store, final long
         HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
 
         for (KeyValue<byte[], byte[]> entry : changeLog) {
-            long timestamp = WindowStoreUtils.timestampFromBinaryKey(entry.key);
-            Integer key = WindowStoreUtils.keyFromBinaryKey(entry.key, serdes);
-            String value = entry.value == null ? null : serdes.valueFrom(entry.value);
+            final long timestamp = WindowKeySchema.extractStoreTimestamp(entry.key);
+
+            final Integer key = WindowKeySchema.extractStoreKey(entry.key, serdes);
+            final String value = entry.value == null ? null : serdes.valueFrom(entry.value);
 
             Set<String> entries = entriesByKey.get(key);
             if (entries == null) {
@@ -907,7 +908,7 @@ private void putSecondBatch(final WindowStore<Integer, String> store, final long
         return windowedPair(key, value, timestamp, windowSize);
     }
 
-    private <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
-        return KeyValue.pair(new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)), value);
+    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
+        return KeyValue.pair(new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)), value);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java
index 2b6d0577871..29cdf11c862 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -25,12 +26,13 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 
+// TODO: this test coverage does not consider session serde yet
 public class SegmentedCacheFunctionTest {
 
     private static final int SEGMENT_INTERVAL = 17;
     private static final int TIMESTAMP = 736213517;
 
-    private static final Bytes THE_KEY = WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, TIMESTAMP, 42);
+    private static final Bytes THE_KEY = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, TIMESTAMP, 42);
     private final static Bytes THE_CACHE_KEY = Bytes.wrap(
         ByteBuffer.allocate(8 + THE_KEY.get().length)
             .putLong(TIMESTAMP / SEGMENT_INTERVAL)
@@ -84,7 +86,7 @@ public void compareSegmentedKeys() {
             ) == 0
         );
 
-        final Bytes sameKeyInPriorSegment = WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 1234, 42);
+        final Bytes sameKeyInPriorSegment = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 42);
 
         assertThat(
             "same keys in different segments should be ordered according to segment",
@@ -102,7 +104,7 @@ public void compareSegmentedKeys() {
             ) > 0
         );
 
-        final Bytes lowerKeyInSameSegment = WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xB}, TIMESTAMP - 1, 0);
+        final Bytes lowerKeyInSameSegment = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, TIMESTAMP - 1, 0);
 
         assertThat(
             "different keys in same segments should be ordered according to key",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
index 3b731f93449..68527218796 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -17,10 +17,13 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.test.KeyValueIteratorStub;
 import org.junit.Before;
@@ -32,21 +35,34 @@
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class SessionKeySchemaTest {
 
+    private final String key = "key";
+    private final String topic = "topic";
+    private final long startTime = 50L;
+    private final long endTime = 100L;
+    private final Serde<String> serde = Serdes.String();
+
+    private final Window window = new SessionWindow(startTime, endTime);
+    private final Windowed<String> windowedKey = new Windowed<>(key, window);
+    private final Serde<Windowed<String>> keySerde = new WindowedSerdes.SessionWindowedSerde<>(serde);
+
     private final SessionKeySchema sessionKeySchema = new SessionKeySchema();
     private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
 
     @Before
     public void before() {
         sessionKeySchema.init("topic");
-        final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6));
+        final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0)))), 1),
+                                                                  KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0)))), 2),
+                                                                  KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0)))), 3),
+                                                                  KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20)))), 4),
+                                                                  KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20)))), 5),
+                                                                  KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20)))), 6));
         iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
     }
 
@@ -78,29 +94,29 @@ public void testUpperBoundWithLargeTimestamps() {
 
         assertThat(
             "shorter key with max timestamp should be in range",
-            upper.compareTo(
-                SessionKeySerde.bytesToBinary(
+            upper.compareTo(Bytes.wrap(
+                    SessionKeySchema.toBinary(
                     new Windowed<>(
                         Bytes.wrap(new byte[]{0xA}),
                         new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
                 )
-            ) >= 0
+            )) >= 0
         );
 
         assertThat(
             "shorter key with max timestamp should be in range",
-            upper.compareTo(
-                SessionKeySerde.bytesToBinary(
+            upper.compareTo(Bytes.wrap(
+                    SessionKeySchema.toBinary(
                     new Windowed<>(
                         Bytes.wrap(new byte[]{0xA, 0xB}),
                         new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
                 )
-            ) >= 0
+            )) >= 0
         );
 
-        assertThat(upper, equalTo(SessionKeySerde.bytesToBinary(
+        assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary(
             new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))))
-        );
+        ));
     }
 
     @Test
@@ -109,33 +125,33 @@ public void testUpperBoundWithKeyBytesLargerThanFirstTimestampByte() {
 
         assertThat(
             "shorter key with max timestamp should be in range",
-            upper.compareTo(
-                SessionKeySerde.bytesToBinary(
+            upper.compareTo(Bytes.wrap(
+                    SessionKeySchema.toBinary(
                     new Windowed<>(
                         Bytes.wrap(new byte[]{0xA, (byte) 0x8F}),
                         new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
                 )
-            ) >= 0
+            )) >= 0
         );
 
-        assertThat(upper, equalTo(SessionKeySerde.bytesToBinary(
+        assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary(
             new Windowed<>(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))))
-        );
+        ));
     }
 
     @Test
     public void testUpperBoundWithZeroTimestamp() {
         Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
 
-        assertThat(upper, equalTo(SessionKeySerde.bytesToBinary(
+        assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary(
             new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))
-        );
+        ));
     }
 
     @Test
     public void testLowerBoundWithZeroTimestamp() {
         Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
-        assertThat(lower, equalTo(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))));
+        assertThat(lower, equalTo(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))));
     }
 
     @Test
@@ -144,18 +160,84 @@ public void testLowerBoundMatchesTrailingZeros() {
 
         assertThat(
             "appending zeros to key should still be in range",
-            lower.compareTo(
-                SessionKeySerde.bytesToBinary(
+            lower.compareTo(Bytes.wrap(
+                    SessionKeySchema.toBinary(
                     new Windowed<>(
                         Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}),
                         new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
                 )
-            ) < 0
+            )) < 0
         );
 
-        assertThat(lower, equalTo(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))));
+        assertThat(lower, equalTo(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))));
     }
 
+    @Test
+    public void shouldSerializeDeserialize() {
+        final byte[] bytes = keySerde.serializer().serialize(topic, windowedKey);
+        final Windowed<String> result = keySerde.deserializer().deserialize(topic, bytes);
+        assertEquals(windowedKey, result);
+    }
+
+    @Test
+    public void shouldSerializeNullToNull() {
+        assertNull(keySerde.serializer().serialize(topic, null));
+    }
+
+    @Test
+    public void shouldDeSerializeEmtpyByteArrayToNull() {
+        assertNull(keySerde.deserializer().deserialize(topic, new byte[0]));
+    }
+
+    @Test
+    public void shouldDeSerializeNullToNull() {
+        assertNull(keySerde.deserializer().deserialize(topic, null));
+    }
+
+    @Test
+    public void shouldConvertToBinaryAndBack() {
+        final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy");
+        final Windowed<String> result = SessionKeySchema.from(serialized, Serdes.String().deserializer(), "dummy");
+        assertEquals(windowedKey, result);
+    }
+
+    @Test
+    public void shouldExtractEndTimeFromBinary() {
+        final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy");
+        assertEquals(endTime, SessionKeySchema.extractEndTimestamp(serialized));
+    }
+
+    @Test
+    public void shouldExtractStartTimeFromBinary() {
+        final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy");
+        assertEquals(startTime, SessionKeySchema.extractStartTimestamp(serialized));
+    }
+
+    @Test
+    public void shouldExtractWindowFromBindary() {
+        final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy");
+        assertEquals(window, SessionKeySchema.extractWindow(serialized));
+    }
+
+    @Test
+    public void shouldExtractKeyBytesFromBinary() {
+        final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy");
+        assertArrayEquals(key.getBytes(), SessionKeySchema.extractKeyBytes(serialized));
+    }
+
+    @Test
+    public void shouldExtractKeyFromBinary() {
+        final byte[] serialized = SessionKeySchema.toBinary(windowedKey, serde.serializer(), "dummy");
+        assertEquals(windowedKey, SessionKeySchema.from(serialized, serde.deserializer(), "dummy"));
+    }
+
+    @Test
+    public void shouldExtractBytesKeyFromBinary() {
+        final Bytes bytesKey = Bytes.wrap(key.getBytes());
+        final Windowed<Bytes> windowedBytesKey = new Windowed<>(bytesKey, window);
+        final Bytes serialized = Bytes.wrap(SessionKeySchema.toBinary(windowedBytesKey));
+        assertEquals(windowedBytesKey, SessionKeySchema.from(serialized));
+    }
 
     private List<Integer> getValues(final HasNextCondition hasNextCondition) {
         final List<Integer> results = new ArrayList<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index d75cca0b94a..63214a17ad6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -17,17 +17,25 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
-import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.KeyValueIteratorStub;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,25 +43,39 @@
 
 public class WindowKeySchemaTest {
 
-    private final WindowKeySchema windowKeySchema = new WindowKeySchema();
-    private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
+    final private String key = "key";
+    final private String topic = "topic";
+    final private long startTime = 50L;
+    final private long endTime = 100L;
+    final private Serde<String> serde = Serdes.String();
+
+    final private Window window = new TimeWindow(startTime, endTime);
+    final private Windowed<String> windowedKey = new Windowed<>(key, window);
+    final private WindowKeySchema windowKeySchema = new WindowKeySchema();
+    final private Serde<Windowed<String>> keySerde = new WindowedSerdes.TimeWindowedSerde<>(serde);
+    final private StateSerdes<String, byte[]> stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray());
 
     @Before
     public void before() {
         windowKeySchema.init("topic");
-        final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5),
-                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6));
-        iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
     }
-    
+
     @Test
     public void testHasNextConditionUsingNullKeys() {
+        final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(
+                KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new TimeWindow(0, 1)), 0), 1),
+                KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new TimeWindow(0, 1)), 0), 2),
+                KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new TimeWindow(0, 1)), 0), 3),
+                KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new TimeWindow(10, 20)), 4), 4),
+                KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new TimeWindow(10, 20)), 5), 5),
+                KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new TimeWindow(10, 20)), 6), 6));
+        final DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
+
         final HasNextCondition hasNextCondition = windowKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE);
-        final List<Integer> results = getValues(hasNextCondition);
+        final List<Integer> results = new ArrayList<>();
+        while (hasNextCondition.hasNext(iterator)) {
+            results.add(iterator.next().value);
+        }
         assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6)));
     }
 
@@ -64,7 +86,7 @@ public void testUpperBoundWithLargeTimestamps() {
         assertThat(
             "shorter key with max timestamp should be in range",
             upper.compareTo(
-                WindowStoreUtils.toBinaryKey(
+                WindowKeySchema.toStoreKeyBinary(
                     new byte[]{0xA},
                     Long.MAX_VALUE,
                     Integer.MAX_VALUE
@@ -75,7 +97,7 @@ public void testUpperBoundWithLargeTimestamps() {
         assertThat(
             "shorter key with max timestamp should be in range",
             upper.compareTo(
-                WindowStoreUtils.toBinaryKey(
+                WindowKeySchema.toStoreKeyBinary(
                     new byte[]{0xA, 0xB},
                     Long.MAX_VALUE,
                     Integer.MAX_VALUE
@@ -83,7 +105,7 @@ public void testUpperBoundWithLargeTimestamps() {
             ) >= 0
         );
 
-        assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA}, Long.MAX_VALUE, Integer.MAX_VALUE)));
+        assertThat(upper, equalTo(WindowKeySchema.toStoreKeyBinary(new byte[]{0xA}, Long.MAX_VALUE, Integer.MAX_VALUE)));
     }
 
     @Test
@@ -93,7 +115,7 @@ public void testUpperBoundWithKeyBytesLargerThanFirstTimestampByte() {
         assertThat(
             "shorter key with max timestamp should be in range",
             upper.compareTo(
-                WindowStoreUtils.toBinaryKey(
+                WindowKeySchema.toStoreKeyBinary(
                     new byte[]{0xA, (byte) 0x8F},
                     Long.MAX_VALUE,
                     Integer.MAX_VALUE
@@ -101,7 +123,7 @@ public void testUpperBoundWithKeyBytesLargerThanFirstTimestampByte() {
             ) >= 0
         );
 
-        assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}, Long.MAX_VALUE, Integer.MAX_VALUE)));
+        assertThat(upper, equalTo(WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}, Long.MAX_VALUE, Integer.MAX_VALUE)));
     }
 
 
@@ -112,7 +134,7 @@ public void testUpperBoundWithKeyBytesLargerAndSmallerThanFirstTimestampByte() {
         assertThat(
             "shorter key with max timestamp should be in range",
             upper.compareTo(
-                WindowStoreUtils.toBinaryKey(
+                WindowKeySchema.toStoreKeyBinary(
                     new byte[]{0xC, 0xC},
                     0x0AffffffffffffffL,
                     Integer.MAX_VALUE
@@ -120,25 +142,25 @@ public void testUpperBoundWithKeyBytesLargerAndSmallerThanFirstTimestampByte() {
             ) >= 0
         );
 
-        assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xC, 0xC}, 0x0AffffffffffffffL, Integer.MAX_VALUE)));
+        assertThat(upper, equalTo(WindowKeySchema.toStoreKeyBinary(new byte[]{0xC, 0xC}, 0x0AffffffffffffffL, Integer.MAX_VALUE)));
     }
 
     @Test
     public void testUpperBoundWithZeroTimestamp() {
         Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
-        assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, Integer.MAX_VALUE)));
+        assertThat(upper, equalTo(WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 0, Integer.MAX_VALUE)));
     }
 
     @Test
     public void testLowerBoundWithZeroTimestamp() {
         Bytes lower = windowKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
-        assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
+        assertThat(lower, equalTo(WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
     }
 
     @Test
     public void testLowerBoundWithMonZeroTimestamp() {
         Bytes lower = windowKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 42);
-        assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
+        assertThat(lower, equalTo(WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
     }
 
     @Test
@@ -148,7 +170,7 @@ public void testLowerBoundMatchesTrailingZeros() {
         assertThat(
             "appending zeros to key should still be in range",
             lower.compareTo(
-                WindowStoreUtils.toBinaryKey(
+                    WindowKeySchema.toStoreKeyBinary(
                         new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
                         Long.MAX_VALUE - 1,
                         0
@@ -156,14 +178,89 @@ public void testLowerBoundMatchesTrailingZeros() {
             ) < 0
         );
 
-        assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
+        assertThat(lower, equalTo(WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
     }
-    
-    private List<Integer> getValues(final HasNextCondition hasNextCondition) {
-        final List<Integer> results = new ArrayList<>();
-        while (hasNextCondition.hasNext(iterator)) {
-            results.add(iterator.next().value);
-        }
-        return results;
+
+    @Test
+    public void shouldSerializeDeserialize() {
+        final byte[] bytes = keySerde.serializer().serialize(topic, windowedKey);
+        final Windowed<String> result = keySerde.deserializer().deserialize(topic, bytes);
+        // TODO: fix this part as last bits of KAFKA-4468
+        assertEquals(new Windowed<>(key, new TimeWindow(startTime, Long.MAX_VALUE)), result);
+    }
+
+    @Test
+    public void testSerializeDeserializeOverflowWindowSize() {
+        final byte[] bytes = keySerde.serializer().serialize(topic, windowedKey);
+        final Windowed<String> result = new TimeWindowedDeserializer<>(serde.deserializer(), Long.MAX_VALUE - 1)
+                .deserialize(topic, bytes);
+        assertEquals(new Windowed<>(key, new TimeWindow(startTime, Long.MAX_VALUE)), result);
+    }
+
+    @Test
+    public void shouldSerializeDeserializeExpectedWindowSize() {
+        final byte[] bytes = keySerde.serializer().serialize(topic, windowedKey);
+        final Windowed<String> result = new TimeWindowedDeserializer<>(serde.deserializer(), endTime - startTime)
+                .deserialize(topic, bytes);
+        assertEquals(windowedKey, result);
+    }
+
+    @Test
+    public void shouldSerializeNullToNull() {
+        assertNull(keySerde.serializer().serialize(topic, null));
+    }
+
+    @Test
+    public void shouldDeSerializeEmtpyByteArrayToNull() {
+        assertNull(keySerde.deserializer().deserialize(topic, new byte[0]));
+    }
+
+    @Test
+    public void shouldDeSerializeNullToNull() {
+        assertNull(keySerde.deserializer().deserialize(topic, null));
+    }
+
+    @Test
+    public void shouldConvertToBinaryAndBack() {
+        final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
+        final Windowed<String> result = WindowKeySchema.fromStoreKey(serialized.get(), endTime - startTime, stateSerdes);
+        assertEquals(windowedKey, result);
+    }
+
+    @Test
+    public void shouldExtractEndTimeFromBinary() {
+        final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
+        assertEquals(0, WindowKeySchema.extractStoreSequence(serialized.get()));
+    }
+
+    @Test
+    public void shouldExtractStartTimeFromBinary() {
+        final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
+        assertEquals(startTime, WindowKeySchema.extractStoreTimestamp(serialized.get()));
+    }
+
+    @Test
+    public void shouldExtractWindowFromBindary() {
+        final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
+        assertEquals(window, WindowKeySchema.extractStoreWindow(serialized.get(), endTime - startTime));
+    }
+
+    @Test
+    public void shouldExtractKeyBytesFromBinary() {
+        final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
+        assertArrayEquals(key.getBytes(), WindowKeySchema.extractStoreKeyBytes(serialized.get()));
+    }
+
+    @Test
+    public void shouldExtractKeyFromBinary() {
+        final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
+        assertEquals(windowedKey, WindowKeySchema.fromStoreKey(serialized.get(), endTime - startTime, stateSerdes));
+    }
+
+    @Test
+    public void shouldExtractBytesKeyFromBinary() {
+        final Windowed<Bytes> windowedBytesKey = new Windowed<>(Bytes.wrap(key.getBytes()), window);
+        final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedBytesKey, 0);
+        assertEquals(windowedBytesKey, WindowKeySchema.fromStoreKey(serialized.get(), endTime - startTime));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java
deleted file mode 100644
index 91c89ec9540..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class WindowStoreUtilsTest {
-    protected StateSerdes<String, String> serdes = new StateSerdes<>("dummy", new Serdes.StringSerde(), new Serdes.StringSerde());
-
-    @Test
-    public void testSerialization() {
-        final String key = "key1";
-        final long timestamp = 99L;
-        final int seqNum = 3;
-        Bytes bytes = WindowStoreUtils.toBinaryKey(key, timestamp, seqNum, serdes);
-        final String parsedKey = WindowStoreUtils.keyFromBinaryKey(bytes.get(), serdes);
-        final long parsedTs = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
-        final int parsedSeqNum = WindowStoreUtils.sequenceNumberFromBinaryKey(bytes.get());
-        assertEquals(key, parsedKey);
-        assertEquals(timestamp, parsedTs);
-        assertEquals(seqNum, parsedSeqNum);
-    }
-}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Extract WindowedSerde to public APIs
> ------------------------------------
>
>                 Key: KAFKA-4831
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4831
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Vitaly Pushkar
>            Priority: Major
>              Labels: needs-kip, newbie, user-experience
>
> Now that we have augmented WindowSerde with non-arg parameters, the next step is to extract it out as part of the public APIs so that users who want to perform I/O on windowed streams can use it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message