apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [3/3] apex-malhar git commit: APEXMALHAR-2190 Use reusable buffer for serialization in spillable data structures closes #404
Date Mon, 24 Oct 2016 20:46:37 GMT
APEXMALHAR-2190 Use reusable buffer for serialization in spillable data structures closes #404


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6ddefd02
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6ddefd02
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6ddefd02

Branch: refs/heads/master
Commit: 6ddefd02ac72168b17c3dedb47af7d91c23c3574
Parents: 3799157
Author: brightchen <bright@datatorrent.com>
Authored: Mon Aug 15 17:46:27 2016 -0700
Committer: Thomas Weise <thw@apache.org>
Committed: Mon Oct 24 13:43:02 2016 -0700

----------------------------------------------------------------------
 .../spillable/SpillableBenchmarkApp.java        |  69 +++++
 .../spillable/SpillableTestInputOperator.java   |  46 ++++
 .../spillable/SpillableTestOperator.java        | 189 ++++++++++++++
 .../spillable/SpillableBenchmarkAppTester.java  |  73 ++++++
 .../spillable/SpillableDSBenchmarkTest.java     | 171 +++++++++++++
 .../state/ManagedStateBenchmarkAppTest.java     | 101 ++++++++
 .../state/ManagedStateBenchmarkAppTester.java   | 101 --------
 benchmark/src/test/resources/log4j.properties   |   2 +
 .../state/managed/AbstractManagedStateImpl.java |  34 ++-
 .../apex/malhar/lib/state/managed/Bucket.java   |  85 +++++--
 .../lib/state/managed/BucketProvider.java       |  40 +++
 .../state/spillable/SpillableArrayListImpl.java |  17 +-
 .../SpillableArrayListMultimapImpl.java         |  53 ++--
 .../spillable/SpillableComplexComponent.java    |  29 +--
 .../SpillableComplexComponentImpl.java          |  64 +++--
 .../lib/state/spillable/SpillableMapImpl.java   |  44 ++--
 .../lib/state/spillable/SpillableSetImpl.java   |  45 +---
 .../spillable/SpillableSetMultimapImpl.java     |  45 ++--
 .../state/spillable/SpillableStateStore.java    |   3 +-
 .../state/spillable/WindowBoundedMapCache.java  |   5 +-
 .../inmem/InMemSpillableStateStore.java         |  26 ++
 .../utils/serde/AffixKeyValueSerdeManager.java  |  76 ++++++
 .../apex/malhar/lib/utils/serde/AffixSerde.java |  68 +++++
 .../apex/malhar/lib/utils/serde/ArraySerde.java |  97 ++++++++
 .../apex/malhar/lib/utils/serde/Block.java      | 217 ++++++++++++++++
 .../lib/utils/serde/BlockReleaseStrategy.java   |  47 ++++
 .../malhar/lib/utils/serde/BlockStream.java     | 179 +++++++++++++
 .../malhar/lib/utils/serde/BufferSlice.java     | 100 ++++++++
 .../malhar/lib/utils/serde/CollectionSerde.java |  97 ++++++++
 .../serde/DefaultBlockReleaseStrategy.java      |  96 +++++++
 .../malhar/lib/utils/serde/GenericSerde.java    |  81 ++++++
 .../apex/malhar/lib/utils/serde/IntSerde.java   |  45 ++++
 .../utils/serde/KeyValueByteStreamProvider.java |  37 +++
 .../lib/utils/serde/KeyValueSerdeManager.java   |  86 +++++++
 .../apex/malhar/lib/utils/serde/LongSerde.java  |  45 ++++
 .../apex/malhar/lib/utils/serde/PairSerde.java  |  73 ++++++
 .../lib/utils/serde/PassThruByteArraySerde.java |  51 ----
 .../serde/PassThruByteArraySliceSerde.java      |  61 -----
 .../lib/utils/serde/PassThruSliceSerde.java     |  32 ++-
 .../apex/malhar/lib/utils/serde/Serde.java      |  41 +--
 .../lib/utils/serde/SerdeCollectionSlice.java   | 120 ---------
 .../malhar/lib/utils/serde/SerdeIntSlice.java   |  54 ----
 .../malhar/lib/utils/serde/SerdeKryoSlice.java  | 100 --------
 .../malhar/lib/utils/serde/SerdeLongSlice.java  |  54 ----
 .../malhar/lib/utils/serde/SerdePairSlice.java  |  89 -------
 .../lib/utils/serde/SerdeStringSlice.java       |  55 ----
 .../lib/utils/serde/SerializationBuffer.java    | 130 ++++++++++
 .../apex/malhar/lib/utils/serde/SliceUtils.java |  10 +
 .../malhar/lib/utils/serde/StringSerde.java     |  45 ++++
 .../lib/utils/serde/WindowCompleteListener.java |  29 +++
 .../lib/utils/serde/WindowedBlockStream.java    | 249 +++++++++++++++++++
 .../impl/SpillableSessionWindowedStorage.java   |   3 +-
 .../impl/SpillableWindowedKeyedStorage.java     |  28 +--
 .../impl/SpillableWindowedPlainStorage.java     |  18 +-
 .../com/datatorrent/lib/util/TestUtils.java     |   3 +-
 .../lib/state/managed/DefaultBucketTest.java    |  48 +++-
 .../state/managed/ManagedStateTestUtils.java    |   3 +-
 .../spillable/SpillableArrayListImplTest.java   |  12 +-
 .../SpillableArrayListMultimapImplTest.java     |  30 ++-
 .../SpillableComplexComponentImplTest.java      |   6 +-
 .../state/spillable/SpillableMapImplTest.java   |  39 ++-
 .../state/spillable/SpillableSetImplTest.java   |   4 +-
 .../spillable/SpillableSetMultimapImplTest.java |  18 +-
 .../lib/state/spillable/SpillableTestUtils.java |  46 ++--
 .../spillable/TimeBasedPriorityQueueTest.java   |   3 -
 .../malhar/lib/utils/serde/AffixSerdeTest.java  |  43 ++++
 .../malhar/lib/utils/serde/BlockStreamTest.java | 179 +++++++++++++
 .../lib/utils/serde/CollectionSerdeTest.java    |  68 +++++
 .../lib/utils/serde/GenericSerdeTest.java       |  84 +++++++
 .../malhar/lib/utils/serde/PairSerdeTest.java   |  48 ++++
 .../utils/serde/PassThruByteArraySerdeTest.java |  72 ------
 .../utils/serde/SerdeCollectionSliceTest.java   |  65 -----
 .../lib/utils/serde/SerdeGeneralTest.java       | 169 +++++++++++++
 .../lib/utils/serde/SerdeKryoSliceTest.java     |  79 ------
 .../lib/utils/serde/SerdePairSliceTest.java     |  44 ----
 .../window/SpillableWindowedStorageTest.java    |  17 +-
 76 files changed, 3570 insertions(+), 1265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
