apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From david...@apache.org
Subject apex-malhar git commit: APEXMALHAR-2244 #comment Use TimeUnifiedManageStateStore for Spillable Data Structure
Date Mon, 31 Oct 2016 19:36:08 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 27272a588 -> 16edf3067


APEXMALHAR-2244 #comment Use TimeUnifiedManageStateStore for Spillable Data Structure


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/16edf306
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/16edf306
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/16edf306

Branch: refs/heads/master
Commit: 16edf3067226735682ecce63eedd0b120a708427
Parents: 27272a5
Author: Siyuan Hua <hsy541@apache.org>
Authored: Thu Oct 27 14:06:51 2016 -0700
Committer: Siyuan Hua <hsy541@apache.org>
Committed: Mon Oct 31 11:14:04 2016 -0700

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java |   2 +-
 .../malhar/lib/state/managed/TimeExtractor.java |  27 +++
 .../spillable/SpillableComplexComponent.java    |  40 ++++
 .../SpillableComplexComponentImpl.java          |  44 +++-
 .../lib/state/spillable/SpillableMapImpl.java   |  45 +++-
 .../lib/state/spillable/SpillableSetImpl.java   |  23 +-
 .../spillable/SpillableSetMultimapImpl.java     |  95 ++++++--
 ...agedTimeUnifiedStateSpillableStateStore.java |  29 +++
 .../impl/SpillableWindowedKeyedStorage.java     |   4 +-
 .../impl/SpillableWindowedPlainStorage.java     |   2 +-
 .../window/impl/WindowKeyPairTimeExtractor.java |  40 ++++
 .../lib/window/impl/WindowTimeExtractor.java    |  35 +++
 .../state/spillable/SpillableMapImplTest.java   | 234 ++++++++-----------
 .../state/spillable/SpillableSetImplTest.java   |  27 ++-
 .../spillable/SpillableSetMultimapImplTest.java |  21 +-
 .../lib/state/spillable/SpillableTestUtils.java |   4 +
 .../spillable/TestStringTimeExtractor.java      |  36 +++
 .../window/SpillableWindowedStorageTest.java    |  19 +-
 .../malhar/lib/window/WindowedOperatorTest.java |  90 +++----
 19 files changed, 580 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 20271b0..1c52c31 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -316,7 +316,7 @@ public abstract class AbstractManagedStateImpl
 
   protected int getBucketIdx(long bucketId)
   {
-    return (int)(bucketId % numBuckets);
+    return (int)Math.abs(bucketId % numBuckets);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
new file mode 100644
index 0000000..e70e80f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * A way to extract time from data
+ */
+public interface TimeExtractor<T>
+{
+  long getTime(T t);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index 542a914..b6ec6a2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -18,6 +18,7 @@
  */
 package org.apache.apex.malhar.lib.state.spillable;
 
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.state.spillable.Spillable.SpillableComponent;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 
@@ -68,6 +69,19 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
       Serde<V> serdeValue);
 
   /**
+   * This is a method for creating a {@link SpillableMap}. This method
+   * auto-generates an identifier for the data structure.
+   * @param <K> The type of the keys.
+   * @param <V> The type of the values.
+   * @param serdeKey The Serializer/Deserializer to use for the map's keys.
+   * @param serdeValue The Serializer/Deserializer to use for the map's values.
+   * @param timeExtractor a util object to extract time from key.
+   * @return A {@link SpillableMap}.
+   */
+  <K, V> SpillableMap<K, V> newSpillableMap(Serde<K> serdeKey,
+      Serde<V> serdeValue, TimeExtractor<K> timeExtractor);
+
+  /**
    * This is a method for creating a {@link SpillableMap}.
    * @param <K> The type of the keys.
    * @param <V> The type of the values.
@@ -81,6 +95,19 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
       Serde<K> serdeKey, Serde<V> serdeValue);
 
   /**
+   * This is a method for creating a {@link SpillableMap}.
+   * @param <K> The type of the keys.
+   * @param <V> The type of the values.
+   * @param identifier The identifier for this {@link SpillableMap}.
+   * @param serdeKey The Serializer/Deserializer to use for the map's keys.
+   * @param serdeValue The Serializer/Deserializer to use for the map's values.
+   * @param timeExtractor a util object to extract time from key.
+   * @return A {@link SpillableMap}.
+   */
+  <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier,
+      Serde<K> serdeKey, Serde<V> serdeValue, TimeExtractor<K> timeExtractor);
+
+  /**
    * This is a method for creating a {@link SpillableListMultimap}. This method
    * auto-generates an identifier for the data structure.
    * @param <K> The type of the keys.
@@ -118,6 +145,19 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
   <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
 
   /**
+   * This is a method for creating a {@link SpillableSetMultimap}.
+   * @param <K> The type of the keys.
+   * @param <V> The type of the values in the map's lists.
+   * @param bucket The bucket that this {@link SpillableSetMultimap} will be spilled to.
+   * @param serdeKey The Serializer/Deserializer to use for the map's keys.
+   * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
+   * @param timeExtractor a util object to extract time from key.
+   * @return A {@link SpillableSetMultimap}.
+   */
+  <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey,
+      Serde<V> serdeValue, TimeExtractor<K> timeExtractor);
+
+  /**
    * This is a method for creating a {@link SpillableMultiset}. This method
    * auto-generates an identifier for the data structure.
    * @param <T> The type of the elements.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index 1a3f550..1d9fbc6 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -75,7 +76,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   @Override
   public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde)
   {
-    SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
+    SpillableArrayListImpl<T> list = new SpillableArrayListImpl<>(bucket, identifierGenerator.next(), store, serde);
     componentList.add(list);
     return list;
   }
@@ -84,7 +85,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde)
   {
     identifierGenerator.register(identifier);
-    SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
+    SpillableArrayListImpl<T> list = new SpillableArrayListImpl<>(bucket, identifier, store, serde);
     bucketIds.add(bucket);
     componentList.add(list);
     return list;
@@ -94,7 +95,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
       Serde<V> serdeValue)
   {
-    SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(),
+    SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifierGenerator.next(),
         bucket, serdeKey, serdeValue);
     bucketIds.add(bucket);
     componentList.add(map);
@@ -106,16 +107,35 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
       Serde<V> serdeValue)
   {
     identifierGenerator.register(identifier);
-    SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+    SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifier, bucket, serdeKey, serdeValue);
     bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
   @Override
+  public <K, V> SpillableMap<K, V> newSpillableMap(Serde<K> serdeKey,
+      Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+  {
+    SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifierGenerator.next(), serdeKey, serdeValue, timeExtractor);
+    componentList.add(map);
+    return map;
+  }
+
+  @Override
+  public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, Serde<K> serdeKey,
+      Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+  {
+    identifierGenerator.register(identifier);
+    SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifier, serdeKey, serdeValue, timeExtractor);
+    componentList.add(map);
+    return map;
+  }
+
+  @Override
   public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
   {
-    SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
+    SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<>(store,
         identifierGenerator.next(), bucket, serdeKey, serdeValue);
     bucketIds.add(bucket);
     componentList.add(map);
@@ -128,7 +148,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
       Serde<V> serdeValue)
   {
     identifierGenerator.register(identifier);
-    SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
+    SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<>(store,
         identifier, bucket, serdeKey, serdeValue);
     bucketIds.add(bucket);
     componentList.add(map);
@@ -138,7 +158,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   @Override
   public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
   {
-    SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store,
+    SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<>(store,
         identifierGenerator.next(), bucket, serdeKey, serdeValue);
     bucketIds.add(bucket);
     componentList.add(map);
@@ -146,6 +166,16 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   }
 
   @Override
+  public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey,
+      Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+  {
+    SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<>(store,
+        identifierGenerator.next(), bucket, serdeKey, serdeValue, timeExtractor);
+    componentList.add(map);
+    return map;
+  }
+
+  @Override
   public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index 5fa39d7..e7071a2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
 import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
@@ -55,10 +56,11 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
   private transient Input tmpInput = new Input();
 
+  private TimeExtractor<K> timeExtractor;
+
   @NotNull
   private SpillableStateStore store;
-  @NotNull
-  private byte[] identifier;
+
   private long bucket;
 
   private int size = 0;
@@ -76,16 +78,32 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
    * @param identifier The Id of this {@link SpillableMapImpl}.
    * @param bucket The Id of the bucket used to store this
    * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}.
-   * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
-   * @param keySerde The {@link Serde} to use when serializing and deserializing values.
+   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+   * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
    */
