asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [04/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.
Date Thu, 18 Jun 2015 04:22:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetable.java
new file mode 100644
index 0000000..7608128
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetable.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface IResetable<T> {
+    void reset(T other);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetableComparable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetableComparable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetableComparable.java
new file mode 100644
index 0000000..88e2e64
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetableComparable.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface IResetableComparable<T> extends IResetable<T>, Comparable<T>{
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetableComparableFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetableComparableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetableComparableFactory.java
new file mode 100644
index 0000000..a5a635e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IResetableComparableFactory.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface IResetableComparableFactory<T> {
+    IResetableComparable<T> createResetableComparable();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MaxHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MaxHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MaxHeap.java
new file mode 100644
index 0000000..1cfb4e0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MaxHeap.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public class MaxHeap extends AbstractHeap implements IMaxHeap<IResetableComparable> {
+
+    public MaxHeap(IResetableComparableFactory factory, int capacity) {
+        super(factory, capacity);
+    }
+
+    @Override
+    protected void bubbleUp(int i) {
+        int pid = getParentId(i);
+        if (pid != NOT_EXIST && compareTo(pid, i) < 0) {
+            swap(pid, i);
+            bubbleUp(pid);
+        }
+    }
+
+    @Override
+    protected void trickleDown(int i) {
+        int maxChild = getMaxChild(i);
+        if (maxChild != NOT_EXIST && compareTo(i, maxChild) < 0) {
+            swap(maxChild, i);
+            trickleDown(maxChild);
+        }
+    }
+
+    @Override
+    public void getMax(IResetableComparable result) {
+        result.reset(entries[0]);
+        numEntry--;
+        if (numEntry > 0) {
+            entries[0].reset(entries[numEntry]);
+            trickleDown(0);
+        }
+    }
+
+    @Override
+    public void peekMax(IResetableComparable result) {
+        result.reset(entries[0]);
+    }
+
+    @Override
+    public void replaceMax(IResetableComparable newElement) {
+        entries[0].reset(newElement);
+        trickleDown(0);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MinHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MinHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MinHeap.java
new file mode 100644
index 0000000..bfeda33
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MinHeap.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public class MinHeap extends AbstractHeap implements IMinHeap<IResetableComparable> {
+
+    public MinHeap(IResetableComparableFactory factory, int capacity) {
+        super(factory, capacity);
+    }
+
+    @Override
+    protected void bubbleUp(int i) {
+        int pid = getParentId(i);
+        if (pid != NOT_EXIST && compareTo(pid, i) > 0) {
+            swap(pid, i);
+            bubbleUp(pid);
+        }
+    }
+
+    @Override
+    protected void trickleDown(int i) {
+        int minChild = getMinChild(i);
+        if (minChild != NOT_EXIST && compareTo(i, minChild) > 0) {
+            swap(minChild, i);
+            trickleDown(minChild);
+        }
+    }
+
+    @Override
+    public void getMin(IResetableComparable result) {
+        result.reset(entries[0]);
+        numEntry--;
+        if (numEntry > 0) {
+            entries[0].reset(entries[numEntry]);
+            trickleDown(0);
+        }
+    }
+
+    @Override
+    public void peekMin(IResetableComparable result) {
+        result.reset(entries[0]);
+    }
+
+    @Override
+    public void replaceMin(IResetableComparable newElement) {
+        entries[0].reset(newElement);
+        trickleDown(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MinMaxHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MinMaxHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MinMaxHeap.java
new file mode 100644
index 0000000..e9782e2
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/MinMaxHeap.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public class MinMaxHeap extends AbstractHeap implements IMinMaxHeap<IResetableComparable> {
+
+    public MinMaxHeap(IResetableComparableFactory factory, int capacity) {
+        super(factory, capacity);
+    }
+
+    @Override
+    protected void bubbleUp(int cid) {
+        int pid = getParentId(cid);
+        if (isAtMinLevel(cid)) {
+            if (pid != NOT_EXIST && entries[pid].compareTo(entries[cid]) < 0) {
+                swap(cid, pid);
+                bubbleUpMax(pid);
+            } else {
+                bubbleUpMin(cid);
+            }
+        } else { // isAtMaxLevel
+            if (pid != NOT_EXIST && entries[pid].compareTo(entries[cid]) > 0) {
+                swap(cid, pid);
+                bubbleUpMin(pid);
+            } else {
+                bubbleUpMax(cid);
+            }
+        }
+    }
+
+    private void bubbleUpMin(int id) {
+        int gp = getGrandParentId(id);
+        if (gp != NOT_EXIST && entries[gp].compareTo(entries[id]) > 0) {
+            swap(gp, id);
+            bubbleUpMin(gp);
+        }
+    }
+
+    private void bubbleUpMax(int id) {
+        int gp = getGrandParentId(id);
+        if (gp != NOT_EXIST && entries[gp].compareTo(entries[id]) < 0) {
+            swap(gp, id);
+            bubbleUpMax(gp);
+        }
+    }
+
+    private boolean isAtMinLevel(int cid) {
+        return getLevel(cid) % 2 == 0;
+    }
+
+    /**
+     * Make sure to check {@link #isEmpty()} before calling this function.
+     *
+     * @param result is the object that will eventually contain the minimum entry
+     */
+    @Override
+    public void getMin(IResetableComparable result) {
+        result.reset(entries[0]);
+        numEntry--;
+        if (numEntry > 0) {
+            entries[0].reset(entries[numEntry]);
+            trickleDown(0);
+        }
+    }
+
+    @Override
+    public void getMax(IResetableComparable result) {
+        int max = getMaxChild(0);
+        if (max == NOT_EXIST) {
+            getMin(result);
+            return;
+        }
+        result.reset(entries[max]);
+        numEntry--;
+        if (numEntry > max) {
+            entries[max].reset(entries[numEntry]);
+            trickleDown(max);
+        }
+    }
+
+    @Override
+    protected void trickleDown(int id) {
+        if (isAtMinLevel(id)) {
+            trickleDownMin(id);
+        } else {
+            trickleDownMax(id);
+        }
+    }
+
+    private void trickleDownMax(int id) {
+        int maxId = getMaxOfDescendents(id);
+        if (maxId == NOT_EXIST) {
+            return;
+        }
+        if (isDirectChild(id, maxId)) {
+            if (entries[id].compareTo(entries[maxId]) < 0) {
+                swap(id, maxId);
+            }
+        } else {
+            if (entries[id].compareTo(entries[maxId]) < 0) {
+                swap(id, maxId);
+                int pid = getParentId(maxId);
+                if (entries[maxId].compareTo(entries[pid]) < 0) {
+                    swap(pid, maxId);
+                }
+                trickleDownMax(maxId);
+            }
+        }
+    }
+
+    private void trickleDownMin(int id) {
+        int minId = getMinOfDescendents(id);
+        if (minId == NOT_EXIST) {
+            return;
+        }
+        if (isDirectChild(id, minId)) {
+            if (entries[id].compareTo(entries[minId]) > 0) {
+                swap(id, minId);
+            }
+        } else { // is grand child
+            if (entries[id].compareTo(entries[minId]) > 0) {
+                swap(id, minId);
+                int pid = getParentId(minId);
+                if (entries[minId].compareTo(entries[pid]) > 0) {
+                    swap(pid, minId);
+                }
+                trickleDownMin(minId);
+            }
+        }
+    }
+
+    private int getMaxOfDescendents(int id) {
+        int max = getMaxChild(id);
+        if (max != NOT_EXIST) {
+            int leftMax = getMaxChild(getLeftChild(id));
+            if (leftMax != NOT_EXIST) {
+                max = entries[leftMax].compareTo(entries[max]) > 0 ? leftMax : max;
+                int rightMax = getMaxChild(getRightChild(id));
+                if (rightMax != NOT_EXIST) {
+                    max = entries[rightMax].compareTo(entries[max]) > 0 ? rightMax : max;
+                }
+            }
+        }
+        return max;
+    }
+
+    private int getMinOfDescendents(int id) {
+        int min = getMinChild(id);
+        if (min != NOT_EXIST) {
+            int leftMin = getMinChild(getLeftChild(id));
+            if (leftMin != NOT_EXIST) {
+                min = entries[leftMin].compareTo(entries[min]) < 0 ? leftMin : min;
+                int rightMin = getMinChild(getRightChild(id));
+                if (rightMin != NOT_EXIST) {
+                    min = entries[rightMin].compareTo(entries[min]) < 0 ? rightMin : min;
+                }
+            }
+        }
+        return min;
+    }
+
+    public boolean isEmpty() {
+        return numEntry == 0;
+    }
+
+    /**
+     * Make sure to check {@link #isEmpty()} before calling this function.
+     *
+     * @param result is the object that will eventually contain the minimum entry
+     */
+    @Override
+    public void peekMin(IResetableComparable result) {
+        result.reset(entries[0]);
+    }
+
+    @Override
+    public void peekMax(IResetableComparable result) {
+        int maxChild = getMaxChild(0);
+        if (maxChild == NOT_EXIST) {
+            peekMin(result);
+            return;
+        }
+        result.reset(entries[maxChild]);
+    }
+
+    @Override
+    public void replaceMin(IResetableComparable newElement) {
+        entries[0].reset(newElement);
+        trickleDown(0);
+    }
+
+    @Override
+    public void replaceMax(IResetableComparable newElement) {
+        int maxChild = getMaxChild(0);
+        if (maxChild == NOT_EXIST) {
+            replaceMin(newElement);
+            return;
+        }
+        entries[maxChild].reset(newElement);
+        bubbleUp(maxChild);
+        trickleDown(maxChild);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
index 1ea8393..3837004 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -41,7 +41,7 @@ public class SerializableHashTable implements ISerializableTable {
 
     public SerializableHashTable(int tableSize, final IHyracksTaskContext ctx) throws HyracksDataException {
         this.ctx = ctx;
-        int frameSize = ctx.getFrameSize();
+        int frameSize = ctx.getInitialFrameSize();
 
         int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1;
         int headerSize = tableSize * INT_SIZE * 2 / frameSize + residual;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java
index 8ef478c..a42e06d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java
@@ -14,7 +14,48 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.structures;
 
-public class TuplePointer {
+public class TuplePointer implements IResetable<TuplePointer> {
+    public static final int INVALID_ID = -1;
     public int frameIndex;
     public int tupleIndex;
+
+    public TuplePointer() {
+        this(INVALID_ID, INVALID_ID);
+    }
+
+    public TuplePointer(int frameId, int tupleId) {
+        reset(frameId, tupleId);
+    }
+
+    public void reset(TuplePointer other) {
+        reset(other.frameIndex, other.tupleIndex);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        TuplePointer that = (TuplePointer) o;
+
+        if (frameIndex != that.frameIndex)
+            return false;
+        return tupleIndex == that.tupleIndex;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = frameIndex;
+        result = 31 * result + tupleIndex;
+        return result;
+    }
+
+    public void reset(int frameId, int tupleId) {
+        this.frameIndex = frameId;
+        this.tupleIndex = tupleId;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
index aff4273..09aacd6 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
@@ -36,7 +36,7 @@ public final class DeserializedOperatorNodePushable extends AbstractUnaryInputOp
             RecordDescriptor inRecordDesc) {
         this.ctx = ctx;
         this.delegate = delegate;
-        deserializer = inRecordDesc == null ? null : new FrameDeserializer(ctx.getFrameSize(), inRecordDesc);
+        deserializer = inRecordDesc == null ? null : new FrameDeserializer(inRecordDesc);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/MathUtil.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/MathUtil.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/MathUtil.java
new file mode 100644
index 0000000..c89ee25
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/MathUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.util;
+
+public class MathUtil {
+    /**
+     * Fast way to calculate floor(log2(n)). Note: n should be >= 1.
+     *
+     * @param n a positive integer
+     * @return the largest k such that 2^k <= n
+     */
+    public static int log2Floor(int n) {
+        assert n >= 1;
+        int log = 0;
+        if (n > 0xffff) {
+            n >>>= 16;
+            log = 16;
+        }
+
+        if (n > 0xff) {
+            n >>>= 8;
+            log |= 8;
+        }
+
+        if (n > 0xf) {
+            n >>>= 4;
+            log |= 4;
+        }
+
+        if (n > 0b11) {
+            n >>>= 2;
+            log |= 2;
+        }
+
+        return log + (n >>> 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
index c06b50c..d6adcf4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
@@ -14,12 +14,13 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.util;
 
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class ReferenceEntry {
     private final int runid;
-    private FrameTupleAccessor acccessor;
+    private IFrameTupleAccessor acccessor;
     private int tupleIndex;
     private int[] tPointers;
 
@@ -38,11 +39,11 @@ public class ReferenceEntry {
         return runid;
     }
 
-    public FrameTupleAccessor getAccessor() {
+    public IFrameTupleAccessor getAccessor() {
         return acccessor;
     }
 
-    public void setAccessor(FrameTupleAccessor fta) {
+    public void setAccessor(IFrameTupleAccessor fta) {
         this.acccessor = fta;
     }
 
@@ -62,15 +63,14 @@ public class ReferenceEntry {
         initTPointer(acccessor, tupleIndex, keyFields, nmkComputer);
     }
 
-    private void initTPointer(FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
+    private void initTPointer(IFrameTupleAccessor fta, int tupleIndex, int[] keyFields,
             INormalizedKeyComputer nmkComputer) {
         this.tupleIndex = tupleIndex;
         byte[] b1 = fta.getBuffer().array();
         for (int f = 0; f < keyFields.length; ++f) {
             int fIdx = keyFields[f];
-            tPointers[2 * f + 1] = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength()
-                    + fta.getFieldStartOffset(tupleIndex, fIdx);
-            tPointers[2 * f + 2] = fta.getFieldEndOffset(tupleIndex, fIdx) - fta.getFieldStartOffset(tupleIndex, fIdx);
+            tPointers[2 * f + 1] = fta.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
+            tPointers[2 * f + 2] = fta.getFieldLength(tupleIndex, fIdx);
             if (f == 0) {
                 if (nmkComputer != null) {
                     tPointers[0] = nmkComputer.normalize(b1, tPointers[1], tPointers[2]);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
index 225f583..e994f04 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
@@ -18,13 +18,10 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.Comparator;
 
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class ReferencedPriorityQueue {
-    private final int frameSize;
-    private final RecordDescriptor recordDescriptor;
     private final ReferenceEntry entries[];
     private final int size;
     private final BitSet runAvail;
@@ -34,10 +31,8 @@ public class ReferencedPriorityQueue {
     private final INormalizedKeyComputer nmkComputer;
     private final int[] keyFields;
 
-    public ReferencedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
-            Comparator<ReferenceEntry> comparator, int[] keyFields, INormalizedKeyComputer nmkComputer) {
-        this.frameSize = frameSize;
-        this.recordDescriptor = recordDescriptor;
+    public ReferencedPriorityQueue(int initSize, Comparator<ReferenceEntry> comparator, int[] keyFields,
+            INormalizedKeyComputer nmkComputer) {
         if (initSize < 1)
             throw new IllegalArgumentException();
         this.comparator = comparator;
@@ -55,7 +50,7 @@ public class ReferencedPriorityQueue {
 
     /**
      * Retrieve the top entry without removing it
-     * 
+     *
      * @return the top entry
      */
     public ReferenceEntry peek() {
@@ -65,17 +60,14 @@ public class ReferencedPriorityQueue {
     /**
      * compare the new entry with entries within the queue, to find a spot for
      * this new entry
-     * 
-     * @param entry
+     *
+     * @param fta
      * @return runid of this entry
      * @throws IOException
      */
-    public int popAndReplace(FrameTupleAccessor fta, int tIndex) {
+    public int popAndReplace(IFrameTupleAccessor fta, int tIndex) {
         ReferenceEntry entry = entries[0];
-        if (entry.getAccessor() == null) {
-            entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
-        }
-        entry.getAccessor().reset(fta.getBuffer());
+        entry.setAccessor(fta);
         entry.setTupleIndex(tIndex, keyFields, nmkComputer);
 
         add(entry);
@@ -84,9 +76,8 @@ public class ReferencedPriorityQueue {
 
     /**
      * Push entry into priority queue
-     * 
-     * @param e
-     *            the new Entry
+     *
+     * @param e the new Entry
      */
     private void add(ReferenceEntry e) {
         ReferenceEntry min = entries[0];
@@ -127,7 +118,7 @@ public class ReferencedPriorityQueue {
 
     /**
      * Pop is called only when a run is exhausted
-     * 
+     *
      * @return
      */
     public ReferenceEntry pop() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/Utility.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/Utility.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/Utility.java
new file mode 100644
index 0000000..ee2b008
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/Utility.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+public class Utility {
+
+    public static String repeatString(char ch, int times) {
+        return new String(new char[times]).replace('\0', ch);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/Common.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/Common.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/Common.java
new file mode 100644
index 0000000..5a59ab6
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/Common.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import edu.uci.ics.hyracks.control.nc.resources.memory.FrameManager;
+
+/**
+ * Constants shared by the buffer-manager tests in this package.
+ * All members are final so that one test cannot change the configuration seen by another.
+ */
+public class Common {
+    /** Size in bytes of the smallest frame; also the size the shared frame manager is built with. */
+    static final int MIN_FRAME_SIZE = 256;
+    /** Number of minimum-size frames that fit into {@link #BUDGET}. */
+    static final int NUM_MIN_FRAME = 15;
+    /** Memory budget in bytes: {@code NUM_MIN_FRAME} frames of {@code MIN_FRAME_SIZE} each. */
+    static final int BUDGET = NUM_MIN_FRAME * MIN_FRAME_SIZE;
+
+    /** Frame manager constructed with {@link #MIN_FRAME_SIZE}, reused by the tests. */
+    static final FrameManager commonFrameManager = new FrameManager(MIN_FRAME_SIZE);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java
new file mode 100644
index 0000000..a2d0f1e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class FrameFreeSlotBestFitUsingTreeMapTest {
+
+    static int size = 10;
+
+    FrameFreeSlotSmallestFit policy;
+
+    @Before
+    public void intial() {
+        policy = new FrameFreeSlotSmallestFit();
+    }
+
+    @Test
+    public void testAll() {
+
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+            assertEquals(i, policy.popBestFit(i));
+        }
+        assertEquals(-1, policy.popBestFit(0));
+
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+        }
+        for (int i = 0; i < size; i++) {
+            assertEquals(i, policy.popBestFit(i));
+        }
+
+    }
+
+    @Test
+    public void testReset(){
+        testAll();
+        policy.reset();
+        testAll();
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirstTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirstTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirstTest.java
new file mode 100644
index 0000000..f3b923e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirstTest.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import static junit.framework.Assert.assertEquals;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class FrameFreeSlotBiggestFirstTest {
+
+    static int size = 10;
+
+    FrameFreeSlotBiggestFirst policy;
+
+    @Before
+    public void intial() {
+        policy = new FrameFreeSlotBiggestFirst(size);
+    }
+
+    @Test
+    public void testAll() {
+
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+            assertEquals(i, policy.popBestFit(i));
+        }
+        assertEquals(-1, policy.popBestFit(0));
+
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+        }
+        for (int i = 0; i < size; i++) {
+            assertEquals(size - i - 1, policy.popBestFit(0));
+        }
+
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+        }
+        for (int i = 0; i < size / 2; i++) {
+            assertEquals(size - i - 1, policy.popBestFit(size / 2));
+        }
+        assertEquals(-1, policy.popBestFit(size / 2));
+        for (int i = 0; i < size / 2; i++) {
+            assertEquals(size / 2 - i - 1, policy.popBestFit(0));
+        }
+
+    }
+
+    @Test
+    public void testReset() {
+        testAll();
+        policy.reset();
+        testAll();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFitTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFitTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFitTest.java
new file mode 100644
index 0000000..94a8493
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFitTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises {@code FrameFreeSlotLastFit} through {@code pushNewFrame},
+ * {@code popBestFit} and {@code reset}.
+ *
+ * All comparisons use {@code assertEquals(expected, actual)} so that a failing
+ * run reports both values the right way round.
+ */
+public class FrameFreeSlotLastFitTest {
+
+    FrameFreeSlotLastFit zeroPolicy;
+    FrameFreeSlotLastFit unifiedPolicy;
+    FrameFreeSlotLastFit ascPolicy;
+    FrameFreeSlotLastFit dscPolicy;
+
+    static final int size = 10;
+    static final int medium = 5;
+
+    /** Creates fresh policies before each test: one constructed with 0, three with {@code size}. */
+    @Before
+    public void setUp() throws Exception {
+        zeroPolicy = new FrameFreeSlotLastFit(0);
+        unifiedPolicy = new FrameFreeSlotLastFit(size);
+        ascPolicy = new FrameFreeSlotLastFit(size);
+        dscPolicy = new FrameFreeSlotLastFit(size);
+    }
+
+    /**
+     * Checks the value returned by {@code popBestFit} for entries pushed with
+     * equal, ascending and descending second arguments.
+     */
+    @Test
+    public void testPushAndPop() throws Exception {
+        // All entries pushed with the same value: they come back in reverse push order.
+        for (int i = 0; i < size; i++) {
+            unifiedPolicy.pushNewFrame(i, medium);
+        }
+        for (int i = 0; i < size; i++) {
+            assertEquals(size - i - 1, unifiedPolicy.popBestFit(medium));
+        }
+        assertEquals(-1, unifiedPolicy.popBestFit(0));
+
+        // First half: ascending values are still below medium, descending values are at or above it.
+        for (int i = 0; i < size / 2; i++) {
+            ascPolicy.pushNewFrame(i, i);
+            assertEquals(-1, ascPolicy.popBestFit(medium));
+            dscPolicy.pushNewFrame(i, size - i - 1);
+            assertEquals(i, dscPolicy.popBestFit(medium));
+        }
+
+        // Second half: the situation is mirrored.
+        for (int i = size / 2; i < size; i++) {
+            ascPolicy.pushNewFrame(i, i);
+            assertEquals(i, ascPolicy.popBestFit(medium));
+            dscPolicy.pushNewFrame(i, size - i - 1);
+            assertEquals(-1, dscPolicy.popBestFit(medium));
+        }
+
+        // After a reset, entries pushed from size down to 1 are popped in the same order
+        // when each request asks for exactly the value that entry was pushed with.
+        ascPolicy.reset();
+        for (int i = 0; i < size; i++) {
+            ascPolicy.pushNewFrame(size - i, size - i);
+        }
+
+        for (int i = 0; i < size; i++) {
+            assertEquals(size - i, ascPolicy.popBestFit(size - i));
+        }
+    }
+
+    /** Runs the scenario, resets every policy, and expects the same scenario to pass again. */
+    @Test
+    public void testReset() throws Exception {
+        testPushAndPop();
+
+        zeroPolicy.reset();
+        unifiedPolicy.reset();
+        ascPolicy.reset();
+        dscPolicy.reset();
+        testPushAndPop();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePoolTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePoolTest.java
new file mode 100644
index 0000000..e9ac2ec
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePoolTest.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import static edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.Common.BUDGET;
+import static edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.Common.MIN_FRAME_SIZE;
+import static edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.Common.NUM_MIN_FRAME;
+import static edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.Common.commonFrameManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Tests {@code VariableFramePool} allocation, reset and close behaviour, using the
+ * budget and minimum frame size defined in {@code Common}.
+ */
+public class VariableFramePoolTest {
+
+    VariableFramePool pool;
+
+    /** Creates a fresh pool over the shared frame manager before each test. */
+    @Before
+    public void setUp() throws Exception {
+
+        pool = new VariableFramePool(commonFrameManager, BUDGET);
+    }
+
+    @Test
+    public void testGetMinFrameSize() throws Exception {
+        assertEquals(MIN_FRAME_SIZE, commonFrameManager.getInitialFrameSize());
+        assertEquals(MIN_FRAME_SIZE, pool.getMinFrameSize());
+    }
+
+    @Test
+    public void testGetMemoryBudgetBytes() throws Exception {
+        assertEquals(BUDGET, pool.getMemoryBudgetBytes());
+    }
+
+    /** The whole budget can be handed out as minimum-size frames, before and after a reset. */
+    @Test
+    public void testAllocateUniformFrameShouldSuccess() throws Exception {
+        testAllocateAllSpacesWithMinFrames();
+        testAllocateShouldFailAfterAllSpaceGetUsed();
+        pool.reset();
+        testAllocateAllSpacesWithMinFrames();
+        pool.close();
+    }
+
+    /** After {@code reset()} the pool hands out exactly the same buffer instances again. */
+    @Test
+    public void testResetShouldReuseExistingFrames() throws HyracksDataException {
+        Set<?> set1 = testAllocateAllSpacesWithMinFrames();
+        pool.reset();
+        Set<?> set2 = testAllocateAllSpacesWithMinFrames();
+        assertEquals(set1, set2);
+        pool.close();
+    }
+
+    /** After {@code close()} the set of buffer instances handed out differs from the earlier one. */
+    @Test
+    public void testCloseShouldNotReuseExistingFrames() throws HyracksDataException {
+        Set<?> set1 = testAllocateAllSpacesWithMinFrames();
+        pool.close();
+        Set<?> set2 = testAllocateAllSpacesWithMinFrames();
+        assertFalse(set1.equals(set2));
+        pool.close();
+    }
+
+    /**
+     * With frames of 1..5 minimum sizes already in the pool, a request whose exact-size
+     * frame is taken must still be served from one of the existing frames.
+     */
+    @Test
+    public void testShouldReturnLargerFramesIfFitOneIsUsed() throws HyracksDataException {
+        Set<?> set = testAllocateVariableFrames();
+        pool.reset();
+        testShouldFindTheMatchFrames(set);
+        pool.reset();
+
+        // allocate seq: 1, 1, 2, 3, 4
+        ByteBuffer placeBuffer = pool.allocateFrame(MIN_FRAME_SIZE);
+        assertTrue(set.contains(new ByteBufferPtr(placeBuffer)));
+        for (int i = 1; i <= 4; i++) {
+            ByteBuffer buffer = pool.allocateFrame(i * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertTrue(set.contains(new ByteBufferPtr(buffer)));
+        }
+        // All five existing frames are in use now, so nothing more can be allocated.
+        assertNull(pool.allocateFrame(MIN_FRAME_SIZE));
+        pool.close();
+    }
+
+    /**
+     * With only minimum-size frames in the pool, requests for larger frames succeed
+     * and return buffers that were not part of the original set.
+     */
+    @Test
+    public void testShouldMergeIfNoLargerFrames() throws HyracksDataException {
+        Set<?> set = testAllocateAllSpacesWithMinFrames();
+        pool.reset();
+        int chunks = 5;
+        for (int i = 0; i < NUM_MIN_FRAME; i += chunks) {
+            ByteBuffer buffer = pool.allocateFrame(chunks * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertFalse(set.contains(new ByteBufferPtr(buffer)));
+        }
+    }
+
+    @Test
+    public void testUseMiddleSizeFrameAndNeedToMergeSmallAndBigger() throws HyracksDataException {
+        Set<?> set = testAllocateVariableFrames();
+        pool.reset();
+        // allocate seq: 3, 6, 1;
+        ByteBuffer buffer = pool.allocateFrame(3 * MIN_FRAME_SIZE);
+        assertTrue(set.contains(new ByteBufferPtr(buffer)));
+        // No existing frame has 6 units, so a buffer outside the original set is returned.
+        buffer = pool.allocateFrame(6 * MIN_FRAME_SIZE);
+        assertFalse(set.contains(new ByteBufferPtr(buffer)));
+        // The 1-unit request is then answered with the existing 5-unit frame.
+        buffer = pool.allocateFrame(1 * MIN_FRAME_SIZE);
+        assertTrue(set.contains(new ByteBufferPtr(buffer)));
+        assertEquals(5 * MIN_FRAME_SIZE, buffer.capacity());
+        pool.reset();
+    }
+
+    /** Expects every further minimum-size allocation to return {@code null}. */
+    private void testAllocateShouldFailAfterAllSpaceGetUsed() throws HyracksDataException {
+        for (int i = 0; i < NUM_MIN_FRAME; i++) {
+            assertNull(pool.allocateFrame(MIN_FRAME_SIZE));
+        }
+    }
+
+    /**
+     * Allocates {@code NUM_MIN_FRAME} minimum-size frames and checks that each one is a
+     * distinct buffer instance.
+     *
+     * @return the allocated buffers, wrapped for identity comparison
+     */
+    private HashSet<ByteBufferPtr> testAllocateAllSpacesWithMinFrames() throws HyracksDataException {
+        HashSet<ByteBufferPtr> set = new HashSet<>();
+        for (int i = 0; i < NUM_MIN_FRAME; i++) {
+            ByteBuffer buffer = pool.allocateFrame(MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertFalse(set.contains(new ByteBufferPtr(buffer)));
+            set.add(new ByteBufferPtr(buffer));
+        }
+        return set;
+    }
+
+    /**
+     * Pool will become 1,2,3,4,5
+     *
+     * @return the allocated buffers, wrapped for identity comparison
+     * @throws HyracksDataException
+     */
+    private Set<ByteBufferPtr> testAllocateVariableFrames() throws HyracksDataException {
+        int budget = BUDGET;
+        int allocate = 0;
+        int i = 1;
+        Set<ByteBufferPtr> set = new HashSet<>();
+        // Keep allocating frames of 1, 2, 3, ... units while the next one still fits the budget.
+        while (budget - allocate >= i * MIN_FRAME_SIZE) {
+            ByteBuffer buffer = pool.allocateFrame(i * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            set.add(new ByteBufferPtr(buffer));
+            allocate += i++ * MIN_FRAME_SIZE;
+        }
+        return set;
+    }
+
+    /**
+     * Requests frames of 1..5 units in ascending, descending and shuffled order; every
+     * request must be answered with an existing buffer of exactly the requested capacity.
+     *
+     * @param set the buffers previously handed out by the pool
+     */
+    private void testShouldFindTheMatchFrames(Set<?> set) throws HyracksDataException {
+        pool.reset();
+        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
+
+        for (int i = 0; i < list.size(); i++) {
+            ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertTrue(set.contains(new ByteBufferPtr(buffer)));
+            assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity());
+        }
+        pool.reset();
+        for (int i = list.size() - 1; i >= 0; i--) {
+            ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertTrue(set.contains(new ByteBufferPtr(buffer)));
+            assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity());
+        }
+
+        Collections.shuffle(list);
+        pool.reset();
+        for (int i = 0; i < list.size(); i++) {
+            ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertTrue(set.contains(new ByteBufferPtr(buffer)));
+            assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity());
+        }
+
+    }
+
+    /**
+     * Wraps a {@link ByteBuffer} so that hash-based collections compare it by identity.
+     * {@code ByteBuffer.equals}/{@code hashCode} depend on the buffer's remaining content,
+     * which is not what these tests want to compare.
+     */
+    public static class ByteBufferPtr {
+        ByteBuffer bytebuffer;
+
+        public ByteBufferPtr(ByteBuffer buffer) {
+            bytebuffer = buffer;
+        }
+
+        /** Identity hash of the wrapped buffer, consistent with the identity-based {@link #equals}. */
+        @Override
+        public int hashCode() {
+            return System.identityHashCode(bytebuffer);
+        }
+
+        /** Two wrappers are equal only when they wrap the very same buffer instance. */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            // instanceof also rejects null, so foreign types and null compare unequal instead of throwing.
+            if (!(obj instanceof ByteBufferPtr)) {
+                return false;
+            }
+            return this.bytebuffer == ((ByteBufferPtr) obj).bytebuffer;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramesMemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramesMemoryManagerTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramesMemoryManagerTest.java
new file mode 100644
index 0000000..f56a62d
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramesMemoryManagerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import static edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.Common.BUDGET;
+import static edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.Common.MIN_FRAME_SIZE;
+import static edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.Common.NUM_MIN_FRAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.FixedSizeFrame;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Tests {@code VariableFrameMemoryManager}: frames inserted into the manager must be
+ * readable back with the same tuple ids and tuple lengths.
+ *
+ * Every generated tuple starts with a 4-byte id (written with {@code putInt(0, id)});
+ * the expected tuple length is tracked per id in a map.
+ */
+public class VariableFramesMemoryManagerTest {
+    VariableFrameMemoryManager framesMemoryManager;
+    FrameTupleAccessor fta;
+    Random random;
+    List<IFrame> frameList;
+
+    /** Builds a fresh pool, policy, memory manager, accessor and frame list before each test. */
+    @Before
+    public void setUp() throws Exception {
+        VariableFramePool framePool = new VariableFramePool(Common.commonFrameManager, BUDGET);
+        FrameFreeSlotLastFit policy = new FrameFreeSlotLastFit(NUM_MIN_FRAME);
+        framesMemoryManager = new VariableFrameMemoryManager(framePool, policy);
+        RecordDescriptor recordDescriptor = new RecordDescriptor(new ISerializerDeserializer[] { null });
+        fta = new FrameTupleAccessor(recordDescriptor);
+        random = new Random(System.currentTimeMillis());
+        frameList = new ArrayList<>();
+    }
+
+    /** {@code NUM_MIN_FRAME} minimum-size frames must all be accepted and stored. */
+    @Test
+    public void testNormalIncomingFrames() throws HyracksDataException {
+        HashMap<Integer, Integer> tupleSet = prepareTuples();
+        for (IFrame frame : frameList) {
+            assertTrue(framesMemoryManager.insertFrame(frame.getBuffer()) >= 0);
+        }
+        assertEquals(NUM_MIN_FRAME, framesMemoryManager.getNumFrames());
+        assertEveryTupleInFTAIsInFrameMemoryManager(tupleSet, framesMemoryManager);
+    }
+
+    /**
+     * Inserts randomly sized frames; tuples of rejected frames are dropped from the
+     * expectation, and everything that remains must be found in the manager.
+     */
+    @Test
+    public void testRandomTuplesAreAllStoredInBuffer() throws HyracksDataException {
+        Map<Integer, Integer> tupleSet = prepareRandomTuples();
+        for (IFrame frame : frameList) {
+            if (framesMemoryManager.insertFrame(frame.getBuffer()) < 0) {
+                // The frame was rejected: forget the tuples it carries.
+                fta.reset(frame.getBuffer());
+                for (int i = 0; i < fta.getTupleCount(); ++i) {
+                    int id = parseTuple(fta.getBuffer(),
+                            fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 0) + fta.getFieldSlotsLength());
+                    tupleSet.remove(id);
+                }
+            }
+        }
+        assertEveryTupleInFTAIsInFrameMemoryManager(tupleSet, framesMemoryManager);
+        framesMemoryManager.reset();
+    }
+
+    /** The manager must remain usable after {@code reset()}. */
+    @Test
+    public void testResetShouldWork() throws HyracksDataException {
+        testNormalIncomingFrames();
+        framesMemoryManager.reset();
+        testRandomTuplesAreAllStoredInBuffer();
+        framesMemoryManager.reset();
+        testRandomTuplesAreAllStoredInBuffer();
+    }
+
+    /** The manager must remain usable after {@code close()}. */
+    @Test
+    public void testCloseShouldAlsoWork() throws HyracksDataException {
+        testRandomTuplesAreAllStoredInBuffer();
+        framesMemoryManager.close();
+        testRandomTuplesAreAllStoredInBuffer();
+        framesMemoryManager.close();
+        testRandomTuplesAreAllStoredInBuffer();
+    }
+
+    /**
+     * Fills {@code frameList} with single-tuple frames of random tuple length until the
+     * accumulated frame size reaches {@code BUDGET}.
+     *
+     * @return map from tuple id to the tuple length that was appended
+     */
+    private HashMap<Integer, Integer> prepareRandomTuples() throws HyracksDataException {
+        frameList.clear();
+        HashMap<Integer, Integer> set = new HashMap<>(NUM_MIN_FRAME);
+        int[] fieldSlot = { 0 };
+        int id = 0;
+        int size = 0;
+        while (size < BUDGET) {
+            // At least 4 bytes so that the id written below always fits.
+            int tupleLength = random.nextInt(BUDGET / 3) + 4;
+            IFrame frame = new FixedSizeFrame(Common.commonFrameManager
+                    .allocateFrame(FrameHelper.calcAlignedFrameSizeToStore(1, tupleLength, MIN_FRAME_SIZE)));
+            IFrameTupleAppender appender = new FrameTupleAppender();
+            appender.reset(frame, true);
+            ByteBuffer buffer = ByteBuffer.allocate(tupleLength);
+            buffer.putInt(0, id);
+            assertTrue(appender.append(fieldSlot, buffer.array(), 0, buffer.capacity()));
+            set.put(id++, tupleLength);
+            size += frame.getFrameSize();
+            frameList.add(frame);
+        }
+        return set;
+    }
+
+    /**
+     * Fills {@code frameList} with {@code NUM_MIN_FRAME} minimum-size frames, each holding
+     * one tuple of {@code MIN_FRAME_SIZE / 2} bytes.
+     *
+     * @return map from tuple id to the tuple length that was appended
+     */
+    private HashMap<Integer, Integer> prepareTuples() throws HyracksDataException {
+        frameList.clear();
+        HashMap<Integer, Integer> set = new HashMap<>(NUM_MIN_FRAME);
+        for (int i = 0; i < NUM_MIN_FRAME; ++i) {
+            IFrame frame = new FixedSizeFrame(Common.commonFrameManager.allocateFrame(MIN_FRAME_SIZE));
+            IFrameTupleAppender appender = new FrameTupleAppender();
+            appender.reset(frame, true);
+
+            int[] fieldSlot = { 0 };
+            ByteBuffer buffer = ByteBuffer.allocate(MIN_FRAME_SIZE / 2);
+            buffer.putInt(0, i);
+            // Fail here if the tuple could not be appended, instead of tracking a tuple that was never stored.
+            assertTrue(appender.append(fieldSlot, buffer.array(), 0, buffer.capacity()));
+            set.put(i, buffer.capacity());
+            frameList.add(frame);
+        }
+        return set;
+    }
+
+    /**
+     * Walks every frame held by the manager and checks that each tuple found there is
+     * expected, has the expected length, and that no expected tuple is missing.
+     *
+     * @param tupleSet            expected tuple lengths keyed by tuple id; emptied by this method
+     * @param framesMemoryManager the manager whose frames are inspected
+     */
+    private void assertEveryTupleInFTAIsInFrameMemoryManager(Map<Integer, Integer> tupleSet,
+            VariableFrameMemoryManager framesMemoryManager) {
+        for (int i = 0; i < framesMemoryManager.getNumFrames(); ++i) {
+            fta.reset(framesMemoryManager.getFrame(i), framesMemoryManager.getFrameStartOffset(i),
+                    framesMemoryManager.getFrameSize(i));
+            for (int t = 0; t < fta.getTupleCount(); t++) {
+                int id = parseTuple(fta.getBuffer(), fta.getTupleStartOffset(t) + fta.getFieldSlotsLength() + fta
+                        .getFieldStartOffset(t, 0));
+                int storedLength = fta.getTupleEndOffset(t) - fta.getTupleStartOffset(t) - fta
+                        .getFieldSlotsLength();
+                Integer expectedLength = tupleSet.remove(id);
+                // Check for null explicitly so a missing id is reported as an assertion failure, not an NPE.
+                assertTrue("tuple id " + id + " is not expected (unknown or seen twice)", expectedLength != null);
+                assertEquals(expectedLength.intValue(), storedLength);
+            }
+        }
+        assertTrue(tupleSet.isEmpty());
+    }
+
+    /** Reads the 4-byte tuple id stored at {@code fieldStartOffset} in {@code buffer}. */
+    private int parseTuple(ByteBuffer buffer, int fieldStartOffset) {
+        return buffer.getInt(fieldStartOffset);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManagerTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManagerTest.java
new file mode 100644
index 0000000..0c54607
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManagerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.FixedSizeFrame;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.Utility;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+public class VariableTupleMemoryManagerTest {
+    ISerializerDeserializer[] fieldsSerDer = new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+    RecordDescriptor recordDescriptor = new RecordDescriptor(fieldsSerDer);
+    ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptor.getFieldCount());
+    VariableTupleMemoryManager tupleMemoryManager;
+    FrameTupleAccessor inFTA = new FrameTupleAccessor(recordDescriptor);
+    Random random = new Random(System.currentTimeMillis());
+
+    /** Builds a fresh tuple memory manager on top of a new frame pool before each test. */
+    @Before
+    public void setup() {
+        tupleMemoryManager = new VariableTupleMemoryManager(
+                new VariableFramePool(Common.commonFrameManager, Common.BUDGET), recordDescriptor);
+    }
+
+    /** Inserts a buffer of fixed-size tuples and checks each one can be read back through its pointer. */
+    @Test
+    public void testInsertTupleToMemoryManager() throws HyracksDataException {
+        final int tuplesPerFrame = 3;
+        Map<Integer, Integer> expected = prepareFixedSizeTuples(tuplesPerFrame);
+        Map<TuplePointer, Integer> inserted = insertInFTAToBufferShouldAllSuccess();
+        assertEachTupleInFTAIsInBuffer(expected, inserted);
+    }
+
+    /**
+     * Runs the variable-size and fixed-size insertion scenarios back to back, calling
+     * {@code reset()} on the memory manager between rounds.
+     */
+    @Test
+    public void testReset() throws HyracksDataException {
+        // Round 1: variable-size tuples.
+        testInsertVariableSizeTupleToMemoryManager();
+        tupleMemoryManager.reset();
+
+        // Round 2: fixed-size tuples.
+        testInsertTupleToMemoryManager();
+        tupleMemoryManager.reset();
+
+        // Round 3: variable-size tuples again.
+        testInsertVariableSizeTupleToMemoryManager();
+    }
+
+    /** Deletes at least one inserted tuple and checks the remaining ones are still readable. */
+    @Test
+    public void testDeleteTupleInMemoryManager() throws HyracksDataException {
+        final int tuplesPerFrame = 3;
+        Map<Integer, Integer> remaining = prepareFixedSizeTuples(tuplesPerFrame);
+        Map<TuplePointer, Integer> pointers = insertInFTAToBufferShouldAllSuccess();
+        // The helper drops every deleted tuple from both maps, so they keep describing the same set.
+        deleteRandomSelectedTuples(remaining, pointers, 1);
+        assertEachTupleInFTAIsInBuffer(remaining, pointers);
+    }
+
+    /**
+     * Deletes at least half of the fixed-size tuples, re-inserts the deleted ones, and checks
+     * that the complete original set is readable afterwards.
+     */
+    @Test
+    public void testReOrganizeSpace() throws HyracksDataException {
+        final int tuplesPerFrame = 3;
+        Map<Integer, Integer> remaining = prepareFixedSizeTuples(tuplesPerFrame);
+        Map<Integer, Integer> allTuples = new HashMap<>(remaining);
+        Map<TuplePointer, Integer> survivors = insertInFTAToBufferShouldAllSuccess();
+
+        // The returned buffer holds copies of the deleted tuples; feed it back in as the new input.
+        ByteBuffer deleted = deleteRandomSelectedTuples(remaining, survivors, remaining.size() / 2);
+        inFTA.reset(deleted);
+        Map<TuplePointer, Integer> reinserted = insertInFTAToBufferShouldAllSuccess();
+
+        Map<TuplePointer, Integer> everything = new HashMap<>(survivors);
+        everything.putAll(reinserted);
+        assertEachTupleInFTAIsInBuffer(allTuples, everything);
+    }
+
+    /**
+     * Variable-size variant of the delete-then-reinsert scenario: re-insertion is allowed to
+     * stop early, in which case the tuples that did not fit are dropped from the expectation.
+     */
+    @Test
+    public void testReOrganizeVariableSizeTuple() throws HyracksDataException {
+        Map<Integer, Integer> remaining = prepareVariableSizeTuples();
+        Map<TuplePointer, Integer> survivors = insertInFTAToBufferCouldFailForLargerTuples(remaining);
+        // Snapshot taken after the first insertion, i.e. it only lists tuples that were stored.
+        Map<Integer, Integer> expected = new HashMap<>(remaining);
+
+        ByteBuffer deleted = deleteRandomSelectedTuples(remaining, survivors, remaining.size() / 2);
+        inFTA.reset(deleted);
+
+        Map<TuplePointer, Integer> reinserted = insertInFTAToBufferCouldFailForLargerTuples(expected);
+        Map<TuplePointer, Integer> everything = new HashMap<>(survivors);
+        everything.putAll(reinserted);
+
+        assertEachTupleInFTAIsInBuffer(expected, everything);
+    }
+
+    /** Inserts variable-size tuples until the manager refuses one, then checks the stored ones. */
+    @Test
+    public void testInsertVariableSizeTupleToMemoryManager() throws HyracksDataException {
+        Map<Integer, Integer> expected = prepareVariableSizeTuples();
+        Map<TuplePointer, Integer> inserted = insertInFTAToBufferCouldFailForLargerTuples(expected);
+        assertEachTupleInFTAIsInBuffer(expected, inserted);
+    }
+
+    /**
+     * Checks that every pointer in {@code mapInserted} resolves to a tuple whose first field holds
+     * the recorded id and whose length matches the one expected for that id, and that both maps
+     * have the same number of entries.
+     *
+     * @param map expected tuples: id mapped to tuple length
+     * @param mapInserted pointers handed out by the memory manager, mapped to the id of the tuple
+     */
+    private void assertEachTupleInFTAIsInBuffer(Map<Integer, Integer> map, Map<TuplePointer, Integer> mapInserted) {
+        ITupleBufferAccessor accessor = tupleMemoryManager.getTupleAccessor();
+        for (Map.Entry<TuplePointer, Integer> entry : mapInserted.entrySet()) {
+            int id = entry.getValue();
+            // get() yields null for an id that is not expected; assert on it explicitly so the test
+            // fails with a message instead of a NullPointerException from auto-unboxing.
+            Integer expectedLength = map.get(id);
+            assertTrue("inserted tuple id " + id + " is not an expected tuple", expectedLength != null);
+
+            accessor.reset(entry.getKey());
+            assertEquals(id,
+                    IntSerDeUtils.getInt(accessor.getTupleBuffer().array(), accessor.getAbsFieldStartOffset(0)));
+            assertEquals((int) expectedLength, accessor.getTupleLength());
+        }
+        assertEquals(map.size(), mapInserted.size());
+    }
+
+    private Map<Integer, Integer> prepareFixedSizeTuples(int tuplePerFrame) throws HyracksDataException {
+        Map<Integer, Integer> dataSet = new HashMap<>();
+        ByteBuffer buffer = ByteBuffer.allocate(Common.BUDGET);
+        FixedSizeFrame frame = new FixedSizeFrame(buffer);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(frame, true);
+
+        int sizePerTuple = (Common.MIN_FRAME_SIZE - 1 - 4 - tuplePerFrame * 4 - 4) / tuplePerFrame;
+        int sizeChar = sizePerTuple - fieldsSerDer.length * 4 - 4 - 4;
+        assert (sizeChar > 0);
+        for (int i = 0; i < Common.NUM_MIN_FRAME * tuplePerFrame; i++) {
+            tupleBuilder.reset();
+            tupleBuilder.addField(fieldsSerDer[0], i);
+            tupleBuilder.addField(fieldsSerDer[1], Utility.repeatString('a', sizeChar));
+            assertTrue(appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize()));
+            dataSet.put(i, tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * 4);
+        }
+        inFTA.reset(buffer);
+        return dataSet;
+    }
+
+    /**
+     * Fills a {@code Common.BUDGET}-byte buffer with tuples whose string field grows by one
+     * character per tuple, stopping at the first tuple the appender rejects, and points
+     * {@code inFTA} at that buffer.
+     *
+     * @return id mapped to {@code tupleBuilder.getSize()} plus 4 bytes per field end offset
+     */
+    private Map<Integer, Integer> prepareVariableSizeTuples() throws HyracksDataException {
+        Map<Integer, Integer> lengthsById = new HashMap<>();
+        ByteBuffer buffer = ByteBuffer.allocate(Common.BUDGET);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(new FixedSizeFrame(buffer), true);
+
+        int id = 0;
+        while (true) {
+            tupleBuilder.reset();
+            tupleBuilder.addField(fieldsSerDer[0], id);
+            tupleBuilder.addField(fieldsSerDer[1], Utility.repeatString('a', id));
+            boolean appended = appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize());
+            if (!appended) {
+                break;
+            }
+            lengthsById.put(id, tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * 4);
+            id++;
+        }
+        inFTA.reset(buffer);
+        return lengthsById;
+    }
+
+    /**
+     * Inserts every tuple currently visible through {@code inFTA} into the memory manager and
+     * asserts that each insertion succeeds.
+     *
+     * @return the pointer filled in for each tuple, mapped to the int read from its first field
+     */
+    private Map<TuplePointer, Integer> insertInFTAToBufferShouldAllSuccess() throws HyracksDataException {
+        Map<TuplePointer, Integer> idsByPointer = new HashMap<>();
+        int t = 0;
+        while (t < inFTA.getTupleCount()) {
+            TuplePointer pointer = new TuplePointer();
+            boolean stored = tupleMemoryManager.insertTuple(inFTA, t, pointer);
+            assertTrue(stored);
+            int id = IntSerDeUtils.getInt(inFTA.getBuffer().array(), inFTA.getAbsoluteFieldStartOffset(t, 0));
+            idsByPointer.put(pointer, id);
+            t++;
+        }
+        return idsByPointer;
+    }
+
+    /**
+     * Inserts the tuples visible through {@code inFTA} in order until the memory manager rejects
+     * one; the rejected tuple and all tuples after it are removed from {@code map}.
+     *
+     * @param map expected tuples keyed by id; trimmed in place to the tuples that were stored
+     * @return the pointer filled in for each stored tuple, mapped to the int read from its first field
+     */
+    private Map<TuplePointer, Integer> insertInFTAToBufferCouldFailForLargerTuples(Map<Integer, Integer> map)
+            throws HyracksDataException {
+        Map<TuplePointer, Integer> idsByPointer = new HashMap<>();
+        int next = 0;
+
+        // Phase 1: store tuples until the first rejection.
+        while (next < inFTA.getTupleCount()) {
+            TuplePointer pointer = new TuplePointer();
+            boolean stored = tupleMemoryManager.insertTuple(inFTA, next, pointer);
+            if (!stored) {
+                break;
+            }
+            int id = IntSerDeUtils.getInt(inFTA.getBuffer().array(), inFTA.getAbsoluteFieldStartOffset(next, 0));
+            idsByPointer.put(pointer, id);
+            next++;
+        }
+
+        // Phase 2: everything from the rejected tuple onwards is no longer expected.
+        while (next < inFTA.getTupleCount()) {
+            int id = IntSerDeUtils.getInt(inFTA.getBuffer().array(), inFTA.getAbsoluteFieldStartOffset(next, 0));
+            map.remove(id);
+            next++;
+        }
+        return idsByPointer;
+    }
+
+    /**
+     * Deletes a random number of tuples from the memory manager and returns a buffer holding
+     * copies of the deleted tuples.
+     * <p>
+     * The number deleted is at least {@code minNumOfRecordTobeDeleted} and less than
+     * {@code mapInserted.size()}. Which tuples go is decided by the iteration order of
+     * {@code mapInserted}. Every deleted tuple is removed from both maps.
+     *
+     * @param map expected tuples keyed by id; deleted ids are removed
+     * @param mapInserted pointers of stored tuples mapped to their id; deleted entries are removed
+     * @param minNumOfRecordTobeDeleted lower bound on the deletions; must be smaller than
+     *            {@code mapInserted.size()}
+     * @return a {@code Common.BUDGET}-byte buffer to which the deleted tuples were appended
+     */
+    private ByteBuffer deleteRandomSelectedTuples(Map<Integer, Integer> map, Map<TuplePointer, Integer> mapInserted,
+            int minNumOfRecordTobeDeleted)
+            throws HyracksDataException {
+        ByteBuffer buffer = ByteBuffer.allocate(Common.BUDGET);
+        FixedSizeFrame frame = new FixedSizeFrame(buffer);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(frame, true);
+
+        // JUnit assertions rather than the `assert` keyword, so the checks also run without -ea
+        // (otherwise random.nextInt below would be the first thing to fail, with a vaguer error).
+        assertTrue("cannot delete " + minNumOfRecordTobeDeleted + " of " + mapInserted.size() + " tuples",
+                minNumOfRecordTobeDeleted < mapInserted.size());
+        int countDeleted = minNumOfRecordTobeDeleted + random.nextInt(mapInserted.size() - minNumOfRecordTobeDeleted);
+
+        ITupleBufferAccessor accessor = tupleMemoryManager.getTupleAccessor();
+        for (int i = 0; i < countDeleted; i++) {
+            Iterator<Map.Entry<TuplePointer, Integer>> iter = mapInserted.entrySet().iterator();
+            assertTrue(iter.hasNext());
+            Map.Entry<TuplePointer, Integer> pair = iter.next();
+            accessor.reset(pair.getKey());
+            // Callers re-insert from the returned buffer, so a tuple that fails to be copied here
+            // would silently vanish; make that an immediate failure.
+            assertTrue("could not copy deleted tuple id " + pair.getValue() + " into the output buffer",
+                    appender.append(accessor.getTupleBuffer().array(), accessor.getTupleStartOffset(),
+                            accessor.getTupleLength()));
+            map.remove(pair.getValue());
+            tupleMemoryManager.deleteTuple(pair.getKey());
+            iter.remove();
+        }
+        return buffer;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
new file mode 100644
index 0000000..eece886
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.Utility;
+
+public class DeletableFrameTupleAppenderTest {
+    DeletableFrameTupleAppender appender;
+    ISerializerDeserializer[] fields = new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE,
+    };
+    RecordDescriptor recordDescriptor = new RecordDescriptor(fields);
+    ArrayTupleBuilder builder = new ArrayTupleBuilder(recordDescriptor.getFieldCount());
+    static final char TEST_CH = 'x';
+
+    int cap = 256;
+
+    /** Creates the appender under test for the shared record descriptor before each test. */
+    @Before
+    public void initial() throws HyracksDataException {
+        this.appender = new DeletableFrameTupleAppender(recordDescriptor);
+    }
+
+    /**
+     * After {@code clear(buffer)} the appender must use that buffer, report no tuples, and have
+     * all of the buffer except two 4-byte fields available as contiguous free space.
+     */
+    @Test
+    public void testClear() throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(cap);
+        appender.clear(buffer);
+        // Identity check on purpose: the appender must hold this very buffer instance.
+        assertTrue(appender.getBuffer() == buffer);
+        // assertEquals reports expected and actual values on failure, unlike assertTrue(a == b).
+        assertEquals(0, appender.getTupleCount());
+        assertEquals(cap - 4 - 4, appender.getContiguousFreeSpace());
+    }
+
+    /**
+     * Hand-builds a frame image for the tests.
+     * <p>
+     * Layout written here: the last 4 bytes hold {@code deletedBytes}, the 4 bytes before them
+     * hold {@code count}, and below that one 4-byte slot per tuple (growing towards the front)
+     * holds the buffer position right after that tuple. Tuples are written from position 0
+     * onwards, each as its field end offsets followed by its field data.
+     *
+     * @param capacity size of the buffer to allocate
+     * @param count number of tuples to write, built with {@code makeARecord(builder, 0..count-1)}
+     * @param deletedBytes value stored in the trailing deleted-bytes field
+     * @return the filled buffer; its position is left right after the last tuple
+     */
+    ByteBuffer makeAFrame(int capacity, int count, int deletedBytes) throws HyracksDataException {
+        ByteBuffer buffer = ByteBuffer.allocate(capacity);
+        int metaOffset = capacity - 4;
+        buffer.putInt(metaOffset, deletedBytes);
+        metaOffset -= 4;
+        buffer.putInt(metaOffset, count);
+        metaOffset -= 4;
+        for (int i = 0; i < count; i++, metaOffset -= 4) {
+            makeARecord(builder, i);
+            int[] fieldEndOffsets = builder.getFieldEndOffsets();
+            for (int x = 0; x < fieldEndOffsets.length; x++) {
+                buffer.putInt(fieldEndOffsets[x]);
+            }
+            buffer.put(builder.getByteArray(), 0, builder.getSize());
+            // JUnit assertion rather than the `assert` keyword, so the overlap check between tuple
+            // data and the slot area also runs without -ea.
+            assertTrue("tuple " + i + " overlaps the slot area of a " + capacity + "-byte frame",
+                    metaOffset > buffer.position());
+            buffer.putInt(metaOffset, buffer.position());
+        }
+        return buffer;
+    }
+
+    void makeARecord(ArrayTupleBuilder builder, int i) throws HyracksDataException {
+        builder.reset();
+        builder.addField(fields[0], i + 1);
+        builder.addField(fields[1], Utility.repeatString(TEST_CH, i + 1));
+    }
+
+    int assertTupleIsExpected(int i, int dataOffset) {
+        int tupleLength = 2 * 4 + 4 + 2 + i + 1;
+        assertEquals(dataOffset, appender.getTupleStartOffset(i));
+        assertEquals(tupleLength, appender.getTupleLength(i));
+
+        assertEquals(dataOffset + 2 * 4, appender.getAbsoluteFieldStartOffset(i, 0));
+        assertEquals(4, appender.getFieldLength(i, 0));
+        assertEquals(i + 1,
+                IntSerDeUtils.getInt(appender.getBuffer().array(), appender.getAbsoluteFieldStartOffset(i, 0)));
+        assertEquals(dataOffset + 2 * 4 + 4, appender.getAbsoluteFieldStartOffset(i, 1));
+        assertEquals(2 + i + 1, appender.getFieldLength(i, 1));
+        return tupleLength;
+    }
+
+    /**
+     * Checks {@code reset(buffer)} on an all-zero buffer (no tuples, everything but two 4-byte
+     * fields free) and on a hand-built frame with ten tuples and a deleted-bytes value of 7.
+     */
+    @Test
+    public void testReset() throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(cap);
+        appender.reset(buffer);
+        // Identity check on purpose: the appender must hold this very buffer instance.
+        assertTrue(appender.getBuffer() == buffer);
+        // assertEquals reports expected and actual values on failure, unlike assertTrue(a == b).
+        assertEquals(0, appender.getTupleCount());
+        assertEquals(cap - 4 - 4, appender.getContiguousFreeSpace());
+
+        int count = 10;
+        int deleted = 7;
+        buffer = makeAFrame(cap, count, deleted);
+        int pos = buffer.position();
+        appender.reset(buffer);
+        assertTrue(appender.getBuffer() == buffer);
+        assertEquals(count, appender.getTupleCount());
+        // Free space = capacity minus the two trailing fields, one slot per tuple, and the tuple data.
+        assertEquals(cap - 4 - 4 - count * 4 - pos, appender.getContiguousFreeSpace());
+        assertEquals(appender.getContiguousFreeSpace() + deleted, appender.getTotalFreeSpace());
+
+        int dataOffset = 0;
+        for (int i = 0; i < count; i++) {
+            dataOffset += assertTupleIsExpected(i, dataOffset);
+        }
+    }
+
+    /**
+     * Appends every tuple of a hand-built frame to a cleared buffer and checks the result is
+     * byte-for-byte identical to the hand-built frame.
+     */
+    @Test
+    public void testAppend() throws Exception {
+        final int tupleCount = 10;
+        ByteBuffer source = makeAFrame(cap, tupleCount, 0);
+        DeletableFrameTupleAppender reader = new DeletableFrameTupleAppender(recordDescriptor);
+        reader.reset(source);
+
+        ByteBuffer target = ByteBuffer.allocate(cap);
+        appender.clear(target);
+        int t = 0;
+        while (t < reader.getTupleCount()) {
+            appender.append(reader, t);
+            t++;
+        }
+
+        for (int b = 0; b < source.capacity(); b++) {
+            assertEquals(source.get(b), target.get(b));
+        }
+    }
+
+    /**
+     * Deletes the tuples of a ten-tuple frame one by one from the front and, after each
+     * deletion, checks that all later tuples are still intact at their original offsets.
+     */
+    @Test
+    public void testDelete() throws Exception {
+        final int tupleCount = 10;
+        int deletedBytes = 0;
+        appender.reset(makeAFrame(cap, tupleCount, deletedBytes));
+
+        final int contiguousFree = appender.getContiguousFreeSpace();
+        for (int victim = 0; victim < appender.getTupleCount(); victim++) {
+            int freed = assertDeleteSucceed(victim, contiguousFree, deletedBytes);
+            deletedBytes += freed;
+
+            // Tuples are deleted front to back, so the first live tuple starts at deletedBytes.
+            int offset = deletedBytes;
+            for (int live = victim + 1; live < appender.getTupleCount(); live++) {
+                offset += assertTupleIsExpected(live, offset);
+            }
+        }
+    }
+
+    /**
+     * After deleting every tuple and resetting the appender on its own buffer, the total free
+     * space must be the capacity minus one 4-byte slot per tuple and the two trailing 4-byte fields.
+     */
+    @Test
+    public void testResetAfterDelete() throws Exception {
+        testDelete();
+        appender.reset(appender.getBuffer());
+        final int expectedFree = cap - appender.getTupleCount() * 4 - 4 - 4;
+        assertEquals(expectedFree, appender.getTotalFreeSpace());
+    }
+
+    int assertDeleteSucceed(int i, int freeSpaceBeforeDelete, int deleteSpace) {
+        int startOffset = appender.getTupleStartOffset(i);
+        int endOffset = appender.getTupleEndOffset(i);
+        int tupleLength = appender.getTupleLength(i);
+
+        appender.delete(i);
+
+        assertEquals(startOffset, appender.getTupleStartOffset(i));
+        assertEquals(-endOffset, appender.getTupleEndOffset(i));
+        assertEquals(-tupleLength, appender.getTupleLength(i));
+        assertEquals(freeSpaceBeforeDelete, appender.getContiguousFreeSpace());
+        assertEquals(deleteSpace + tupleLength + freeSpaceBeforeDelete, appender.getTotalFreeSpace());
+        return tupleLength;
+    }
+
+    /**
+     * Deletes tuples 1, 3 and 5 from a ten-tuple frame, appends ten further tuples, and checks
+     * the tuple layout both before and after {@code reOrganizeBuffer()}.
+     */
+    @Test
+    public void testAppendAndDelete() throws Exception {
+        final int frameCapacity = 1024; // larger than the class-level cap
+        final int initialCount = 10;
+        ByteBuffer buffer = makeAFrame(frameCapacity, initialCount, 0);
+        int dataOffset = buffer.position();
+        appender.reset(buffer);
+
+        final int freeSpace = appender.getContiguousFreeSpace();
+        final int[] deleteSet = { 1, 3, 5 };
+        int deletedBytes = 0;
+        for (int victim : deleteSet) {
+            deletedBytes += assertDeleteSucceed(victim, freeSpace, deletedBytes);
+        }
+
+        // Source frame with twice as many tuples; only its second half is appended.
+        ByteBuffer bufferRead = makeAFrame(frameCapacity, initialCount * 2, 0);
+        DeletableFrameTupleAppender accessor = new DeletableFrameTupleAppender(recordDescriptor);
+        accessor.reset(bufferRead);
+
+        for (int i = initialCount; i < accessor.getTupleCount(); i++) {
+            int id = appender.append(accessor, i);
+            dataOffset += assertTupleIsExpected(i, dataOffset);
+            assertEquals(i, id);
+        }
+
+        // After reorganizing, the surviving tuples must be packed from offset 0.
+        appender.reOrganizeBuffer();
+        dataOffset = 0;
+        for (int i = 0; i < appender.getTupleCount(); i++) {
+            if (!ArrayUtils.contains(deleteSet, i)) {
+                dataOffset += assertTupleIsExpected(i, dataOffset);
+            }
+        }
+    }
+
+    /**
+     * Deletes every tuple, reorganizes the buffer, re-appends the same ten tuples, and checks the
+     * buffer is byte-for-byte identical to a freshly hand-built frame.
+     */
+    @Test
+    public void testReOrganizeBuffer() throws Exception {
+        final int tupleCount = 10;
+        testDelete();
+        appender.reOrganizeBuffer();
+
+        ByteBuffer reference = makeAFrame(cap, tupleCount, 0);
+        DeletableFrameTupleAppender reader = new DeletableFrameTupleAppender(recordDescriptor);
+        reader.reset(reference);
+        int t = 0;
+        while (t < reader.getTupleCount()) {
+            appender.append(reader, t);
+            t++;
+        }
+
+        for (int b = 0; b < reference.capacity(); b++) {
+            assertEquals(reference.get(b), appender.getBuffer().get(b));
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstracHeapTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstracHeapTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstracHeapTest.java
new file mode 100644
index 0000000..672e078
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstracHeapTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+
+public class AbstracHeapTest {
+    Random random = new Random(System.currentTimeMillis());
+
+    class IntFactory implements IResetableComparableFactory<Int> {
+        @Override
+        public IResetableComparable<Int> createResetableComparable() {
+            return new Int();
+        }
+    }
+
+    /** Mutable int holder used as the heap element type; ordered by its {@code i} value. */
+    class Int implements IResetableComparable<Int> {
+        int i;
+
+        /** Creates a holder with value 0. */
+        public Int() {
+            this(0);
+        }
+
+        /** Creates a holder with the given value. */
+        public Int(int i) {
+            this.i = i;
+        }
+
+        /** Copies the value of {@code other} into this holder. */
+        @Override
+        public void reset(Int other) {
+            this.i = other.i;
+        }
+
+        /** Orders holders by ascending value; returns -1, 0 or 1. */
+        @Override
+        public int compareTo(Int o) {
+            if (this.i < o.i) {
+                return -1;
+            }
+            return (this.i == o.i) ? 0 : 1;
+        }
+    }
+
+    /**
+     * Drains {@code minHeap} and asserts that each {@code peekMin} agrees with the following
+     * {@code getMin}, that values come out in non-decreasing order, and that the number of
+     * elements drained equals {@code getNumEntries()} as reported before draining.
+     */
+    protected void assertGetMinHeapIsSorted(IMinHeap minHeap) {
+        int remaining = minHeap.getNumEntries();
+        Int popped = new Int();
+        Int peeked = new Int();
+        int previous = Integer.MIN_VALUE;
+        while (!minHeap.isEmpty()) {
+            remaining--;
+            minHeap.peekMin(peeked);
+            minHeap.getMin(popped);
+            assertTrue(peeked.compareTo(popped) == 0);
+            assertTrue(previous <= popped.i);
+            previous = popped.i;
+        }
+        assertEquals(0, remaining);
+    }
+
+    /**
+     * Drains {@code maxHeap} and asserts that each {@code peekMax} agrees with the following
+     * {@code getMax}, that values come out in non-increasing order, and that the number of
+     * elements drained equals {@code getNumEntries()} as reported before draining.
+     */
+    protected void assertGetMaxHeapIsSorted(IMaxHeap maxHeap) {
+        int remaining = maxHeap.getNumEntries();
+        Int popped = new Int();
+        Int peeked = new Int();
+        int previous = Integer.MAX_VALUE;
+        while (!maxHeap.isEmpty()) {
+            remaining--;
+            maxHeap.peekMax(peeked);
+            maxHeap.getMax(popped);
+            assertTrue(peeked.compareTo(popped) == 0);
+            assertTrue(previous >= popped.i);
+            previous = popped.i;
+        }
+        assertEquals(0, remaining);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/structures/MaxHeapTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/structures/MaxHeapTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/structures/MaxHeapTest.java
new file mode 100644
index 0000000..3f2404c
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/structures/MaxHeapTest.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class MaxHeapTest extends AbstracHeapTest {
+    /** A newly created heap is empty. Despite the method name, the heap under test is a MaxHeap. */
+    @Test
+    public void testInitialMinHeap() {
+        final int initialCapacity = 10;
+        MaxHeap heap = new MaxHeap(new IntFactory(), initialCapacity);
+        assertTrue(heap.isEmpty());
+        assertEquals(0, heap.getNumEntries());
+    }
+
+    /**
+     * Fills the heap with exactly {@code capacity} elements twice (first descending values, then
+     * random ones), draining and checking the order after each fill.
+     */
+    @Test
+    public void testInsertSmallAmountElements() {
+        final int capacity = 10;
+        MaxHeap heap = new MaxHeap(new IntFactory(), capacity);
+
+        // Values capacity, capacity - 1, ..., 1.
+        for (int value = capacity; value > 0; value--) {
+            heap.insert(new Int(value));
+        }
+        assertEquals(capacity, heap.getNumEntries());
+        assertFalse(heap.isEmpty());
+        assertGetMaxHeapIsSorted(heap);
+
+        // The heap is empty again after the drain above; refill it with random values.
+        for (int n = 0; n < capacity; n++) {
+            heap.insert(new Int(random.nextInt()));
+        }
+        assertEquals(capacity, heap.getNumEntries());
+        assertFalse(heap.isEmpty());
+        assertGetMaxHeapIsSorted(heap);
+    }
+
+    /**
+     * Inserts ten times more elements than the capacity passed to the constructor and checks
+     * that all of them are kept and drained in order.
+     */
+    @Test
+    public void testInsertLargerThanCapacityElements() {
+        final int capacity = 10;
+        final int oversized = capacity * 10;
+        MaxHeap heap = new MaxHeap(new IntFactory(), capacity);
+
+        // Values capacity, capacity - 1, ..., 1.
+        for (int value = capacity; value > 0; value--) {
+            heap.insert(new Int(value));
+        }
+        assertEquals(capacity, heap.getNumEntries());
+        assertFalse(heap.isEmpty());
+        assertGetMaxHeapIsSorted(heap);
+
+        // The heap is empty again after the drain above; now exceed the initial capacity.
+        for (int n = 0; n < oversized; n++) {
+            heap.insert(new Int(random.nextInt()));
+        }
+        assertEquals(oversized, heap.getNumEntries());
+        assertFalse(heap.isEmpty());
+        assertGetMaxHeapIsSorted(heap);
+    }
+
+    /**
+     * Fills the heap with {@code capacity .. 2 * capacity - 1}, then calls {@code replaceMax}
+     * with {@code 0 .. capacity - 1}. Every replacement value is smaller than all original
+     * values, so the heap must end up holding exactly {@code 0 .. capacity - 1}, drained in
+     * descending order.
+     */
+    @Test
+    public void testReplaceMax() {
+        int capacity = 10;
+        MaxHeap maxHeap = new MaxHeap(new IntFactory(), capacity);
+        for (int i = capacity; i < capacity * 2; i++) {
+            maxHeap.insert(new Int(i));
+        }
+        assertEquals(capacity, maxHeap.getNumEntries());
+        assertFalse(maxHeap.isEmpty());
+
+        for (int i = 0; i < capacity; i++) {
+            maxHeap.replaceMax(new Int(i));
+        }
+        assertEquals(capacity, maxHeap.getNumEntries());
+        assertFalse(maxHeap.isEmpty());
+
+        Int maxI = new Int();
+        Int peekI = new Int();
+        int i = 0;
+        while (!maxHeap.isEmpty()) {
+            maxHeap.peekMax(peekI);
+            maxHeap.getMax(maxI);
+            assertTrue(peekI.compareTo(maxI) == 0);
+            // Expected value first, value under test second, so a failure message reads correctly:
+            // the i-th element drained must be capacity - 1 - i.
+            assertEquals(capacity - 1 - i, maxI.i);
+            i++;
+        }
+    }
+}


Mime
View raw message