new file mode 100644
index 0000000..e2fe8bb
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+/**
+ * Benchmark application that streams generated string tuples from a
+ * {@link SpillableTestInputOperator} into a {@link SpillableTestOperator}, which stores them in
+ * spillable data structures backed by a {@link ManagedStateSpillableStateStore}.
+ *
+ * <p>The store base path must be supplied through the {@value #PROP_STORE_PATH} property.</p>
+ */
+@ApplicationAnnotation(name = "SpillableBenchmarkApp")
+public class SpillableBenchmarkApp implements StreamingApplication
+{
+  /** Configuration key holding the base path of the managed state store. */
+  protected static final String PROP_STORE_PATH = "dt.application.SpillableBenchmarkApp.storeBasePath";
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Input operator: emits batches of increasing counter values as strings, without pausing.
+    SpillableTestInputOperator input = new SpillableTestInputOperator();
+    input.batchSize = 100;
+    input.sleepBetweenBatch = 0;
+    input = dag.addOperator("input", input);
+
+    // Operator under test: stores every tuple in spillable structures and verifies them.
+    SpillableTestOperator testOperator = new SpillableTestOperator();
+    testOperator.store = createStore(conf);
+    // -1 disables the deliberate failure SpillableTestOperator uses for recovery testing.
+    testOperator.shutdownCount = -1;
+    testOperator = dag.addOperator("test", testOperator);
+
+    // Connect ports
+    dag.addStream("stream", input.output, testOperator.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+  }
+
+  /**
+   * Creates the managed state store used by the test operator.
+   *
+   * @param conf application configuration
+   * @return a store whose file-access base path comes from {@link #getStoreBasePath(Configuration)}
+   */
+  public ManagedStateSpillableStateStore createStore(Configuration conf)
+  {
+    String basePath = getStoreBasePath(conf);
+    ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+    ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+    return store;
+  }
+
+  /**
+   * Returns the store base path; subclasses (e.g. local test runners) may override it.
+   *
+   * @param conf application configuration
+   * @return the value of the {@value #PROP_STORE_PATH} property
+   * @throws NullPointerException if the property is not set
+   */
+  public String getStoreBasePath(Configuration conf)
+  {
+    return Preconditions.checkNotNull(conf.get(PROP_STORE_PATH),
+        "base path should be specified in the properties.xml");
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
new file mode 100644
index 0000000..2e33721
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Input operator for the spillable benchmark. Each {@link #emitTuples()} call emits
+ * {@link #batchSize} tuples carrying an increasing counter rendered as a string, then
+ * optionally sleeps for {@link #sleepBetweenBatch} milliseconds.
+ */
+public class SpillableTestInputOperator extends BaseOperator implements InputOperator
+{
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+  /** Last counter value emitted (0 before the first tuple). */
+  public long count = 0;
+  /** Number of tuples emitted per {@link #emitTuples()} call. */
+  public int batchSize = 100;
+  /** Pause after each batch in milliseconds; values of 0 or less disable the pause. */
+  public int sleepBetweenBatch = 1;
+
+  @Override
+  public void emitTuples()
+  {
+    for (int i = 0; i < batchSize; ++i) {
+      output.emit(String.valueOf(++count));
+    }
+    if (sleepBetweenBatch > 0) {
+      try {
+        Thread.sleep(sleepBetweenBatch);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status instead of swallowing it, so the calling thread can react.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
new file mode 100644
index 0000000..3c5bf71
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.LongSerde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ShutdownException;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Operator under test for the spillable benchmark. Every tuple is appended to {@link #multiMap}
+ * under the current window id, and the per-window tuple count is recorded in
+ * {@link #windowToCount}. {@link #checkData()} verifies that both structures agree.
+ */
+public class SpillableTestOperator extends BaseOperator implements Operator.CheckpointNotificationListener
+{
+  private static final Logger logger = LoggerFactory.getLogger(SpillableTestOperator.class);
+
+  public static final byte[] ID1 = new byte[] {(byte)1};
+  public static final byte[] ID2 = new byte[] {(byte)2};
+  public static final byte[] ID3 = new byte[] {(byte)3};
+
+  public SpillableArrayListMultimapImpl<String, String> multiMap;
+
+  public ManagedStateSpillableStateStore store;
+
+  public long totalCount = 0;
+  public transient long countInWindow;
+  public long minWinId = -1;
+  public long committedWinId = -1;
+  public long windowId;
+
+  public SpillableMapImpl<Long, Long> windowToCount;
+
+  /**
+   * When {@link #totalCount} reaches this value a RuntimeException is thrown to exercise recovery;
+   * -1 disables the failure.
+   */
+  public long shutdownCount = -1;
+
+  /** Records the most recent verification failure so a test harness can inspect it. */
+  public static Throwable errorTrace;
+
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * Appends a tuple to the current window's list, failing deliberately once
+   * {@link #shutdownCount} tuples have been processed.
+   */
+  public void processTuple(String tuple)
+  {
+    if (++totalCount == shutdownCount) {
+      throw new RuntimeException("Test recovery. count = " + totalCount);
+    }
+    countInWindow++;
+    multiMap.put("" + windowId, tuple);
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    if (windowToCount == null) {
+      windowToCount = createWindowToCountMap(store);
+    }
+    if (multiMap == null) {
+      multiMap = createMultimap(store);
+    }
+
+    // Set up the store first, then every spillable structure backed by it.
+    store.setup(context);
+    windowToCount.setup(context);
+    multiMap.setup(context);
+
+    checkData();
+  }
+
+  /**
+   * Verifies, for each window from {@code max(committedWinId + 1, minWinId)} up to (but excluding)
+   * the current window, that the number of tuples in {@link #multiMap} equals the count recorded in
+   * {@link #windowToCount}. On mismatch the error is stored in {@link #errorTrace} and a
+   * {@link ShutdownException} is thrown.
+   */
+  public void checkData()
+  {
+    long startTime = System.currentTimeMillis();
+    logger.debug("check data: totalCount: {}; minWinId: {}; committedWinId: {}; curWinId: {}", totalCount,
+        this.minWinId, committedWinId, this.windowId);
+    for (long winId = Math.max(committedWinId + 1, minWinId); winId < this.windowId; ++winId) {
+      Long count = this.windowToCount.get(winId);
+      SpillableArrayListImpl<String> datas = (SpillableArrayListImpl<String>)multiMap.get("" + winId);
+      // A window that received no tuples is recorded with count 0 and, since nothing was put for it,
+      // presumably has no multimap entry; that case is valid.
+      if (count == null || (datas == null && count != 0)) {
+        reportError("Invalid data/count. datas: " + datas + "; count: " + count);
+      }
+      int dataSize = datas == null ? 0 : datas.size();
+      if (count != dataSize) {
+        reportError(String.format("data size not equal: window Id: %d; datas size: %d; count: %d", winId,
+            dataSize, count));
+      }
+    }
+    logger.info("check data took {} millis.", System.currentTimeMillis() - startTime);
+  }
+
+  /**
+   * Logs and records a verification failure, then throws {@link ShutdownException}.
+   */
+  private void reportError(String msg)
+  {
+    logger.error(msg);
+    errorTrace = new RuntimeException(msg);
+    throw new ShutdownException();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    store.beginWindow(windowId);
+    windowToCount.beginWindow(windowId);
+    multiMap.beginWindow(windowId);
+    if (minWinId < 0) {
+      minWinId = windowId;
+    }
+
+    this.windowId = windowId;
+    countInWindow = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    multiMap.endWindow();
+    windowToCount.put(windowId, countInWindow);
+    windowToCount.endWindow();
+    store.endWindow();
+
+    // checkData scans every uncommitted window, so it runs only every tenth window.
+    if (windowId % 10 == 0) {
+      checkData();
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    // Release resources in the reverse order of setup.
+    multiMap.teardown();
+    windowToCount.teardown();
+    store.teardown();
+    super.teardown();
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    store.beforeCheckpoint(windowId);
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    this.committedWinId = windowId;
+    store.committed(windowId);
+  }
+
+  public static SpillableArrayListMultimapImpl<String, String> createMultimap(SpillableStateStore store)
+  {
+    return new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new StringSerde(),
+        new StringSerde());
+  }
+
+  public static SpillableMapImpl<String, String> createMap(SpillableStateStore store)
+  {
+    return new SpillableMapImpl<String, String>(store, ID2, 0L, new StringSerde(),
+        new StringSerde());
+  }
+
+  public static SpillableMapImpl<Long, Long> createWindowToCountMap(SpillableStateStore store)
+  {
+    return new SpillableMapImpl<Long, Long>(store, ID3, 0L, new LongSerde(),
+        new LongSerde());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
new file mode 100644
index 0000000..7f94079
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import java.io.File;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * Runs {@link SpillableBenchmarkApp} in local mode for 60 seconds with the store under
+ * {@link #basePath}, and fails if {@link SpillableTestOperator} recorded a verification error.
+ */
+public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp
+{
+  private static final Logger logger = LoggerFactory.getLogger(SpillableBenchmarkAppTester.class);
+  public static final String basePath = "target/temp";
+
+  @Before
+  public void before()
+  {
+    // Start from an empty store and clear any error left by an earlier run in the same JVM.
+    FileUtil.fullyDelete(new File(basePath));
+    SpillableTestOperator.errorTrace = null;
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    super.populateDAG(dag, conf);
+
+    // No-op application: the DAG was already populated by super.populateDAG above.
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+
+    lma.prepareDAG(app, conf);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.run(60000);
+
+    lc.shutdown();
+
+    Throwable error = SpillableTestOperator.errorTrace;
+    if (error != null) {
+      logger.error("Error.", error);
+      Assert.fail(error.getMessage());
+    }
+  }
+
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
new file mode 100644
index 0000000..7e64c5f
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+/**
+ * Benchmark for {@link SpillableMapImpl}: puts random keys and values, drawn from pre-generated
+ * pools, into a map backed by {@link ManagedStateSpillableStateStore}, ending windows,
+ * checkpointing and committing periodically, and logs throughput and memory usage.
+ */
+public class SpillableDSBenchmarkTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class);
+  protected static final int loopCount = 100000000;
+  protected static final long oneMB = 1024 * 1024;
+  protected static final int keySize = 500000;
+  protected static final int valueSize = 100000;
+  protected static final int maxKeyLength = 100;
+  protected static final int maxValueLength = 1000;
+
+  protected static final int tuplesPerWindow = 10000;
+  protected static final int checkPointWindows = 10;
+  protected static final int commitDelays = 100;
+
+  protected final transient Random random = new Random();
+  protected String[] keys;
+  protected String[] values;
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Before
+  public void setup()
+  {
+    keys = new String[keySize];
+    for (int i = 0; i < keys.length; ++i) {
+      keys[i] = this.randomString(maxKeyLength);
+    }
+
+    values = new String[valueSize];
+    for (int i = 0; i < values.length; ++i) {
+      values[i] = this.randomString(maxValueLength);
+    }
+  }
+
+  @Test
+  public void testSpillableMap()
+  {
+    byte[] ID1 = new byte[]{(byte)1};
+    ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+    ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath("target/temp");
+
+    StringSerde keySerde = createKeySerde();
+    Serde<String> valueSerde = createValueSerde();
+
+    SpillableMapImpl<String, String> map = new SpillableMapImpl<String, String>(store, ID1, 0L, keySerde, valueSerde);
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    final long startTime = System.currentTimeMillis();
+
+    long windowId = 0;
+    store.beginWindow(++windowId);
+    map.beginWindow(windowId);
+
+    int outputTimes = 0;
+    for (int i = 0; i < loopCount; ++i) {
+      putEntry(map);
+
+      // (i + 1) entries have been put so far: end the window once it holds tuplesPerWindow entries.
+      if ((i + 1) % tuplesPerWindow == 0) {
+        map.endWindow();
+        store.endWindow();
+
+        if ((i + 1) % (tuplesPerWindow * checkPointWindows) == 0) {
+          store.beforeCheckpoint(windowId);
+
+          // Simulate a commit that lags commitDelays windows behind the current window.
+          if (windowId > commitDelays) {
+            store.committed(windowId - commitDelays);
+          }
+        }
+
+        //next window
+        store.beginWindow(++windowId);
+        map.beginWindow(windowId);
+      }
+
+      long spentTime = System.currentTimeMillis() - startTime;
+      if (spentTime > outputTimes * 5000) {
+        ++outputTimes;
+        // spentTime >= 1 here; the long literal keeps i * 1000 from overflowing int for large i.
+        logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}", spentTime, i,
+            i * 1000L / spentTime);
+        checkEnvironment();
+      }
+    }
+    long spentTime = System.currentTimeMillis() - startTime;
+
+    // Guard against a zero elapsed time and report throughput per second, like the progress log.
+    logger.info("Spent {} mills for {} operation. average/second: {}", spentTime, loopCount,
+        loopCount * 1000L / Math.max(1L, spentTime));
+
+    map.teardown();
+    store.teardown();
+  }
+
+  public void putEntry(SpillableMapImpl<String, String> map)
+  {
+    map.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]);
+  }
+
+  /** Alphabet used by {@link #randomString(int)}: digits plus upper- and lower-case ASCII letters. */
+  public static final String characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+  protected static final char[] text = new char[Math.max(maxKeyLength, maxValueLength)];
+
+  /**
+   * Builds a random string of {@code length} characters from {@link #characters}.
+   * Reuses the shared static {@link #text} buffer, so it is not safe for concurrent use.
+   */
+  public String randomString(int length)
+  {
+    for (int i = 0; i < length; i++) {
+      text[i] = characters.charAt(random.nextInt(characters.length()));
+    }
+    return new String(text, 0, length);
+  }
+
+  /**
+   * Logs JVM heap usage in MB and fails if the heap is fully allocated with less than 10 MB free.
+   */
+  public void checkEnvironment()
+  {
+    Runtime runtime = Runtime.getRuntime();
+
+    long maxMemory = runtime.maxMemory() / oneMB;
+    long allocatedMemory = runtime.totalMemory() / oneMB;
+    long freeMemory = runtime.freeMemory() / oneMB;
+
+    logger.info("freeMemory: {}M; allocatedMemory: {}M; maxMemory: {}M", freeMemory,
+        allocatedMemory, maxMemory);
+
+    Assert.assertFalse("Run out of memory.", allocatedMemory == maxMemory && freeMemory < 10);
+  }
+
+  protected StringSerde createKeySerde()
+  {
+    return new StringSerde();
+  }
+
+  protected Serde<String> createValueSerde()
+  {
+    return new StringSerde();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
new file mode 100644
index 0000000..4792843
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
+
+/**
+ * Benchmark runner rather than a real unit test: it lets developers run
+ * {@link ManagedStateBenchmarkApp} in local mode from an IDE, once for each {@link ExecMode} below.
+ */
+public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
+{
+  public static final String basePath = "target/temp";
+
+  /** Deletes store files left behind by a previous run. */
+  @Before
+  public void before()
+  {
+    File storeDir = new File(basePath);
+    FileUtil.fullyDelete(storeDir);
+  }
+
+  @Test
+  public void testUpdateSync() throws Exception
+  {
+    test(ExecMode.UPDATESYNC);
+  }
+
+  @Test
+  public void testUpdateAsync() throws Exception
+  {
+    test(ExecMode.UPDATEASYNC);
+  }
+
+  @Test
+  public void testInsert() throws Exception
+  {
+    test(ExecMode.INSERT);
+  }
+
+  /**
+   * Builds the benchmark DAG with the given store execution mode and runs it locally.
+   *
+   * @param exeMode execution mode assigned to the store operator
+   */
+  public void test(ExecMode exeMode) throws Exception
+  {
+    Configuration conf = new Configuration(false);
+    LocalMode localMode = LocalMode.newInstance();
+    DAG dag = localMode.getDAG();
+
+    super.populateDAG(dag, conf);
+    storeOperator.execMode = exeMode;
+
+    // No-op application: the DAG was already populated by super.populateDAG above.
+    StreamingApplication noOpApp = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+    localMode.prepareDAG(noOpApp, conf);
+
+    // Run on a local cluster for 300000 ms, then stop it.
+    final LocalMode.Controller controller = localMode.getController();
+    controller.run(300000);
+    controller.shutdown();
+  }
+
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
deleted file mode 100644
index 4435aad..0000000
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.benchmark.state;
-
-import java.io.File;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
-
-/**
- * This is not a really unit test, but in fact a benchmark runner.
- * Provides this class to give developers the convenience to run in local IDE environment.
- *
- */
-public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
-{
-  public static final String basePath = "target/temp";
-
-  @Before
-  public void before()
-  {
-    FileUtil.fullyDelete(new File(basePath));
-  }
-
-  @Test
-  public void testUpdateSync() throws Exception
-  {
-    test(ExecMode.UPDATESYNC);
-  }
-
-  @Test
-  public void testUpdateAsync() throws Exception
-  {
-    test(ExecMode.UPDATEASYNC);
-  }
-
-  @Test
-  public void testInsert() throws Exception
-  {
-    test(ExecMode.INSERT);
-  }
-
-  public void test(ExecMode exeMode) throws Exception
-  {
-    Configuration conf = new Configuration(false);
-
-    LocalMode lma = LocalMode.newInstance();
-    DAG dag = lma.getDAG();
-
-    super.populateDAG(dag, conf);
-    storeOperator.execMode = exeMode;
-
-    StreamingApplication app = new StreamingApplication()
-    {
-      @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
-      }
-    };
-
-    lma.prepareDAG(app, conf);
-
-    // Create local cluster
-    final LocalMode.Controller lc = lma.getController();
-    lc.run(300000);
-
-    lc.shutdown();
-  }
-
-
-
-  @Override
-  public String getStoreBasePath(Configuration conf)
-  {
-    return basePath;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties
index cf0d19e..3fc0120 100644
--- a/benchmark/src/test/resources/log4j.properties
+++ b/benchmark/src/test/resources/log4j.properties
@@ -41,3 +41,5 @@ log4j.logger.org=info
 #log4j.logger.org.apache.commons.beanutils=warn
 log4j.logger.com.datatorrent=debug
 log4j.logger.org.apache.apex=debug
+log4j.logger.org.apache.apex.malhar.lib.state.managed=info
+log4j.logger.com.datatorrent.common.util.FSStorageAgent=info

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index dd2bbab..20271b0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -125,7 +125,7 @@ import com.datatorrent.netlet.util.Slice;
  */
 public abstract class AbstractManagedStateImpl
     implements ManagedState, Component<OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext,
-    TimeBucketAssigner.PurgeListener
+    TimeBucketAssigner.PurgeListener, BucketProvider
 {
   private long maxMemorySize;
 
@@ -319,11 +319,24 @@ public abstract class AbstractManagedStateImpl
     return (int)(bucketId % numBuckets);
   }
 
-  Bucket getBucket(long bucketId)
+  @Override
+  public Bucket getBucket(long bucketId)
   {
     return buckets[getBucketIdx(bucketId)];
   }
 
+  @Override
+  public Bucket ensureBucket(long bucketId)
+  {
+    Bucket b = getBucket(bucketId);
+    if (b == null) {
+      b = newBucket(bucketId);
+      b.setup(this);
+      buckets[getBucketIdx(bucketId)] = b;
+    }
+    return b;
+  }
+
   protected Bucket newBucket(long bucketId)
   {
     return new Bucket.DefaultBucket(bucketId);
@@ -384,6 +397,22 @@ public abstract class AbstractManagedStateImpl
     }
   }
 
+  /**
+   * Gets the memory usage of each bucket that has been created.
+   * @return a map from bucket id to the combined size of that bucket's key and value streams
+   */
+  public Map<Long, Long> getBucketMemoryUsage()
+  {
+    Map<Long, Long> bucketToSize = Maps.newHashMap();
+    for (Bucket bucket : buckets) {
+      if (bucket == null) {
+        continue;
+      }
+      bucketToSize.put(bucket.getBucketId(), bucket.getKeyStream().size() + bucket.getValueStream().size());
+    }
+    return bucketToSize;
+  }
+
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Override
   public void teardown()
@@ -476,6 +505,7 @@ public abstract class AbstractManagedStateImpl
     this.keyComparator = Preconditions.checkNotNull(keyComparator);
   }
 
+  @Override
   public BucketsFileSystem getBucketsFileSystem()
   {
     return bucketsFileSystem;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 4fc2327..cbc4e03 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -32,6 +33,10 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.utils.serde.KeyValueByteStreamProvider;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -46,7 +51,7 @@ import com.datatorrent.netlet.util.Slice;
  *
  * @since 3.4.0
  */
-public interface Bucket extends ManagedStateComponent
+public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvider
 {
   /**
    * @return bucket id
@@ -218,13 +223,22 @@ public interface Bucket extends ManagedStateComponent
 
     private transient TreeMap<Long, BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
 
+    /**
+     * By default, keys and values are kept in two separate streams.
+     * The key and value streams are created at construction rather than in setup, because references to them are passed to the serialize method.
+     */
+    protected WindowedBlockStream keyStream = new WindowedBlockStream();
+    protected WindowedBlockStream valueStream = new WindowedBlockStream();
+
+    protected ConcurrentLinkedQueue<Long> windowsForFreeMemory = new ConcurrentLinkedQueue<>();
+
     private DefaultBucket()
     {
       //for kryo
       bucketId = -1;
     }
 
-    protected DefaultBucket(long bucketId)
+    public DefaultBucket(long bucketId)
     {
       this.bucketId = bucketId;
     }
@@ -321,6 +335,9 @@ public interface Bucket extends ManagedStateComponent
     @Override
     public Slice get(Slice key, long timeBucket, ReadSource readSource)
     {
+      // This call is lightweight
+      releaseMemory();
+      key = SliceUtils.toBufferSlice(key);
       switch (readSource) {
         case MEMORY:
           return getFromMemory(key);
@@ -392,6 +409,11 @@ public interface Bucket extends ManagedStateComponent
     @Override
     public void put(Slice key, long timeBucket, Slice value)
     {
+      // This call is lightweight
+      releaseMemory();
+      key = SliceUtils.toBufferSlice(key);
+      value = SliceUtils.toBufferSlice(value);
+
       BucketedValue bucketedValue = flash.get(key);
       if (bucketedValue == null) {
         bucketedValue = new BucketedValue(timeBucket, value);
@@ -409,39 +431,45 @@ public interface Bucket extends ManagedStateComponent
       }
     }
 
+    /**
+     * Frees memory up to the given windowId.
+     * This method is called from another thread, and adding concurrency control to the streams would hurt performance.
+     * So it only calculates the size of the memory that can be released and queues a free-memory request for the operator thread.
+     */
     @Override
     public long freeMemory(long windowId) throws IOException
     {
-      long memoryFreed = 0;
-      Long clearWindowId;
-
-      while ((clearWindowId = committedData.floorKey(windowId)) != null) {
-        Map<Slice, BucketedValue> windowData = committedData.remove(clearWindowId);
+      // calculate the size first and then send the release memory request. It could reduce the chance of conflict and increase the performance.
+      long size = keyStream.dataSizeUpToWindow(windowId) + valueStream.dataSizeUpToWindow(windowId);
+      windowsForFreeMemory.add(windowId);
+      return size;
+    }
 
-        for (Map.Entry<Slice, BucketedValue> entry: windowData.entrySet()) {
-          memoryFreed += entry.getKey().length + entry.getValue().getSize();
-        }
+    /**
+     * This operation must be called from the operator thread. It does nothing if there is no memory to be freed.
+     */
+    protected long releaseMemory()
+    {
+      long memoryFreed = 0;
+      while (!windowsForFreeMemory.isEmpty()) {
+        long windowId = windowsForFreeMemory.poll();
+        long originSize = keyStream.size() + valueStream.size();
+        keyStream.completeWindow(windowId);
+        valueStream.completeWindow(windowId);
+        memoryFreed += originSize - (keyStream.size() + valueStream.size());
       }
-      fileCache.clear();
-      if (cachedBucketMetas != null) {
-
-        for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) {
-          FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
-          if (reader != null) {
-            memoryFreed += tbm.getSizeInBytes();
-            reader.close();
-          }
-        }
 
+      if (memoryFreed > 0) {
+        LOG.debug("Total freed memory size: {}", memoryFreed);
+        sizeInBytes.getAndAdd(-memoryFreed);
       }
-      sizeInBytes.getAndAdd(-memoryFreed);
-      LOG.debug("space freed {} {}", bucketId, memoryFreed);
       return memoryFreed;
     }
 
     @Override
     public Map<Slice, BucketedValue> checkpoint(long windowId)
     {
+      releaseMemory();
       try {
         //transferring the data from flash to check-pointed state in finally block and re-initializing the flash.
         return flash;
@@ -548,6 +576,19 @@ public interface Bucket extends ManagedStateComponent
       return checkpointedData;
     }
 
+
+    @Override
+    public WindowedBlockStream getKeyStream()
+    {
+      return keyStream;
+    }
+
+    @Override
+    public WindowedBlockStream getValueStream()
+    {
+      return valueStream;
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
new file mode 100644
index 0000000..bbd18ac
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * Provides {@link Bucket}s by bucket id, with an option to create a missing bucket on demand.
+ *
+ */
+public interface BucketProvider
+{
+  /**
+   * Returns the bucket with the given id, without creating it.
+   * @param bucketId id of the bucket to look up
+   * @return the bucket; may be {@code null} if it has not been created yet (see {@link #ensureBucket(long)})
+   */
+  public Bucket getBucket(long bucketId);
+
+  /**
+   * Returns the bucket with the given id, creating it first if it does not exist yet.
+   * @param bucketId id of the bucket to look up or create
+   * @return the existing or newly created bucket
+   */
+  public Bucket ensureBucket(long bucketId);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
index a59872c..d0ca9ff 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -26,9 +26,9 @@ import java.util.ListIterator;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.utils.serde.CollectionSerde;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
@@ -37,7 +37,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
@@ -58,11 +57,10 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
   @NotNull
   private SpillableStateStore store;
   @NotNull
-  private Serde<T, Slice> serde;
+  private Serde<T> serde;
   @NotNull
   private SpillableMapImpl<Integer, List<T>> map;
 
-  private boolean sizeCached = false;
   private int size;
   private int numBatches;
 
@@ -86,15 +84,15 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
    */
   public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
       @NotNull SpillableStateStore store,
-      @NotNull Serde<T, Slice> serde)
+      @NotNull Serde<T> serde)
   {
     this.bucketId = bucketId;
     this.prefix = Preconditions.checkNotNull(prefix);
     this.store = Preconditions.checkNotNull(store);
     this.serde = Preconditions.checkNotNull(serde);
 
-    map = new SpillableMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(),
-        new SerdeCollectionSlice<>(serde, (Class<List<T>>)(Class)ArrayList.class));
+    map = new SpillableMapImpl<>(store, prefix, bucketId, new IntSerde(),
+        new CollectionSerde<T, List<T>>(serde, (Class)ArrayList.class));
   }
 
   /**
@@ -111,7 +109,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
    */
   public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
       @NotNull SpillableStateStore store,
-      @NotNull Serde<T, Slice> serde,
+      @NotNull Serde<T> serde,
       int batchSize)
   {
     this(bucketId, prefix, store, serde);
@@ -328,6 +326,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
   @Override
   public void setup(Context.OperatorContext context)
   {
+    store.ensureBucket(bucketId);
     map.setup(context);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
index 0944583..d3340ce 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
@@ -26,10 +26,10 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
 import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
@@ -62,10 +62,11 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
   @NotNull
   private SpillableMapImpl<Slice, Integer> map;
   private SpillableStateStore store;
-  private byte[] identifier;
   private long bucket;
-  private Serde<K, Slice> serdeKey;
-  private Serde<V, Slice> serdeValue;
+  private Serde<V> valueSerde;
+
+  protected transient Context.OperatorContext context;
+  protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
 
   private SpillableArrayListMultimapImpl()
   {
@@ -78,20 +79,20 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
    * @param identifier The Id of this {@link SpillableArrayListMultimapImpl}.
    * @param bucket The Id of the bucket used to store this
    * {@link SpillableArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+   * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+   * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
    */
   public SpillableArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+      Serde<K> keySerde,
+      Serde<V> valueSerde)
   {
     this.store = Preconditions.checkNotNull(store);
-    this.identifier = Preconditions.checkNotNull(identifier);
     this.bucket = bucket;
-    this.serdeKey = Preconditions.checkNotNull(serdeKey);
-    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+    this.valueSerde = Preconditions.checkNotNull(valueSerde);
+
+    keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(SIZE_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
 
-    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice());
+    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new IntSerde());
   }
 
   public SpillableStateStore getStore()
@@ -110,15 +111,12 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
     SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
 
     if (spillableArrayList == null) {
-      Slice keySlice = serdeKey.serialize(key);
-      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX));
-
+      Integer size = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
       if (size == null) {
         return null;
       }
 
-      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
-      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, false).toByteArray(), store, valueSerde);
       spillableArrayList.setSize(size);
     }
 
@@ -179,8 +177,7 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
   @Override
   public boolean containsKey(@Nullable Object key)
   {
-    return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
-        SIZE_KEY_SUFFIX));
+    return cache.contains((K)key) || map.containsKey(keyValueSerdeManager.serializeMetaKey((K)key, false));
   }
 
   @Override
@@ -217,9 +214,9 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
     SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
 
     if (spillableArrayList == null) {
-      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
-      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
-
+      Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, true);
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, valueSerde);
+      spillableArrayList.setup(context);
       cache.put(key, spillableArrayList);
     }
 
@@ -272,14 +269,19 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
   @Override
   public void setup(Context.OperatorContext context)
   {
+    this.context = context;
+
     map.setup(context);
     isRunning = true;
+
+    keyValueSerdeManager.setup(store, bucket);
   }
 
   @Override
   public void beginWindow(long windowId)
   {
     map.beginWindow(windowId);
+    keyValueSerdeManager.beginWindow(windowId);
     isInWindow = true;
   }
 
@@ -292,13 +294,14 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
       SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
       spillableArrayList.endWindow();
 
-      Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX),
-          spillableArrayList.size());
+      map.put(keyValueSerdeManager.serializeMetaKey(key, true), spillableArrayList.size());
     }
 
     Preconditions.checkState(cache.getRemovedKeys().isEmpty());
     cache.endWindow();
     map.endWindow();
+
+    keyValueSerdeManager.resetReadBuffer();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index c4462d5..542a914 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -24,7 +24,6 @@ import org.apache.apex.malhar.lib.utils.serde.Serde;
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Operator;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * This is a composite component containing spillable data structures. This should be used as
@@ -43,7 +42,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
    * @return A {@link SpillableList}.
    */
-  <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde);
+  <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableList}.
@@ -53,7 +52,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
    * @return A {@link SpillableList}.
    */
-  <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde);
+  <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableMap}. This method
@@ -65,8 +64,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serdeValue The Serializer/Deserializer to use for the map's values.
    * @return A {@link SpillableMap}.
    */
-  <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue);
+  <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
+      Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableMap}.
@@ -79,7 +78,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @return A {@link SpillableMap}.
    */
   <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue);