-  public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> keySerde,
-      Serde<V> valueSerde)
+  public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> serdeKey,
+      Serde<V> serdeValue)
   {
     this.store = Preconditions.checkNotNull(store);
-    this.identifier = Preconditions.checkNotNull(identifier);
     this.bucket = bucket;
-    keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(null, identifier, Preconditions.checkNotNull(keySerde), Preconditions.checkNotNull(valueSerde));
+    keyValueSerdeManager = new AffixKeyValueSerdeManager<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+  }
+
+  /**
+   * Creats a {@link SpillableMapImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param identifier The Id of this {@link SpillableMapImpl}.
+   * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}.
+   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+   * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+   * @param timeExtractor Extract time from the each element and use it to decide where the data goes
+   */
+  public SpillableMapImpl(SpillableStateStore store, byte[] identifier, Serde<K> serdeKey,
+      Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    keyValueSerdeManager = new AffixKeyValueSerdeManager<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+    this.timeExtractor = timeExtractor;
   }
 
   public SpillableStateStore getStore()
@@ -132,7 +150,7 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
       return val;
     }
 
-    Slice valSlice = store.getSync(bucket, keyValueSerdeManager.serializeDataKey(key, false));
+    Slice valSlice = store.getSync(getBucket(key), keyValueSerdeManager.serializeDataKey(key, false));
 
     if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
       return null;
@@ -219,12 +237,12 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   public void endWindow()
   {
     for (K key: cache.getChangedKeys()) {
-      store.put(bucket, keyValueSerdeManager.serializeDataKey(key, true),
+      store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true),
           keyValueSerdeManager.serializeValue(cache.get(key)));
     }
 
     for (K key: cache.getRemovedKeys()) {
-      store.put(this.bucket, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
+      store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
     }
     cache.endWindow();
     keyValueSerdeManager.resetReadBuffer();
@@ -234,4 +252,9 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   public void teardown()
   {
   }
+
+  private long getBucket(K key)
+  {
+    return timeExtractor != null ? timeExtractor.getTime(key) : bucket;
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index 0dfc411..221cd38 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -89,8 +90,6 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
   }
 
   @NotNull
-  private SpillableStateStore store;
-  @NotNull
   private SpillableMapImpl<T, ListNode<T>> map;
 
   private T head;
@@ -103,7 +102,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
 
   public SpillableStateStore getStore()
   {
-    return store;
+    return map.getStore();
   }
 
   /**
@@ -118,9 +117,23 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
       @NotNull SpillableStateStore store,
       @NotNull Serde<T> serde)
   {
-    this.store = Preconditions.checkNotNull(store);
+    map = new SpillableMapImpl<>(Preconditions.checkNotNull(store), prefix, bucketId, serde, new ListNodeSerde<>(serde));
+  }
 
-    map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new ListNodeSerde(serde));
+  /**
+   * Creates a {@link SpillableSetImpl}.
+   * {@link SpillableSetImpl} in the provided {@link SpillableStateStore}.
+   * @param prefix The Id of this {@link SpillableSetImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param serde The {@link Serde} to use when serializing and deserializing data.
+   * @param timeExtractor Extract time from the each element and use it to decide where the data goes.
+   */
+  public SpillableSetImpl(@NotNull byte[] prefix,
+      @NotNull SpillableStateStore store,
+      @NotNull Serde<T> serde,
+      @NotNull TimeExtractor timeExtractor)
+  {
+    map = new SpillableMapImpl<>(Preconditions.checkNotNull(store), prefix, serde, new ListNodeSerde<>(serde), timeExtractor);
   }
 
   public void setSize(int size)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 76e47f2..fb88d9c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -27,10 +27,11 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.AffixSerde;
 import org.apache.apex.malhar.lib.utils.serde.IntSerde;
 import org.apache.apex.malhar.lib.utils.serde.PairSerde;
-import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -55,21 +56,46 @@ import com.datatorrent.netlet.util.Slice;
 public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMultimap<K, V>,
     Spillable.SpillableComponent
 {
+
+  private static class FixedTimeExtractor<V> implements TimeExtractor<V>
+  {
+
+    private long fixedTime;
+
+    private FixedTimeExtractor(long fixedTime)
+    {
+      this.fixedTime = fixedTime;
+    }
+
+    private FixedTimeExtractor()
+    {
+      // For kryo
+    }
+
+    @Override
+    public long getTime(V v)
+    {
+      return fixedTime;
+    }
+
+  }
+
   public static final int DEFAULT_BATCH_SIZE = 1000;
   public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
 
   private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>();
 
   @NotNull
-  private SpillableMapImpl<Slice, Pair<Integer, V>> map;
+  private SpillableMapImpl<K, Pair<Integer, V>> map;
   private SpillableStateStore store;
-  private byte[] identifier;
   private long bucket;
   private Serde<V> valueSerde;
   private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>();
 
-  protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
-  protected transient Context.OperatorContext context;
+  private TimeExtractor<K> timeExtractor = null;
+  private AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+  private transient Context.OperatorContext context;
+
   private SpillableSetMultimapImpl()
   {
     // for kryo
@@ -81,8 +107,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
    * @param identifier The Id of this {@link SpillableSetMultimapImpl}.
    * @param bucket The Id of the bucket used to store this
    * {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+   * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+   * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
    */
   public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
       Serde<K> keySerde,
@@ -93,7 +119,32 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     this.valueSerde = Preconditions.checkNotNull(valueSerde);
     keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
 
-    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new PairSerde<>(new IntSerde(), valueSerde));
+    map = new SpillableMapImpl<>(store, identifier, bucket, new AffixSerde<>(null, keySerde, META_KEY_SUFFIX), new PairSerde<>(new IntSerde(), valueSerde));
+  }
+
+
+  /**
+   * Creates a {@link SpillableSetMultimapImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param identifier The Id of this {@link SpillableSetMultimapImpl}.
+   * @param bucket The Id of the bucket used to store this
+   * {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}.
+   * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+   * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
+   * @param timeExtractor The {@link TimeExtractor} to be used to retrieve time from key
+   */
+  public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K> keySerde,
+      Serde<V> valueSerde,
+      TimeExtractor<K> timeExtractor)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.bucket = bucket;
+    this.valueSerde = Preconditions.checkNotNull(valueSerde);
+    keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
+    this.timeExtractor = timeExtractor;
+
+    map = new SpillableMapImpl<>(store, identifier, new AffixSerde<>(null, keySerde, META_KEY_SUFFIX), new PairSerde<>(new IntSerde(), valueSerde), timeExtractor);
   }
 
   public SpillableStateStore getStore()
@@ -112,14 +163,23 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     SpillableSetImpl<V> spillableSet = cache.get(key);
 
     if (spillableSet == null) {
-      Pair<Integer, V> meta = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
+      long keyTime = -1;
+      Pair<Integer, V> meta;
+      if (timeExtractor != null) {
+        keyTime = timeExtractor.getTime(key);
+      }
+      meta = map.get(key);
 
       if (meta == null) {
         return null;
       }
 
       Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false);
-      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
+      if (timeExtractor != null) {
+        spillableSet = new SpillableSetImpl<>(keyPrefix.toByteArray(), store, valueSerde, new FixedTimeExtractor(keyTime));
+      } else {
+        spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
+      }
       spillableSet.setSize(meta.getLeft());
       spillableSet.setHead(meta.getRight());
       spillableSet.setup(context);
@@ -166,8 +226,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     SpillableSetImpl<V> spillableSet = getHelper((K)key);
     if (spillableSet != null) {
       cache.remove((K)key);
-      Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
-      map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead()));
+      map.put((K)key, new ImmutablePair<>(0, spillableSet.getHead()));
       spillableSet.clear();
       removedSets.add(spillableSet);
     }
@@ -199,8 +258,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     if (cache.contains((K)key)) {
       return true;
     }
-    Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
-    Pair<Integer, V> meta = map.get(keySlice);
+    Pair<Integer, V> meta = map.get((K)key);
     return meta != null && meta.getLeft() > 0;
   }
 
@@ -227,7 +285,11 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     SpillableSetImpl<V> spillableSet = getHelper(key);
 
     if (spillableSet == null) {
-      spillableSet = new SpillableSetImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
+      if (timeExtractor == null) {
+        spillableSet = new SpillableSetImpl<>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
+      } else {
+        spillableSet = new SpillableSetImpl<>(keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde, new FixedTimeExtractor(timeExtractor.getTime(key)));
+      }
       spillableSet.setup(context);
       cache.put(key, spillableSet);
     }
@@ -304,8 +366,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
       SpillableSetImpl<V> spillableSet = cache.get(key);
       spillableSet.endWindow();
 