+      Serde<K> serdeKey, Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableListMultimap}. This method
@@ -91,8 +90,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
    * @return A {@link SpillableListMultimap}.
    */
-  <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
-      Slice> serdeKey, Serde<V, Slice> serdeValue);
+  <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableListMultimap}.
@@ -105,8 +103,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @return A {@link SpillableListMultimap}.
    */
   <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue);
+      Serde<K> serdeKey,
+      Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableSetMultimap}.
@@ -117,8 +115,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
    * @return A {@link SpillableSetMultimap}.
    */
-  <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
-      Slice> serdeKey, Serde<V, Slice> serdeValue);
+  <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableMultiset}. This method
@@ -128,7 +125,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
    * @return A {@link SpillableMultiset}.
    */
-  <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde);
+  <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableMultiset}.
@@ -138,7 +135,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
    * @return A {@link SpillableMultiset}.
    */
-  <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde);
+  <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableQueue}. This method
@@ -148,7 +145,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
    * @return A {@link SpillableQueue}.
    */
-  <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde);
+  <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableQueue}.
@@ -158,5 +155,5 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
    * @return A {@link SpillableQueue}.
    */
-  <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde);
+  <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index aad219d..1a3f550 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.lib.state.spillable;
 
 import java.util.List;
+import java.util.Set;
 
 import javax.validation.constraints.NotNull;
 
@@ -27,9 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * This is a factory that is used for Spillable datastructures. This component is used by nesting it inside of an
@@ -50,6 +51,11 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   @NotNull
   private SpillableIdentifierGenerator identifierGenerator;
 
+  /**
+   * Bucket ids recorded by the factory methods, so that every bucket can be created during setup.
+   */
+  protected transient Set<Long> bucketIds = Sets.newHashSet();
+
   private SpillableComplexComponentImpl()
   {
     // for kryo
@@ -66,84 +72,99 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
     this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator);
   }
 
-  public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde)
   {
     SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
     componentList.add(list);
     return list;
   }
 
-  public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde)
   {
     identifierGenerator.register(identifier);
     SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
+    bucketIds.add(bucket);
     componentList.add(list);
     return list;
   }
 
-  public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+  @Override
+  public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
+      Serde<V> serdeValue)
   {
     SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(),
         bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
-  public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+  @Override
+  public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K> serdeKey,
+      Serde<V> serdeValue)
   {
     identifierGenerator.register(identifier);
     SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
-  public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
-      Slice> serdeKey, Serde<V, Slice> serdeValue)
+  @Override
+  public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
   {
     SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
         identifierGenerator.next(), bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
+  @Override
   public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+      Serde<K> serdeKey,
+      Serde<V> serdeValue)
   {
     identifierGenerator.register(identifier);
     SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
         identifier, bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
-  public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
-      Slice> serdeKey, Serde<V, Slice> serdeValue)
+  @Override
+  public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
   {
     SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store,
         identifierGenerator.next(), bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
-  public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }
 