-      map.put(keyValueSerdeManager.serializeMetaKey(key, true),
-          new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
+      map.put(key, new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
     }
 
     for (SpillableSetImpl removedSet : removedSets) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java
new file mode 100644
index 0000000..207cb31
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.managed;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+
+/**
+ *
+ */
+public class ManagedTimeUnifiedStateSpillableStateStore extends ManagedTimeUnifiedStateImpl implements SpillableStateStore
+{
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
index ef111b3..d41c494 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -180,10 +180,10 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
     }
 
     if (windowKeyToValueMap == null) {
-      windowKeyToValueMap = scc.newSpillableMap(bucket, windowKeyPairSerde, valueSerde);
+      windowKeyToValueMap = scc.newSpillableMap(windowKeyPairSerde, valueSerde, new WindowKeyPairTimeExtractor());
     }
     if (windowToKeysMap == null) {
-      windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde);
+      windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde, new WindowTimeExtractor());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
index 9a8a291..f9bbc17 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -133,7 +133,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
       valueSerde = new GenericSerde<>();
     }
     if (windowToDataMap == null) {
-      windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde);
+      windowToDataMap = scc.newSpillableMap(windowSerde, valueSerde, new WindowTimeExtractor());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java
new file mode 100644
index 0000000..ecf63a5
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * A {@link TimeExtractor} to extract time from Pair of {@link Window} and key
+ * The type of key doesn't matter in this case, so it assumes object as the key type
+ */
+public class WindowKeyPairTimeExtractor<K> implements TimeExtractor<Pair<Window, K>>
+{
+
+  private final WindowTimeExtractor windowTimeExtractor = new WindowTimeExtractor();
+
+  @Override
+  public long getTime(Pair<Window, K> windowKeyPair)
+  {
+    return windowTimeExtractor.getTime(windowKeyPair.getKey());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java
new file mode 100644
index 0000000..aee389a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
+import org.apache.apex.malhar.lib.window.Window;
+
+/**
+ * A {@link TimeExtractor} to extract time from {@link Window}
+ */
+public class WindowTimeExtractor implements TimeExtractor<Window>
+{
+  @Override
+  public long getTime(Window window)
+  {
+    return window.getBeginTimestamp() + window.getDurationMillis();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
index a96a8fd..760bc5c 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -21,7 +21,9 @@ package org.apache.apex.malhar.lib.state.spillable;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
 import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
@@ -31,31 +33,46 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.util.KryoCloneUtils;
 
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+
+@RunWith(JUnitParamsRunner.class)
 public class SpillableMapImplTest
 {
   public static final byte[] ID1 = new byte[]{(byte)0};
   public static final byte[] ID2 = new byte[]{(byte)1};
 
+  public static final TestStringTimeExtractor TE = new TestStringTimeExtractor();
+
+  private SpillableStateStore store;
+
+  private TimeExtractor<String> te = null;
+
+
   @Rule
   public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
 
-  @Test
-  public void simpleGetAndPutTest()
-  {
-    InMemSpillableStateStore store = new InMemSpillableStateStore();
 
-    simpleGetAndPutTestHelper(store);
+  private void setup(String opt)
+  {
+    if (opt.equals("InMem")) {
+      store = new InMemSpillableStateStore();
+      te = null;
+    } else if (opt.equals("ManagedState")) {
+      store = testMeta.store;
+      te = null;
+    } else {
+      store = testMeta.timeStore;
+      te = TE;
+    }
   }
 
 
   @Test
-  public void simpleGetAndPutManagedStateTest()
-  {
-    simpleGetAndPutTestHelper(testMeta.store);
-  }
-
-  private void simpleGetAndPutTestHelper(SpillableStateStore store)
+  @Parameters({"InMem","ManagedState","TimeUnifiedManagedState"})
+  public void simpleGetAndPutTest(String opt)
   {
+    setup(opt);
     SpillableMapImpl<String, String> map = createSpillableMap(store);
 
     store.setup(testMeta.operatorContext);
@@ -73,15 +90,9 @@ public class SpillableMapImplTest
 
     Assert.assertEquals(3, map.size());
 
-    Assert.assertEquals("1", map.get("a"));
-    Assert.assertEquals("2", map.get("b"));
-    Assert.assertEquals("3", map.get("c"));
-    Assert.assertEquals(null, map.get("d"));
+    assertMultiEqualsFromMap(map, new String[]{"1", "2", "3", null}, new String[]{"a", "b", "c", "d"});
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{null, null, null, null});
 
     map.endWindow();
     store.endWindow();
@@ -93,17 +104,11 @@ public class SpillableMapImplTest
     store.beginWindow(windowId);
     map.beginWindow(windowId);
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{"1", "2", "3", null});
 
     Assert.assertEquals(3, map.size());
 
-    Assert.assertEquals("1", map.get("a"));
-    Assert.assertEquals("2", map.get("b"));
-    Assert.assertEquals("3", map.get("c"));
-    Assert.assertEquals(null, map.get("d"));
+    assertMultiEqualsFromMap(map, new String[]{"1", "2", "3", null}, new String[]{"a", "b", "c", "d"});
 
     map.put("d", "4");
     map.put("e", "5");
@@ -111,16 +116,9 @@ public class SpillableMapImplTest
 
     Assert.assertEquals(6, map.size());
 
-    Assert.assertEquals("4", map.get("d"));
-    Assert.assertEquals("5", map.get("e"));
-    Assert.assertEquals("6", map.get("f"));
+    assertMultiEqualsFromMap(map, new String[]{"4", "5", "6"}, new String[]{"d", "e", "f"});
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+    multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f"}, ID1, new String[]{"1", "2", "3", null, null, null});
 
     map.endWindow();
     store.endWindow();
@@ -132,13 +130,8 @@ public class SpillableMapImplTest
     store.beginWindow(windowId);
     map.beginWindow(windowId);
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
-    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+    multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{"1", "2", "3", "4", "5", "6", null});
 
     map.endWindow();
     store.endWindow();
@@ -150,28 +143,43 @@ public class SpillableMapImplTest
     store.teardown();
   }
 
-  @Test
-  public void simpleRemoveTest()
+  private void multiValueCheck(String[] keys, byte[] samePrefix, String[] expectedVal)
   {
-    InMemSpillableStateStore store = new InMemSpillableStateStore();
-    simpleRemoveTestHelper(store);
+    for (int i = 0; i < keys.length; i++) {
+      SpillableTestUtils.checkValue(store, _bid(keys[i], te), keys[i], samePrefix, expectedVal[i]);
+    }
   }
 
+  private void assertMultiEqualsFromMap(SpillableMapImpl<String, String> map, String[] expectedV, String[] keys)
+  {
+    for (int i = 0; i < expectedV.length; i++) {
+      Assert.assertEquals(expectedV[i], map.get(keys[i]));
+    }
+  }
 
-  @Test
-  public void simpleRemoveManagedStateTest()
+  private long _bid(String key, TimeExtractor<String> te)
   {
-    simpleRemoveTestHelper(testMeta.store);
+    if (te != null) {
+      return te.getTime(key);
+    } else {
+      return 0L;
+    }
   }
 
-  protected SpillableMapImpl<String, String> createSpillableMap(SpillableStateStore store)
+  private SpillableMapImpl<String, String> createSpillableMap(SpillableStateStore store)
   {
-    return new SpillableMapImpl<String, String>(store, ID1, 0L, new StringSerde(),
-        new StringSerde());
+    if (te == null) {
+      return new SpillableMapImpl<>(store,ID1,0L,new StringSerde(), new StringSerde());
+    } else {
+      return new SpillableMapImpl<>(store,ID1,new StringSerde(), new StringSerde(), te);
+    }
   }
 
-  private void simpleRemoveTestHelper(SpillableStateStore store)
+  @Test
+  @Parameters({"InMem","ManagedState","TimeUnifiedManagedState"})
+  public void simpleRemoveTest(String opt)
   {
+    setup(opt);
     SpillableMapImpl<String, String> map = createSpillableMap(store);
 
     store.setup(testMeta.operatorContext);
@@ -199,10 +207,7 @@ public class SpillableMapImplTest
 
     Assert.assertEquals(1, map.size());
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{null, null, null, null});
 
     map.endWindow();
     store.endWindow();
@@ -210,10 +215,7 @@ public class SpillableMapImplTest
     store.checkpointed(windowId);
     store.committed(windowId);
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{"1", null, null, null});
 
     windowId++;
     store.beginWindow(windowId);
@@ -236,12 +238,7 @@ public class SpillableMapImplTest
     Assert.assertEquals("5", map.get("e"));
     Assert.assertEquals("6", map.get("f"));
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+    multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f"}, ID1, new String[]{"1", null, null, null, null, null});
 
     map.endWindow();
     store.endWindow();
@@ -253,13 +250,7 @@ public class SpillableMapImplTest
     store.beginWindow(windowId);
     map.beginWindow(windowId);
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
-    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+    multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{"1", null, null, "4", "5", "6", null});
 
     map.remove("a");
     map.remove("d");
@@ -271,13 +262,7 @@ public class SpillableMapImplTest
     Assert.assertEquals("6", map.get("f"));
     Assert.assertEquals(null, map.get("g"));
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
-    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+    multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{"1", null, null, "4", "5", "6", null});
 
     map.endWindow();
     store.endWindow();