-  public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }
 
-  public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }
 
-  public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }
@@ -152,6 +173,15 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   public void setup(Context.OperatorContext context)
   {
     store.setup(context);
+
+    //ensure all recorded buckets are created.
+    for (long bucketId : bucketIds) {
+      store.ensureBucket(bucketId);
+    }
+
+    //the bucket ids are only needed during setup, not at run time.
+    bucketIds.clear();
+
     for (SpillableComponent spillableComponent: componentList) {
       spillableComponent.setup(context);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index 016aeec..5fa39d7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,13 +26,13 @@ import java.util.Set;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 import com.google.common.base.Preconditions;
 
@@ -51,21 +51,20 @@ import com.datatorrent.netlet.util.Slice;
 public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spillable.SpillableComponent,
     Serializable
 {
+  private static final long serialVersionUID = 4552547110215784584L;
   private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
-  private transient MutableInt tempOffset = new MutableInt();
+  private transient Input tmpInput = new Input();
 
   @NotNull
   private SpillableStateStore store;
   @NotNull
   private byte[] identifier;
   private long bucket;
-  @NotNull
-  private Serde<K, Slice> serdeKey;
-  @NotNull
-  private Serde<V, Slice> serdeValue;
 
   private int size = 0;
 
+  protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+
   private SpillableMapImpl()
   {
     //for kryo
@@ -77,17 +76,16 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
    * @param identifier The Id of this {@link SpillableMapImpl}.
    * @param bucket The Id of the bucket used to store this
    * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+   * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+   * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
    */
-  public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+  public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> keySerde,
+      Serde<V> valueSerde)
   {
     this.store = Preconditions.checkNotNull(store);
     this.identifier = Preconditions.checkNotNull(identifier);
     this.bucket = bucket;
-    this.serdeKey = Preconditions.checkNotNull(serdeKey);
-    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+    keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(null, identifier, Preconditions.checkNotNull(keySerde), Preconditions.checkNotNull(valueSerde));
   }
 
   public SpillableStateStore getStore()
@@ -134,16 +132,17 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
       return val;
     }
 
-    Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+    Slice valSlice = store.getSync(bucket, keyValueSerdeManager.serializeDataKey(key, false));
 
     if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
       return null;
     }
 
-    tempOffset.setValue(0);
-    return serdeValue.deserialize(valSlice, tempOffset);
+    tmpInput.setBuffer(valSlice.buffer, valSlice.offset, valSlice.length);
+    return keyValueSerdeManager.deserializeValue(tmpInput);
   }
 