@@ -289,13 +274,7 @@ public class SpillableMapImplTest
     store.beginWindow(windowId);
     map.beginWindow(windowId);
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
-    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+    multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{null, null, null, null, "5", "6", null});
 
     map.endWindow();
     store.endWindow();
@@ -308,29 +287,21 @@ public class SpillableMapImplTest
   }
 
   @Test
-  public void multiMapPerBucketTest()
-  {
-    InMemSpillableStateStore store = new InMemSpillableStateStore();
-
-    multiMapPerBucketTestHelper(store);
-  }
-
-  @Test
-  public void multiMapPerBucketManagedStateTest()
-  {
-    multiMapPerBucketTestHelper(testMeta.store);
-  }
-
-  public void multiMapPerBucketTestHelper(SpillableStateStore store)
+  @Parameters({"InMem","ManagedState","TimeUnifiedManagedState"})
+  public void multiMapPerBucketTest(String opt)
   {
+    setup(opt);
     StringSerde sss = new StringSerde();
 
-    SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(store, ID1, 0L,
-        new StringSerde(),
-        new StringSerde());
-    SpillableMapImpl<String, String> map2 = new SpillableMapImpl<>(store, ID2, 0L,
-        new StringSerde(),
-        new StringSerde());
+    SpillableMapImpl<String, String> map1 = null;
+    SpillableMapImpl<String, String> map2 = null;
+    if (te == null) {
+      map1 = new SpillableMapImpl<>(store, ID1, 0L, sss, sss);
+      map2 = new SpillableMapImpl<>(store, ID2, 0L, sss, sss);
+    } else {
+      map1 = new SpillableMapImpl<>(store, ID1, sss, sss, te);
+      map2 = new SpillableMapImpl<>(store, ID2, sss, sss, te);
+    }
 
     store.setup(testMeta.operatorContext);
     map1.setup(testMeta.operatorContext);
@@ -372,12 +343,9 @@ public class SpillableMapImplTest
     map1.beginWindow(windowId);
     map2.beginWindow(windowId);
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+    multiValueCheck(new String[]{"a", "b"}, ID1, new String[]{"1", "2"});
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID2, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3");
+    multiValueCheck(new String[]{"a", "b", "c"}, ID2, new String[]{"a1", null, "3"});
 
     map1.remove("a");
 
@@ -395,8 +363,8 @@ public class SpillableMapImplTest
     map1.beginWindow(windowId);
     map2.beginWindow(windowId);
 
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
+    multiValueCheck(new String[]{"a"}, ID1, new String[]{null});
+    multiValueCheck(new String[]{"a"}, ID2, new String[]{"a1"});
 
     map1.endWindow();
     map2.endWindow();
@@ -410,18 +378,22 @@ public class SpillableMapImplTest
   }
 
   @Test
-  public void recoveryWithManagedStateTest() throws Exception
+  @Parameters({"ManagedState","TimeUnifiedManagedState"})
+  public void recoveryWithManagedStateTest(String opt) throws Exception
   {
+    setup(opt);
     StringSerde sss = new StringSerde();
+    SpillableMapImpl<String, String> map1 = null;
+    if (te == null) {
+      map1 = new SpillableMapImpl<>(store, ID1, 0L, sss, sss);
+    } else {
+      map1 = new SpillableMapImpl<>(store, ID1, sss, sss, te);
+    }
 
-    SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(testMeta.store, ID1, 0L,
-        new StringSerde(),
-        new StringSerde());
-
-    testMeta.store.setup(testMeta.operatorContext);
+    store.setup(testMeta.operatorContext);
     map1.setup(testMeta.operatorContext);
 
-    testMeta.store.beginWindow(0);
+    store.beginWindow(0);
     map1.beginWindow(0);
     map1.put("x", "1");
     map1.put("y", "2");
@@ -429,9 +401,9 @@ public class SpillableMapImplTest
     map1.put("zz", "33");
     Assert.assertEquals(4, map1.size());
     map1.endWindow();
-    testMeta.store.endWindow();
+    store.endWindow();
 
-    testMeta.store.beginWindow(1);
+    store.beginWindow(1);
     map1.beginWindow(1);
     Assert.assertEquals(4, map1.size());
     map1.put("x", "4");
@@ -439,13 +411,13 @@ public class SpillableMapImplTest
     map1.remove("zz");
     Assert.assertEquals(3, map1.size());
     map1.endWindow();
-    testMeta.store.endWindow();
-    testMeta.store.beforeCheckpoint(1);
-    testMeta.store.checkpointed(1);
+    store.endWindow();
+    store.beforeCheckpoint(1);
+    store.checkpointed(1);
 
     SpillableMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1);
 