+
   @Override
   public V put(K k, V v)
   {
@@ -207,6 +206,8 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   @Override
   public void setup(Context.OperatorContext context)
   {
+    store.ensureBucket(bucket);
+    keyValueSerdeManager.setup(store, bucket);
   }
 
   @Override
@@ -218,16 +219,15 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   public void endWindow()
   {
     for (K key: cache.getChangedKeys()) {
-      store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
-          serdeValue.serialize(cache.get(key)));
+      store.put(bucket, keyValueSerdeManager.serializeDataKey(key, true),
+          keyValueSerdeManager.serializeValue(cache.get(key)));
     }
 
     for (K key: cache.getRemovedKeys()) {
-      store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
-          new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
+      store.put(this.bucket, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
     }
-
     cache.endWindow();
+    keyValueSerdeManager.resetReadBuffer();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index c2741b0..0dfc411 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -26,15 +26,15 @@ import java.util.NoSuchElementException;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 import com.google.common.base.Preconditions;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
@@ -62,49 +62,30 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
     T next;
   }
 
-  public static class SerdeListNodeSlice<T> implements Serde<ListNode<T>, Slice>
+  public static class ListNodeSerde<T> implements Serde<ListNode<T>>
   {
-    private Serde<T, Slice> serde;
-    private static Slice falseSlice = new Slice(new byte[]{0});
-    private static Slice trueSlice = new Slice(new byte[]{1});
+    private Serde<T> serde;
 
-    public SerdeListNodeSlice(@NotNull Serde<T, Slice> serde)
+    public ListNodeSerde(@NotNull Serde<T> serde)
     {
       this.serde = Preconditions.checkNotNull(serde);
     }
 
     @Override
-    public Slice serialize(ListNode<T> object)
+    public void serialize(ListNode<T> object, Output output)
     {
-      int size = 0;
-
-      Slice slice1 = object.valid ? trueSlice : falseSlice;
-      size += 1;
-      Slice slice2 = serde.serialize(object.next);
-      size += slice2.length;
-
-      byte[] bytes = new byte[size];
-      System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length);
-      System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length);
-
-      return new Slice(bytes);
+      output.writeBoolean(object.valid);
+      serde.serialize(object.next, output);
     }
 
     @Override