-    testMeta.store.beginWindow(2);
+    store.beginWindow(2);
     map1.beginWindow(2);
     Assert.assertEquals(3, map1.size());
     map1.put("x", "6");
@@ -453,11 +425,11 @@ public class SpillableMapImplTest
     map1.put("w", "8");
     Assert.assertEquals(4, map1.size());
     map1.endWindow();
-    testMeta.store.endWindow();
+    store.endWindow();
 
     // simulating crash here
     map1.teardown();
-    testMeta.store.teardown();
+    store.teardown();
 
     Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
     attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
index d0343e1..3f078cf 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
@@ -25,6 +25,7 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
 import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
@@ -37,24 +38,38 @@ public class SpillableSetImplTest
   @Rule
   public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
 
+  public TimeExtractor<String> te = null;
+
   @Test
-  public void simpleAddGetAndSetTest1()
+  public void simpleAddGetAndSetTest()
   {
     InMemSpillableStateStore store = new InMemSpillableStateStore();
 
-    simpleAddGetAndSetTest1Helper(store);
+    simpleAddGetAndSetTestHelper(store);
+  }
+
+  @Test
+  public void simpleAddGetAndSetTimeUnifiedManagedStateTest()
+  {
+    te = new TestStringTimeExtractor();
+    simpleAddGetAndSetTestHelper(testMeta.timeStore);
   }
 
   @Test
-  public void simpleAddGetAndSetManagedStateTest1()
+  public void simpleAddGetAndSetManagedStateTest()
   {
-    simpleAddGetAndSetTest1Helper(testMeta.store);
+    simpleAddGetAndSetTestHelper(testMeta.store);
   }
 
-  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+  public void simpleAddGetAndSetTestHelper(SpillableStateStore store)
   {
-    SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new StringSerde());
+    SpillableSetImpl<String> set;
 
+    if (te == null) {
+      set = new SpillableSetImpl<>(0L, ID1, store, new StringSerde());
+    } else {
+      set = new SpillableSetImpl<>(ID1, store, new StringSerde(), te);
+    }
     store.setup(testMeta.operatorContext);
     set.setup(testMeta.operatorContext);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
index 2f80628..bc1783c 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 import org.apache.apex.malhar.lib.utils.serde.StringSerde;
@@ -46,6 +47,8 @@ public class SpillableSetMultimapImplTest
   @Rule
   public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
 
+  public TimeExtractor<String> te = null;
+
   @Test
   public void simpleMultiKeyTest()
   {
@@ -60,10 +63,22 @@ public class SpillableSetMultimapImplTest
     simpleMultiKeyTestHelper(testMeta.store);
   }
 
+  @Test
+  public void simpleMultiKeyTimeUnifiedManagedStateTest()
+  {
+    te = new TestStringTimeExtractor();
+    simpleMultiKeyTestHelper(testMeta.timeStore);
+  }
+
+
   public void simpleMultiKeyTestHelper(SpillableStateStore store)
   {
-    SpillableSetMultimapImpl<String, String> map =
-        new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde());
+    SpillableSetMultimapImpl<String, String> map = null;
+    if (te == null) {
+      map = new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde());
+    } else {
+      map = new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde(), te);
+    }
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -296,7 +311,7 @@ public class SpillableSetMultimapImplTest
     store.endWindow();
   }
 