-    public ListNode<T> deserialize(Slice slice, MutableInt offset)
+    public ListNode<T> deserialize(Input input)
     {
       ListNode<T> result = new ListNode<>();
-      result.valid = slice.buffer[offset.intValue()] != 0;
-      offset.add(1);
-      result.next = serde.deserialize(slice, offset);
+      result.valid = input.readBoolean();
+      result.next = serde.deserialize(input);
       return result;
     }
-
-    @Override
-    public ListNode<T> deserialize(Slice object)
-    {
-      return deserialize(object, new MutableInt(0));
-    }
   }
 
   @NotNull
@@ -135,11 +116,11 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
    */
   public SpillableSetImpl(long bucketId, @NotNull byte[] prefix,
       @NotNull SpillableStateStore store,
-      @NotNull Serde<T, Slice> serde)
+      @NotNull Serde<T> serde)
   {
     this.store = Preconditions.checkNotNull(store);
 
-    map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde));
+    map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new ListNodeSerde(serde));
   }
 
   public void setSize(int size)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 98f60d2..76e47f2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -27,11 +27,11 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
+import org.apache.apex.malhar.lib.utils.serde.PairSerde;
 import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -65,10 +65,11 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
   private SpillableStateStore store;
   private byte[] identifier;
   private long bucket;