-  protected Serde<String> createStringSerde()
+  private Serde<String> createStringSerde()
   {
     return new StringSerde();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
index d72b1f9..a312f04 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
@@ -27,6 +27,7 @@ import org.junit.runner.Description;
 
 import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
 import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedTimeUnifiedStateSpillableStateStore;
 import org.apache.apex.malhar.lib.utils.serde.CollectionSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
@@ -57,6 +58,7 @@ public class SpillableTestUtils
   public static class TestMeta extends TestWatcher
   {
     public ManagedStateSpillableStateStore store;
+    public ManagedTimeUnifiedStateSpillableStateStore timeStore;
     public Context.OperatorContext operatorContext;
     public String applicationPath;
 
@@ -65,8 +67,10 @@ public class SpillableTestUtils
     {
       TestUtils.deleteTargetTestClassFolder(description);
       store = new ManagedStateSpillableStateStore();
+      timeStore = new ManagedTimeUnifiedStateSpillableStateStore();
       applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
       ((FileAccessFSImpl)store.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      ((FileAccessFSImpl)timeStore.getFileAccess()).setBasePath(applicationPath + "/" + "time_bucket_data");
 
       operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java
new file mode 100644
index 0000000..438555f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
+
+/**
+ * A TimeExtractor for Tests
+ * Get the time value from ASCII code of the first character
+ */
+public class TestStringTimeExtractor implements TimeExtractor<String>
+{
+  static long BASETIME = System.currentTimeMillis();
+  @Override
+  public long getTime(String s)
+  {
+    return s.toCharArray()[0] * 1000 + BASETIME - 7200000;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
index a44e454..afc5227 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
@@ -38,14 +38,16 @@ public class SpillableWindowedStorageTest
   @Rule
   public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
 
+  public static long BASETIME = System.currentTimeMillis();
+
   @Test
   public void testWindowedPlainStorage()
   {
-    SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+    SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
     SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
-    Window window1 = new Window.TimeWindow<>(1000, 10);
-    Window window2 = new Window.TimeWindow<>(1010, 10);
-    Window window3 = new Window.TimeWindow<>(1020, 10);
+    Window window1 = new Window.TimeWindow<>(BASETIME + 1000, 10);
+    Window window2 = new Window.TimeWindow<>(BASETIME + 1010, 10);
+    Window window3 = new Window.TimeWindow<>(BASETIME + 1020, 10);
     storage.setSpillableComplexComponent(sccImpl);
 
     /*
@@ -103,11 +105,11 @@ public class SpillableWindowedStorageTest
   @Test
   public void testWindowedKeyedStorage()
   {
-    SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+    SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
     SpillableWindowedKeyedStorage<String, Integer> storage = new SpillableWindowedKeyedStorage<>();
-    Window window1 = new Window.TimeWindow<>(1000, 10);
-    Window window2 = new Window.TimeWindow<>(1010, 10);
-    Window window3 = new Window.TimeWindow<>(1020, 10);
+    Window window1 = new Window.TimeWindow<>(BASETIME + 1000, 10);
+    Window window2 = new Window.TimeWindow<>(BASETIME + 1010, 10);
+    Window window3 = new Window.TimeWindow<>(BASETIME + 1020, 10);
     storage.setSpillableComplexComponent(sccImpl);
 
     /*
@@ -118,7 +120,6 @@ public class SpillableWindowedStorageTest
     storage.setup(testMeta.operatorContext);
     storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
 
-
     sccImpl.beginWindow(1000);
     storage.put(window1, "x", 1);
     storage.put(window2, "x", 2);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index 4a1cef0..f898e2d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -57,6 +57,9 @@ import com.datatorrent.lib.util.KeyValPair;
 @RunWith(Parameterized.class)
 public class WindowedOperatorTest
 {
+
+  public static final long BASE = (System.currentTimeMillis() / 1000) * 1000;
+
   @Parameterized.Parameters
   public static Collection<Object[]> testParameters()
   {
@@ -90,7 +93,7 @@ public class WindowedOperatorTest
   {
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
     if (useSpillable) {
-      sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+      sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
       // TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys.
       windowStateStorage = new InMemoryWindowedStorage<>();
       SpillableWindowedPlainStorage<MutableLong> pds = new SpillableWindowedPlainStorage<>();
@@ -116,7 +119,7 @@ public class WindowedOperatorTest
   {
     KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>();
     if (useSpillable) {
-      sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+      sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
       // TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys.
       windowStateStorage = new InMemoryWindowedStorage<>();
       if (forSession) {
@@ -183,7 +186,7 @@ public class WindowedOperatorTest
 
     windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L));
     Assert.assertEquals("There should be exactly one window in the storage", 1, plainDataStorage.size());
     Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size());
 
@@ -192,23 +195,22 @@ public class WindowedOperatorTest
     WindowState windowState = entry.getValue();
     Assert.assertEquals(-1, windowState.watermarkArrivalTime);
     Assert.assertEquals(2L, plainDataStorage.get(window).longValue());
-
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, 3L));
     Assert.assertEquals(5L, plainDataStorage.get(window).longValue());
 
-    windowedOperator.processWatermark(new WatermarkImpl(1200));
+    windowedOperator.processWatermark(new WatermarkImpl(BASE + 1200));
     windowedOperator.endWindow();
     Assert.assertTrue(windowState.watermarkArrivalTime >= 0);
     Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false));
 
     windowedOperator.beginWindow(2);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(900L, 4L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 900L, 4L));
     Assert.assertEquals("Late but not too late", 9L, plainDataStorage.get(window).longValue());
-    windowedOperator.processWatermark(new WatermarkImpl(3000));
+    windowedOperator.processWatermark(new WatermarkImpl(BASE + 3000));
     windowedOperator.endWindow();
     Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false));
     windowedOperator.beginWindow(3);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(120L, 5L)); // this tuple should be dropped
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 120L, 5L)); // this tuple should be dropped
     Assert.assertEquals("The window should be dropped because it's too late", 0, plainDataStorage.size());
     Assert.assertEquals("The window should be dropped because it's too late", 0, windowStateStorage.size());
     windowedOperator.endWindow();
@@ -238,8 +240,8 @@ public class WindowedOperatorTest
     windowedOperator.output.setSink(sink);
     windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, 3L));
     windowedOperator.endWindow();
     Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
     windowedOperator.beginWindow(2);
@@ -251,11 +253,11 @@ public class WindowedOperatorTest
     Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
     sink.collectedTuples.clear();
     windowedOperator.beginWindow(4);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, 4L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 400L, 4L));
     windowedOperator.endWindow();
     Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
     windowedOperator.beginWindow(5);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, 5L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, 5L));
     windowedOperator.endWindow();
     switch (accumulationMode) {
       case ACCUMULATING:
@@ -337,8 +339,8 @@ public class WindowedOperatorTest
     windowedOperator.output.setSink(sink);
     windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, 3L));
     windowedOperator.endWindow();
     Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
     windowedOperator.beginWindow(2);
@@ -376,7 +378,7 @@ public class WindowedOperatorTest
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
     windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
     windowedOperator.setup(testMeta.operatorContext);
-    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
+    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L));
     Collection<? extends Window> windows = windowedValue.getWindows();
     Assert.assertEquals(1, windows.size());
     Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next());
@@ -389,11 +391,11 @@ public class WindowedOperatorTest
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
     windowedOperator.setup(testMeta.operatorContext);
-    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
+    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L));
     Collection<? extends Window> windows = windowedValue.getWindows();
     Assert.assertEquals(1, windows.size());
     Window window = windows.iterator().next();
-    Assert.assertEquals(1000, window.getBeginTimestamp());
+    Assert.assertEquals(BASE + 1000, window.getBeginTimestamp());
     Assert.assertEquals(1000, window.getDurationMillis());
   }
 
@@ -403,19 +405,19 @@ public class WindowedOperatorTest
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
     windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
     windowedOperator.setup(testMeta.operatorContext);
-    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L));
+    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1600L, 2L));
     Collection<? extends Window> windows = windowedValue.getWindows();
     Window[] winArray = windows.toArray(new Window[]{});
     Assert.assertEquals(5, winArray.length);
-    Assert.assertEquals(800, winArray[0].getBeginTimestamp());
+    Assert.assertEquals(BASE + 800, winArray[0].getBeginTimestamp());
     Assert.assertEquals(1000, winArray[0].getDurationMillis());
-    Assert.assertEquals(1000, winArray[1].getBeginTimestamp());
+    Assert.assertEquals(BASE + 1000, winArray[1].getBeginTimestamp());
     Assert.assertEquals(1000, winArray[1].getDurationMillis());
-    Assert.assertEquals(1200, winArray[2].getBeginTimestamp());
+    Assert.assertEquals(BASE + 1200, winArray[2].getBeginTimestamp());
     Assert.assertEquals(1000, winArray[2].getDurationMillis());
-    Assert.assertEquals(1400, winArray[3].getBeginTimestamp());
+    Assert.assertEquals(BASE + 1400, winArray[3].getBeginTimestamp());
     Assert.assertEquals(1000, winArray[3].getDurationMillis());
-    Assert.assertEquals(1600, winArray[4].getBeginTimestamp());
+    Assert.assertEquals(BASE + 1600, winArray[4].getBeginTimestamp());
     Assert.assertEquals(1000, winArray[4].getDurationMillis());
     windowedOperator.teardown();
   }
@@ -430,14 +432,14 @@ public class WindowedOperatorTest
     windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
     windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
-    Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(1100L, new KeyValPair<>("a", 2L));
+    Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(BASE + 1100L, new KeyValPair<>("a", 2L));
     windowedOperator.processTuple(tuple);
 
     Assert.assertEquals(1, sink.getCount(false));
     Tuple.WindowedTuple<KeyValPair<String, Long>> out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
     Assert.assertEquals(1, out.getWindows().size());
     Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
-    Assert.assertEquals(1100L, window1.getBeginTimestamp());
+    Assert.assertEquals(BASE + 1100L, window1.getBeginTimestamp());
     Assert.assertEquals(2000, window1.getDurationMillis());
     Assert.assertEquals("a", window1.getKey());
     Assert.assertEquals("a", out.getValue().getKey());
@@ -445,7 +447,7 @@ public class WindowedOperatorTest
     sink.clear();
 
     // extending an existing session window
-    tuple = new Tuple.TimestampedTuple<>(2000L, new KeyValPair<>("a", 3L));
+    tuple = new Tuple.TimestampedTuple<>(BASE + 2000L, new KeyValPair<>("a", 3L));
     windowedOperator.processTuple(tuple);
     Assert.assertEquals(2, sink.getCount(false));
 
@@ -460,27 +462,27 @@ public class WindowedOperatorTest
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
     Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
 
-    Assert.assertEquals(1100L, window2.getBeginTimestamp());
+    Assert.assertEquals(BASE + 1100L, window2.getBeginTimestamp());
     Assert.assertEquals(2900, window2.getDurationMillis());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(5L, out.getValue().getValue().longValue());
     sink.clear();
 
     // a separate session window
-    tuple = new Tuple.TimestampedTuple<>(5000L, new KeyValPair<>("a", 4L));
+    tuple = new Tuple.TimestampedTuple<>(BASE + 5000L, new KeyValPair<>("a", 4L));
     windowedOperator.processTuple(tuple);
     Assert.assertEquals(1, sink.getCount(false));
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
     Assert.assertEquals(1, out.getWindows().size());
     Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
-    Assert.assertEquals(5000L, window3.getBeginTimestamp());
+    Assert.assertEquals(BASE + 5000L, window3.getBeginTimestamp());
     Assert.assertEquals(2000, window3.getDurationMillis());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(4L, out.getValue().getValue().longValue());
     sink.clear();
 
     // session window merging
-    tuple = new Tuple.TimestampedTuple<>(3500L, new KeyValPair<>("a", 3L));
+    tuple = new Tuple.TimestampedTuple<>(BASE + 3500L, new KeyValPair<>("a", 3L));
     windowedOperator.processTuple(tuple);
 
     Assert.assertEquals(3, sink.getCount(false));
@@ -509,7 +511,7 @@ public class WindowedOperatorTest
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2);
     Assert.assertEquals(1, out.getWindows().size());
     Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
-    Assert.assertEquals(1100L, window4.getBeginTimestamp());
+    Assert.assertEquals(BASE + 1100L, window4.getBeginTimestamp());
     Assert.assertEquals(5900, window4.getDurationMillis());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(12L, out.getValue().getValue().longValue());
@@ -525,14 +527,14 @@ public class WindowedOperatorTest
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
     windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("a", 3L)));
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 4L)));
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(150L, new KeyValPair<>("b", 5L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, new KeyValPair<>("a", 2L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, new KeyValPair<>("a", 3L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("b", 4L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 150L, new KeyValPair<>("b", 5L)));
     windowedOperator.endWindow();
     Assert.assertEquals(1, keyedDataStorage.size());
-    Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue());
-    Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue());
+    Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "a").longValue());
+    Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "b").longValue());
     windowedOperator.teardown();
   }
 
@@ -559,10 +561,10 @@ public class WindowedOperatorTest
     windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
     windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("b", 3L)));
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("b", 5L)));
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("a", 4L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, new KeyValPair<>("a", 2L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, new KeyValPair<>("b", 3L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 400L, new KeyValPair<>("b", 5L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("a", 4L)));
     windowedOperator.endWindow();
     Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
     windowedOperator.beginWindow(2);
@@ -581,11 +583,11 @@ public class WindowedOperatorTest
     }
     sink.collectedTuples.clear();
     windowedOperator.beginWindow(4);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("a", 8L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 400L, new KeyValPair<>("a", 8L)));
     windowedOperator.endWindow();
     Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
     windowedOperator.beginWindow(5);
-    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 9L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("b", 9L)));
     windowedOperator.endWindow();
     Map<String, Long> map = new HashMap<>();
     switch (accumulationMode) {


Mime
View raw message