-  private Serde<K, Slice> serdeKey;
-  private Serde<V, Slice> serdeValue;
+  private Serde<V> valueSerde;
   private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>();
 
+  protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+  protected transient Context.OperatorContext context;
   private SpillableSetMultimapImpl()
   {
     // for kryo
@@ -84,16 +85,15 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
    * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
    */
   public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+      Serde<K> keySerde,
+      Serde<V> valueSerde)
   {
     this.store = Preconditions.checkNotNull(store);
-    this.identifier = Preconditions.checkNotNull(identifier);
     this.bucket = bucket;
-    this.serdeKey = Preconditions.checkNotNull(serdeKey);
-    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+    this.valueSerde = Preconditions.checkNotNull(valueSerde);
+    keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
 
-    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
+    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new PairSerde<>(new IntSerde(), valueSerde));
   }
 
   public SpillableStateStore getStore()
@@ -112,17 +112,17 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     SpillableSetImpl<V> spillableSet = cache.get(key);
 
     if (spillableSet == null) {
-      Slice keySlice = serdeKey.serialize(key);
-      Pair<Integer, V> meta = map.get(SliceUtils.concatenate(keySlice, META_KEY_SUFFIX));
+      Pair<Integer, V> meta = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
 
       if (meta == null) {
         return null;
       }
 
-      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
-      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false);
+      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
       spillableSet.setSize(meta.getLeft());
       spillableSet.setHead(meta.getRight());
+      spillableSet.setup(context);
     }
 
     cache.put(key, spillableSet);
@@ -166,7 +166,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     SpillableSetImpl<V> spillableSet = getHelper((K)key);
     if (spillableSet != null) {
       cache.remove((K)key);
-      Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+      Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
       map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead()));
       spillableSet.clear();
       removedSets.add(spillableSet);
@@ -199,7 +199,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     if (cache.contains((K)key)) {
       return true;
     }
-    Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+    Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
     Pair<Integer, V> meta = map.get(keySlice);
     return meta != null && meta.getLeft() > 0;
   }
@@ -227,8 +227,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     SpillableSetImpl<V> spillableSet = getHelper(key);
 
     if (spillableSet == null) {
-      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
-      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      spillableSet = new SpillableSetImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
+      spillableSet.setup(context);
       cache.put(key, spillableSet);
     }
     return spillableSet.add(value);
@@ -284,13 +284,16 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
   @Override
   public void setup(Context.OperatorContext context)
   {
+    this.context = context;
     map.setup(context);
+    keyValueSerdeManager.setup(store, bucket);
   }
 
   @Override
   public void beginWindow(long windowId)
   {
     map.beginWindow(windowId);
+    keyValueSerdeManager.beginWindow(windowId);
   }
 
   @Override
@@ -301,7 +304,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
       SpillableSetImpl<V> spillableSet = cache.get(key);
       spillableSet.endWindow();
 
-      map.put(SliceUtils.concatenate(serdeKey.serialize(key), META_KEY_SUFFIX),
+      map.put(keyValueSerdeManager.serializeMetaKey(key, true),
           new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
     }
 
@@ -311,6 +314,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
 
     cache.endWindow();
     map.endWindow();
+
+    keyValueSerdeManager.resetReadBuffer();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
index b6ee3c0..44f003b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.lib.state.spillable;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.BucketProvider;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Component;
@@ -32,6 +33,6 @@ import com.datatorrent.api.Operator;
  */
 @InterfaceStability.Evolving
 public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
-    Operator.CheckpointNotificationListener, WindowListener
+    Operator.CheckpointNotificationListener, WindowListener, BucketProvider
 {
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
index 0e1d55e..e80d38d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
@@ -21,6 +21,9 @@ package org.apache.apex.malhar.lib.state.spillable;
 import java.util.Map;
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.base.Preconditions;
@@ -39,6 +42,7 @@ import com.google.common.collect.Sets;
 @InterfaceStability.Evolving
 public class WindowBoundedMapCache<K, V>
 {
+  private static final transient Logger logger = LoggerFactory.getLogger(WindowBoundedMapCache.class);
   public static final int DEFAULT_MAX_SIZE = 50000;
 
   private int maxSize = DEFAULT_MAX_SIZE;
@@ -109,7 +113,6 @@ public class WindowBoundedMapCache<K, V>
     Note: beginWindow is intentionally not implemented because many users need a cache that does not require
     beginWindow to be called.
    */
-
   public void endWindow()
   {
     int count = cache.size() - maxSize;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
index 61ab8a8..8acb044 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
@@ -23,7 +23,10 @@ import java.util.concurrent.Future;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.state.managed.Bucket;
 import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.collect.Maps;
@@ -74,6 +77,8 @@ public class InMemSpillableStateStore implements SpillableStateStore
       bucket = Maps.newHashMap();
       store.put(bucketId, bucket);
     }
+    key = SliceUtils.toBufferSlice(key);
+    value = SliceUtils.toBufferSlice(value);
 
     bucket.put(key, value);
   }
@@ -88,6 +93,10 @@ public class InMemSpillableStateStore implements SpillableStateStore
       store.put(bucketId, bucket);
     }
 
+    if (key.getClass() == Slice.class) {
+      //The hashCode of a plain Slice is not correct, so wrap the key in a BufferSlice before the lookup
+      key = new BufferSlice(key);
+    }
     return bucket.get(key);
   }
 
@@ -117,4 +126,21 @@ public class InMemSpillableStateStore implements SpillableStateStore
   {
     return store.toString();
   }
+
+  protected Bucket.DefaultBucket bucket;
+
+  @Override
+  public Bucket getBucket(long bucketId)
+  {
+    return bucket;
+  }
+
+  @Override
+  public Bucket ensureBucket(long bucketId)
+  {
+    if (bucket == null) {
+      bucket = new Bucket.DefaultBucket(1);
+    }
+    return bucket;
+  }
 }


Mime
View raw message