asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [11/11] incubator-asterixdb-hyracks git commit: Implemented the memory-bounded HashGroupby and HashJoin for BigObject
Date Fri, 26 Feb 2016 05:54:08 GMT
Implemented the memory-bounded HashGroupby and HashJoin for BigObject

It contains both hash grouby and hash join changes.

The main change is
1. update the ExternalGroupby to Hash-based groupby
2. update the Join operators to use the Buffermanager.

The buffer manager part is moved from the Sort package to upper
level so that it can be shared by all the operators.

Change-Id: I248f3a374fdacad7d57e49cf18d8233745e55460
Reviewed-on: https://asterix-gerrit.ics.uci.edu/398
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/6abc63e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/6abc63e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/6abc63e2

Branch: refs/heads/master
Commit: 6abc63e23029b9c5279a6a3be28f705b2a14bd7f
Parents: c20aa64
Author: Jianfeng Jia <jianfeng.jia@gmail.com>
Authored: Thu Feb 25 18:28:20 2016 -0800
Committer: Jianfeng Jia <jianfeng.jia@gmail.com>
Committed: Thu Feb 25 21:48:47 2016 -0800

----------------------------------------------------------------------
 .../physical/ExternalGroupByPOperator.java      |  42 +-
 .../physical/HybridHashJoinPOperator.java       |   2 +-
 .../SetAlgebricksPhysicalOperatorsRule.java     |   4 +-
 ...estedPlansAccumulatingAggregatorFactory.java |   4 +-
 ...SerializableAggregatorDescriptorFactory.java |  20 +-
 ...AlgebricksAccumulatingAggregatorFactory.java |   6 +-
 .../sort/InMemorySortRuntimeFactory.java        |  12 +-
 .../tests/pushruntime/PushRuntimeTest.java      |  76 ---
 .../apache/hyracks/api/comm/FixedSizeFrame.java |  14 +-
 .../apache/hyracks/api/comm/IFrameAppender.java |   1 +
 .../api/context/IHyracksFrameMgrContext.java    |   2 +-
 .../comm/io/FixedSizeFrameTupleAppender.java    |  32 +
 .../common/comm/io/FrameTupleAccessor.java      |   7 +-
 .../comm/io/FrameTuplePairComparator.java       |  62 --
 .../common/io/GeneratedRunFileReader.java       |  41 ++
 .../dataflow/common/io/RunFileReader.java       |   1 +
 .../dataflow/common/io/RunFileWriter.java       |  14 +-
 .../hadoop/mapreduce/ShuffleFrameReader.java    |   2 +-
 .../AbstractTuplePointerAccessor.java           | 134 ++++
 .../dataflow/std/buffermanager/BufferInfo.java  |  59 ++
 .../buffermanager/DeallocatableFramePool.java   | 123 ++++
 .../std/buffermanager/EnumFreeSlotPolicy.java   |  39 ++
 .../FrameFreeSlotBiggestFirst.java              | 106 ++++
 .../std/buffermanager/FrameFreeSlotLastFit.java |  85 +++
 .../FrameFreeSlotPolicyFactory.java             |  46 ++
 .../buffermanager/FrameFreeSlotSmallestFit.java |  63 ++
 .../buffermanager/IDeallocatableFramePool.java  |  28 +
 .../IDeletableTupleBufferManager.java           |  28 +
 .../std/buffermanager/IFrameBufferManager.java  |  59 ++
 .../std/buffermanager/IFrameFreeSlotPolicy.java |  48 ++
 .../dataflow/std/buffermanager/IFramePool.java  |  52 ++
 .../IPartitionedMemoryConstrain.java            |  34 +
 .../IPartitionedTupleBufferManager.java         | 111 ++++
 .../std/buffermanager/ITupleBufferManager.java  |  47 ++
 .../buffermanager/ITuplePointerAccessor.java    |  42 ++
 .../PreferToSpillFullyOccupiedFramePolicy.java  |  99 +++
 .../VPartitionTupleBufferManager.java           | 277 ++++++++
 .../VariableDeletableTupleMemoryManager.java    | 188 ++++++
 .../VariableFrameMemoryManager.java             | 113 ++++
 .../std/buffermanager/VariableFramePool.java    | 205 ++++++
 .../AbstractRunningAggregatorDescriptor.java    |  45 --
 .../dataflow/std/group/AggregateType.java       |  25 +
 .../std/group/FrameToolsForGroupers.java        |  97 ---
 .../std/group/HashSpillableTableFactory.java    | 522 +++++----------
 .../std/group/IAggregatorDescriptor.java        |  49 +-
 .../dataflow/std/group/ISpillableTable.java     |  52 +-
 .../std/group/ISpillableTableFactory.java       |  10 +-
 .../MultiFieldsAggregatorFactory.java           |  20 +-
 .../ExternalGroupBuildOperatorNodePushable.java | 128 ++--
 .../ExternalGroupMergeOperatorNodePushable.java | 470 --------------
 .../ExternalGroupOperatorDescriptor.java        |  52 +-
 .../std/group/external/ExternalGroupState.java  |  21 +-
 .../ExternalGroupWriteOperatorNodePushable.java | 167 +++++
 .../std/group/external/ExternalHashGroupBy.java |  97 +++
 .../group/external/IRunFileWriterGenerator.java |  27 +
 .../std/group/hash/GroupingHashTable.java       | 251 --------
 .../HashGroupBuildOperatorNodePushable.java     |  93 ---
 .../group/hash/HashGroupOperatorDescriptor.java | 116 ----
 .../HashGroupOutputOperatorNodePushable.java    |  48 --
 .../dataflow/std/group/hash/HashGroupState.java |  55 --
 .../sort/ExternalSortGroupByRunGenerator.java   |   6 +-
 .../sort/ExternalSortGroupByRunMerger.java      |  20 +-
 .../sort/SortGroupByOperatorDescriptor.java     |  65 +-
 .../join/GraceHashJoinOperatorNodePushable.java |   2 +-
 .../join/HybridHashJoinOperatorDescriptor.java  |  21 +-
 .../dataflow/std/join/InMemoryHashJoin.java     | 105 ++-
 .../InMemoryHashJoinOperatorDescriptor.java     |   2 +-
 .../dataflow/std/join/NestedLoopJoin.java       |  99 ++-
 .../std/join/OptimizedHybridHashJoin.java       | 632 +++++++------------
 ...timizedHybridHashJoinOperatorDescriptor.java | 499 ++++++++-------
 .../sort/AbstractExternalSortRunGenerator.java  |  91 +++
 .../std/sort/AbstractExternalSortRunMerger.java | 243 +++++++
 .../dataflow/std/sort/AbstractFrameSorter.java  |  62 +-
 .../std/sort/AbstractSortRunGenerator.java      |  18 +-
 .../sort/AbstractSorterOperatorDescriptor.java  |  20 +-
 .../sort/ExternalSortOperatorDescriptor.java    |  25 +-
 .../std/sort/ExternalSortRunGenerator.java      |  57 +-
 .../std/sort/ExternalSortRunMerger.java         | 219 +------
 .../dataflow/std/sort/FrameSorterMergeSort.java |  46 +-
 .../dataflow/std/sort/FrameSorterQuickSort.java |  56 +-
 .../dataflow/std/sort/HeapSortRunGenerator.java |  10 +-
 .../std/sort/HybridTopKSortRunGenerator.java    |  14 +-
 .../dataflow/std/sort/IRunGenerator.java        |   3 +-
 .../hyracks/dataflow/std/sort/ISorter.java      |   2 +-
 .../sort/InMemorySortOperatorDescriptor.java    |  12 +-
 .../std/sort/RunAndMaxFrameSizePair.java        |  36 --
 .../std/sort/TopKSorterOperatorDescriptor.java  |  19 +-
 .../dataflow/std/sort/TupleSorterHeapSort.java  |  27 +-
 .../sort/buffermanager/EnumFreeSlotPolicy.java  |  26 -
 .../FrameFreeSlotBiggestFirst.java              | 101 ---
 .../buffermanager/FrameFreeSlotLastFit.java     |  85 ---
 .../buffermanager/FrameFreeSlotSmallestFit.java |  63 --
 .../sort/buffermanager/IFrameBufferManager.java |  72 ---
 .../buffermanager/IFrameFreeSlotPolicy.java     |  48 --
 .../std/sort/buffermanager/IFramePool.java      |  52 --
 .../buffermanager/ITupleBufferAccessor.java     |  40 --
 .../sort/buffermanager/ITupleBufferManager.java |  46 --
 .../VariableFrameMemoryManager.java             | 136 ----
 .../sort/buffermanager/VariableFramePool.java   | 204 ------
 .../VariableTupleMemoryManager.java             | 207 ------
 .../IAppendDeletableFrameTupleAccessor.java     |   2 +-
 .../dataflow/std/structures/AbstractHeap.java   |   4 +-
 .../std/structures/ISerializableTable.java      |  16 +-
 .../std/structures/SerializableHashTable.java   |  93 ++-
 .../std/util/FrameTuplePairComparator.java      |  88 +++
 .../buffermanager/AbstractFramePoolTest.java    | 144 +++++
 .../AbstractTupleMemoryManagerTest.java         | 113 ++++
 .../dataflow/std/buffermanager/Common.java      |  30 +
 .../buffermanager/DeletableFramePoolTest.java   |  87 +++
 .../FrameFreeSlotBestFitUsingTreeMapTest.java   |  64 ++
 .../FrameFreeSlotBiggestFirstTest.java          |  74 +++
 .../buffermanager/FrameFreeSlotLastFitTest.java |  90 +++
 .../VPartitionTupleBufferManagerTest.java       | 159 +++++
 .../buffermanager/VariableFramePoolTest.java    | 119 ++++
 .../VariableFramesMemoryManagerTest.java        | 175 +++++
 .../VariableTupleMemoryManagerTest.java         | 181 ++++++
 .../dataflow/std/sort/buffermanager/Common.java |  30 -
 .../FrameFreeSlotBestFitUsingTreeMapTest.java   |  64 --
 .../FrameFreeSlotBiggestFirstTest.java          |  74 ---
 .../buffermanager/FrameFreeSlotLastFitTest.java |  90 ---
 .../buffermanager/VariableFramePoolTest.java    | 220 -------
 .../VariableFramesMemoryManagerTest.java        | 174 -----
 .../VariableTupleMemoryManagerTest.java         | 234 -------
 .../structures/SerializableHashTableTest.java   |  94 +++
 .../tests/integration/AggregationTest.java      | 526 ++++-----------
 .../tests/integration/HeapSortMergeTest.java    | 171 +++++
 .../integration/LocalityAwareConnectorTest.java |  65 +-
 .../integration/OptimizedSortMergeTest.java     | 171 -----
 ...TPCHCustomerOptimizedHybridHashJoinTest.java | 181 ++----
 .../TPCHCustomerOrderHashJoinTest.java          |   4 +-
 .../integration/VSizeFrameSortMergeTest.java    |   1 -
 .../tests/unit/AbstractExternalGroupbyTest.java | 243 +++++++
 .../tests/unit/AbstractRunGeneratorTest.java    |  50 +-
 .../tests/unit/ExternalHashGroupbyTest.java     |  58 ++
 .../tests/unit/RunMergingFrameReaderTest.java   |  24 +-
 .../hyracks/tests/unit/SortGroupbyTest.java     |  77 +++
 .../text-example/textclient/pom.xml             |  19 +-
 .../text/client/ExternalGroupClient.java        | 325 ----------
 .../examples/text/client/WordCountMain.java     |  76 ++-
 .../tpch-example/tpchclient/pom.xml             | 115 ++--
 .../hyracks/examples/tpch/client/Common.java    |  53 +-
 .../hyracks/examples/tpch/client/Groupby.java   | 207 ++++++
 .../hyracks/examples/tpch/client/Join.java      | 232 +++----
 .../hyracks/examples/tpch/client/Sort.java      |   2 +-
 .../am/common/api/ITwoPCIndexBulkLoader.java    |  11 +-
 ...eIndexDiskOrderScanOperatorNodePushable.java |   1 +
 .../hyracks/test/support/TestTaskContext.java   |  12 +-
 pom.xml                                         |   1 +
 148 files changed, 6696 insertions(+), 6512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index efb9681..b15ea0b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -59,26 +58,26 @@ import org.apache.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
 import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.aggreg.SerializableAggregatorDescriptorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 
 public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
 
-    private int tableSize = 0;
-    private int frameLimit = 0;
+    private final int tableSize;
+    private final long fileSize;
+    private final int frameLimit;
     private List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
 
     public ExternalGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
-            int tableSize) {
+            int tableSize, long fileSize) {
         this.tableSize = tableSize;
         this.frameLimit = frameLimit;
+        this.fileSize = fileSize;
         computeColumnSet(gbyList);
     }
 
@@ -140,8 +139,8 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
-            pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(
-                    columnSet), null), null);
+            pv[0] = new StructuralPropertiesVector(
+                    new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnSet), null), null);
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
@@ -151,7 +150,7 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         List<LogicalVariable> gbyCols = getGbyColumns();
         int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
         GroupByOperator gby = (GroupByOperator) op;
@@ -194,8 +193,8 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
             AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) exprRef.getValue();
             aff[i++] = expressionRuntimeProvider.createSerializableAggregateFunctionFactory(aggFun, aggOpInputEnv,
                     inputSchemas, context);
-            intermediateTypes.add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv,
-                    context.getMetadataProvider()));
+            intermediateTypes
+                    .add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv, context.getMetadataProvider()));
         }
 
         int[] keyAndDecFields = new int[keys.length + fdColumns.length];
@@ -219,9 +218,10 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(gbyCols,
                 aggOpInputEnv, context);
-        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
-        IBinaryHashFunctionFactory[] hashFunctionFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
-                gbyCols, aggOpInputEnv, context);
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+        IBinaryHashFunctionFamily[] hashFunctionFactories = JobGenHelper.variablesToBinaryHashFunctionFamilies(gbyCols,
+                aggOpInputEnv, context);
 
         ISerializedAggregateEvaluatorFactory[] merges = new ISerializedAggregateEvaluatorFactory[n];
         List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
@@ -249,13 +249,11 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
         IAggregatorDescriptorFactory aggregatorFactory = new SerializableAggregatorDescriptorFactory(aff);
         IAggregatorDescriptorFactory mergeFactory = new SerializableAggregatorDescriptorFactory(merges);
 
-        ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
-        INormalizedKeyComputerFactory normalizedKeyFactory = JobGenHelper.variablesToAscNormalizedKeyComputerFactory(
-                gbyCols, aggOpInputEnv, context);
-        ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, keyAndDecFields,
-                frameLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory, mergeFactory,
-                recordDescriptor, new HashSpillableTableFactory(tpcf, tableSize), false);
-
+        INormalizedKeyComputerFactory normalizedKeyFactory = JobGenHelper
+                .variablesToAscNormalizedKeyComputerFactory(gbyCols, aggOpInputEnv, context);
+        ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+                keyAndDecFields, frameLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory, mergeFactory,
+                recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
         contributeOpDesc(builder, gby, gbyOpDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 071ee72..a24b87b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -150,7 +150,7 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
                     case INNER: {
                         opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory);
+                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, false, null);
                         break;
                     }
                     case LEFT_OUTER: {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 73bba8f..8e12ece 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -166,7 +166,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                                     ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
                                             gby.getGroupByList(),
                                             physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
-                                            physicalOptimizationConfig.getExternalGroupByTableSize());
+                                            physicalOptimizationConfig.getExternalGroupByTableSize(),
+                                            (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
+                                                    * physicalOptimizationConfig.getFrameSize());
                                     op.setPhysicalOperator(externalGby);
                                     break;
                                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 9f0960d..974a079 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -90,15 +90,13 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                // it only works if the output of the aggregator fits in one
-                // frame
                 for (int i = 0; i < pipelines.length; i++) {
                     pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
                 }
             }
 
             @Override
-            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 for (int i = 0; i < pipelines.length; i++) {
                     outputWriter.setInputIdx(i);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index fe42878..6700fb8 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -113,12 +113,12 @@ public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatin
             }
 
             @Override
-            public boolean outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor stateAccessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                byte[] data = accessor.getBuffer().array();
-                int startOffset = accessor.getTupleStartOffset(tIndex);
-                int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
-                int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
+                byte[] data = stateAccessor.getBuffer().array();
+                int startOffset = stateAccessor.getTupleStartOffset(tIndex);
+                int aggFieldOffset = stateAccessor.getFieldStartOffset(tIndex, offsetFieldIndex);
+                int refOffset = startOffset + stateAccessor.getFieldSlotsLength() + aggFieldOffset;
                 int start = refOffset;
                 for (int i = 0; i < aggs.length; i++) {
                     try {
@@ -133,12 +133,12 @@ public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatin
             }
 
             @Override
-            public boolean outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor stateAccessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                byte[] data = accessor.getBuffer().array();
-                int startOffset = accessor.getTupleStartOffset(tIndex);
-                int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
-                int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
+                byte[] data = stateAccessor.getBuffer().array();
+                int startOffset = stateAccessor.getTupleStartOffset(tIndex);
+                int aggFieldOffset = stateAccessor.getFieldStartOffset(tIndex, offsetFieldIndex);
+                int refOffset = startOffset + stateAccessor.getFieldSlotsLength() + aggFieldOffset;
                 int start = refOffset;
                 for (int i = 0; i < aggs.length; i++) {
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 34b4865..13e3646 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -90,8 +90,8 @@ public class SimpleAlgebricksAccumulatingAggregatorFactory extends AbstractAccum
             }
 
             @Override
-            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
                 IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
                 for (int i = 0; i < agg.length; i++) {
                     try {
@@ -123,7 +123,7 @@ public class SimpleAlgebricksAccumulatingAggregatorFactory extends AbstractAccum
             }
 
             @Override
-            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
                     int tIndex, AggregateState state) throws HyracksDataException {
                 IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
                 for (int i = 0; i < agg.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index bca301f..df05d50 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -28,11 +28,12 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
+import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
+import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
 import org.apache.hyracks.dataflow.std.sort.FrameSorterMergeSort;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
 
 public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -67,7 +68,8 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
                 writer.open();
                 if (frameSorter == null) {
                     IFrameBufferManager manager = new VariableFrameMemoryManager(
-                            new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY), new FrameFreeSlotLastFit());
+                            new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
+                            FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT));
                     frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory,
                             comparatorFactories, outputRecordDesc);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 7fcab17..0dc1961 100644
--- a/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -46,7 +46,6 @@ import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.evaluators.TupleFieldEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.aggreg.AggregateRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.aggreg.SimpleAlgebricksAccumulatingAggregatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.group.MicroPreClusteredGroupRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import org.apache.hyracks.algebricks.runtime.operators.meta.SubplanRuntimeFactory;
@@ -67,15 +66,12 @@ import org.apache.hyracks.algebricks.tests.util.AlgebricksHyracksIntegrationUtil
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
@@ -85,7 +81,6 @@ import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
@@ -93,8 +88,6 @@ import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
@@ -483,75 +476,6 @@ public class PushRuntimeTest {
     }
 
     @Test
-    public void scanHashGbySelectWrite() throws Exception {
-        JobSpecification spec = new JobSpecification(FRAME_SIZE);
-
-        // the scanner
-        FileSplit[] fileSplits = new FileSplit[1];
-        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl")));
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
-        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
-                new UTF8StringSerializerDeserializer(), FloatSerializerDeserializer.INSTANCE,
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
-                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
-                UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                UTF8StringParserFactory.INSTANCE };
-        FileScanOperatorDescriptor scanner = new FileScanOperatorDescriptor(spec, splitProvider,
-                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanner,
-                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
-
-        // the group-by
-        RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-        ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(new int[] { 3 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) });
-        IAggregateEvaluatorFactory[] aggFuns = new IAggregateEvaluatorFactory[] { new TupleCountAggregateFunctionFactory() };
-        IAggregatorDescriptorFactory aggFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFuns,
-                new int[] { 3 });
-        HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(spec, new int[] { 3 }, tpcf,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
-                aggFactory, gbyDesc, 1024);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, gby,
-                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
-
-        // the algebricks op.
-        IScalarEvaluatorFactory cond = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
-                new TupleFieldEvaluatorFactory(0)); // Canadian customers
-        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(cond, new int[] { 1 },
-                BinaryBooleanInspectorImpl.FACTORY, false, -1, null);
-        RecordDescriptor selectDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
-
-        String filePath = PATH_ACTUAL + SEPARATOR + "scanHashGbySelectWrite.out";
-        File outFile = new File(filePath);
-        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
-                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
-                selectDesc);
-
-        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
-                new IPushRuntimeFactory[] { select, writer }, new RecordDescriptor[] { selectDesc, null });
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
-                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, gby, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), gby, 0, algebricksOp, 0);
-        spec.addRoot(algebricksOp);
-
-        AlgebricksHyracksIntegrationUtil.runJob(spec);
-        StringBuilder buf = new StringBuilder();
-        readFileToString(outFile, buf);
-        Assert.assertEquals("9", buf.toString());
-        outFile.delete();
-    }
-
-    @Test
     public void etsUnnestRunningaggregateWrite() throws Exception {
         JobSpecification spec = new JobSpecification(FRAME_SIZE);
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FixedSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FixedSizeFrame.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FixedSizeFrame.java
index 0afe0ae..e36a773 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FixedSizeFrame.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FixedSizeFrame.java
@@ -23,11 +23,19 @@ import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class FixedSizeFrame implements IFrame{
+public class FixedSizeFrame implements IFrame {
 
-    private final ByteBuffer buffer;
+    private ByteBuffer buffer;
 
-    public FixedSizeFrame(ByteBuffer buffer){
+    public FixedSizeFrame() {
+
+    }
+
+    public FixedSizeFrame(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public void reset(ByteBuffer buffer) {
         this.buffer = buffer;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
index e48e4d6..0fc24c7 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IFrameAppender {
+
     /**
      * Reset to attach to a new frame.
      *

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksFrameMgrContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksFrameMgrContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksFrameMgrContext.java
index c945f8e..d43680a 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksFrameMgrContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksFrameMgrContext.java
@@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IHyracksFrameMgrContext {
+
     int getInitialFrameSize();
 
-    //TODO tobedeleted
     ByteBuffer allocateFrame() throws HyracksDataException;
 
     ByteBuffer allocateFrame(int bytes) throws HyracksDataException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
new file mode 100644
index 0000000..60c6e6d
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.comm.io;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FixedSizeFrameTupleAppender extends FrameTupleAppender {
+    @Override
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength)) {
+            return true;
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index 70d4f80..bab5463 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -135,13 +135,18 @@ public class FrameTupleAccessor implements IFrameTupleAccessor {
     protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) {
         sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
         for (int j = 0; j < getFieldCount(); ++j) {
+            sb.append(" ");
+            if (j > 0) {
+                sb.append("|");
+            }
             sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
             sb.append("{");
             bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
             try {
                 sb.append(recordDescriptor.getFields()[j].deserialize(dis));
-            } catch (HyracksDataException e) {
+            } catch (Exception e) {
                 e.printStackTrace();
+                sb.append("Failed to deserialize field" + j);
             }
             sb.append("}");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTuplePairComparator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTuplePairComparator.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTuplePairComparator.java
deleted file mode 100644
index 602e018..0000000
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTuplePairComparator.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.common.comm.io;
-
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FrameTuplePairComparator {
-    private final int[] keys0;
-    private final int[] keys1;
-    private final IBinaryComparator[] comparators;
-
-    public FrameTuplePairComparator(int[] keys0, int[] keys1, IBinaryComparator[] comparators) {
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.comparators = comparators;
-    }
-
-    public int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
-            throws HyracksDataException {
-        int tStart0 = accessor0.getTupleStartOffset(tIndex0);
-        int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
-
-        int tStart1 = accessor1.getTupleStartOffset(tIndex1);
-        int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
-        for (int i = 0; i < keys0.length; ++i) {
-            int fIdx0 = keys0[i];
-            int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0);
-            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0);
-            int fLen0 = fEnd0 - fStart0;
-
-            int fIdx1 = keys1[i];
-            int fStart1 = accessor1.getFieldStartOffset(tIndex1, fIdx1);
-            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
-            int fLen1 = fEnd1 - fStart1;
-
-            int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java
new file mode 100644
index 0000000..0967448
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.io;
+
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class GeneratedRunFileReader extends RunFileReader {
+    private int maxFrameSize;
+
+    GeneratedRunFileReader(FileReference file, IIOManager ioManager, long size, boolean deleteAfterRead,
+            int maxFrameSize) {
+        super(file, ioManager, size, deleteAfterRead);
+        this.maxFrameSize = maxFrameSize;
+    }
+
+    public void updateSize(int newMaxSize) {
+        this.maxFrameSize = newMaxSize;
+    }
+
+    public int getMaxFrameSize() {
+        return maxFrameSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index b192be4..fb160f0 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -54,6 +54,7 @@ public class RunFileReader implements IFrameReader {
             return false;
         }
         frame.reset();
+
         int readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
         if (readLength <= 0) {
             throw new HyracksDataException("Premature end of file");

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index a5888c1..8031422 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -33,6 +33,7 @@ public class RunFileWriter implements IFrameWriter {
 
     private IFileHandle handle;
     private long size;
+    private int maxOutputFrameSize;
 
     public RunFileWriter(FileReference file, IIOManager ioManager) {
         this.file = file;
@@ -45,6 +46,7 @@ public class RunFileWriter implements IFrameWriter {
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         size = 0;
         failed = false;
+        maxOutputFrameSize = 0;
     }
 
     @Override
@@ -55,7 +57,9 @@ public class RunFileWriter implements IFrameWriter {
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        size += ioManager.syncWrite(handle, size, buffer);
+        int writen = ioManager.syncWrite(handle, size, buffer);
+        maxOutputFrameSize = Math.max(writen, maxOutputFrameSize);
+        size += writen;
     }
 
     @Override
@@ -73,18 +77,18 @@ public class RunFileWriter implements IFrameWriter {
         return size;
     }
 
-    public RunFileReader createReader() throws HyracksDataException {
+    public GeneratedRunFileReader createReader() throws HyracksDataException {
         if (failed) {
             throw new HyracksDataException("createReader() called on a failed RunFileWriter");
         }
-        return new RunFileReader(file, ioManager, size, false);
+        return new GeneratedRunFileReader(file, ioManager, size, false, maxOutputFrameSize);
     }
 
-    public RunFileReader createDeleteOnCloseReader() throws HyracksDataException {
+    public GeneratedRunFileReader createDeleteOnCloseReader() throws HyracksDataException {
         if (failed) {
             throw new HyracksDataException("createReader() called on a failed RunFileWriter");
         }
-        return new RunFileReader(file, ioManager, size, true);
+        return new GeneratedRunFileReader(file, ioManager, size, true, maxOutputFrameSize);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
index fb81912..fb9c6fb 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -44,7 +44,7 @@ import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
-import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
+import org.apache.hyracks.dataflow.std.structures.RunAndMaxFrameSizePair;
 
 public class ShuffleFrameReader implements IFrameReader {
     private final IHyracksTaskContext ctx;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTuplePointerAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTuplePointerAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTuplePointerAccessor.java
new file mode 100644
index 0000000..3683354
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTuplePointerAccessor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public abstract class AbstractTuplePointerAccessor implements ITuplePointerAccessor {
+
+    protected int tid = -1;
+
+    abstract IFrameTupleAccessor getInnerAccessor();
+
+    abstract void resetInnerAccessor(TuplePointer tuplePointer);
+
+    @Override
+    public void reset(TuplePointer tuplePointer) {
+        resetInnerAccessor(tuplePointer);
+        tid = tuplePointer.tupleIndex;
+    }
+
+    @Override
+    public int getTupleStartOffset() {
+        return getTupleStartOffset(tid);
+    }
+
+    @Override
+    public int getTupleLength() {
+        return getTupleLength(tid);
+    }
+
+    @Override
+    public int getAbsFieldStartOffset(int fieldId) {
+        return getAbsoluteFieldStartOffset(tid, fieldId);
+    }
+
+    @Override
+    public int getFieldLength(int fieldId) {
+        return getFieldLength(tid, fieldId);
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return getInnerAccessor().getBuffer();
+    }
+
+    @Override
+    public int getFieldCount() {
+        return getInnerAccessor().getFieldCount();
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return getInnerAccessor().getFieldSlotsLength();
+    }
+
+    protected void checkTupleIndex(int tupleIndex) {
+        if (tupleIndex != tid) {
+            throw new IllegalArgumentException(
+                    "ITupleBufferAccessor can not access the different tid other than the one given in reset function");
+        }
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        checkTupleIndex(tupleIndex);
+        return getInnerAccessor().getFieldEndOffset(tid, fIdx);
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        checkTupleIndex(tupleIndex);
+        return getInnerAccessor().getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        checkTupleIndex(tupleIndex);
+        return getInnerAccessor().getFieldLength(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleLength(int tupleIndex) {
+        checkTupleIndex(tupleIndex);
+        return getInnerAccessor().getTupleLength(tupleIndex);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        checkTupleIndex(tupleIndex);
+        return getInnerAccessor().getTupleEndOffset(tupleIndex);
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        checkTupleIndex(tupleIndex);
+        return getInnerAccessor().getTupleStartOffset(tupleIndex);
+    }
+
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        checkTupleIndex(tupleIndex);
+        return getInnerAccessor().getAbsoluteFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleCount() {
+        return getInnerAccessor().getTupleCount();
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        throw new IllegalAccessError("Should never call this reset");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/BufferInfo.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/BufferInfo.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/BufferInfo.java
new file mode 100644
index 0000000..d5fbdc0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/BufferInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.dataflow.std.structures.IResetable;
+
+public class BufferInfo implements IResetable<BufferInfo> {
+    private ByteBuffer buffer;
+    private int startOffset;
+    private int length;
+
+    public BufferInfo(ByteBuffer buffer, int startOffset, int length) {
+        reset(buffer, startOffset, length);
+    }
+
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    public int getStartOffset() {
+        return startOffset;
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    @Override
+    public void reset(BufferInfo other) {
+        this.buffer = other.buffer;
+        this.startOffset = other.startOffset;
+        this.length = other.length;
+    }
+
+    public void reset(ByteBuffer buffer, int startOffset, int length) {
+        this.buffer = buffer;
+        this.startOffset = startOffset;
+        this.length = length;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
new file mode 100644
index 0000000..626edba
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DeallocatableFramePool implements IDeallocatableFramePool {
+
+    private final IHyracksFrameMgrContext ctx;
+    private final int memBudget;
+    private int allocated;
+    private LinkedList<ByteBuffer> buffers;
+
+    public DeallocatableFramePool(IHyracksFrameMgrContext ctx, int memBudgetInBytes) {
+        this.ctx = ctx;
+        this.memBudget = memBudgetInBytes;
+        this.allocated = 0;
+        this.buffers = new LinkedList<>();
+    }
+
+    @Override
+    public int getMinFrameSize() {
+        return ctx.getInitialFrameSize();
+    }
+
+    @Override
+    public int getMemoryBudgetBytes() {
+        return memBudget;
+    }
+
+    @Override
+    public ByteBuffer allocateFrame(int frameSize) throws HyracksDataException {
+        ByteBuffer buffer = findExistingFrame(frameSize);
+        if (buffer != null) {
+            return buffer;
+        }
+        if (haveEnoughFreeSpace(frameSize)) {
+            return createNewFrame(frameSize);
+        }
+        return mergeExistingFrames(frameSize);
+    }
+
+    private ByteBuffer mergeExistingFrames(int frameSize) throws HyracksDataException {
+        int mergedSize = memBudget - allocated;
+        for (Iterator<ByteBuffer> iter = buffers.iterator(); iter.hasNext(); ) {
+            ByteBuffer buffer = iter.next();
+            iter.remove();
+            mergedSize += buffer.capacity();
+            ctx.deallocateFrames(buffer.capacity());
+            allocated -= buffer.capacity();
+            if (mergedSize >= frameSize) {
+                return createNewFrame(mergedSize);
+            }
+        }
+        return null;
+
+    }
+
+    private ByteBuffer createNewFrame(int frameSize) throws HyracksDataException {
+        allocated += frameSize;
+        return ctx.allocateFrame(frameSize);
+    }
+
+    private boolean haveEnoughFreeSpace(int frameSize) {
+        return allocated + frameSize <= memBudget;
+    }
+
+    private ByteBuffer findExistingFrame(int frameSize) {
+        for (Iterator<ByteBuffer> iter = buffers.iterator(); iter.hasNext(); ) {
+            ByteBuffer next = iter.next();
+            if (next.capacity() >= frameSize) {
+                iter.remove();
+                return next;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void deAllocateBuffer(ByteBuffer buffer) {
+        if (buffer.capacity() != ctx.getInitialFrameSize()) {
+            // simply deallocate the Big Object frame
+            ctx.deallocateFrames(buffer.capacity());
+            allocated -= buffer.capacity();
+        } else {
+            buffers.add(buffer);
+        }
+    }
+
+    @Override
+    public void reset() {
+        allocated = 0;
+        buffers.clear();
+    }
+
+    @Override
+    public void close() {
+        buffers.clear();
+        allocated = 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/EnumFreeSlotPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/EnumFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/EnumFreeSlotPolicy.java
new file mode 100644
index 0000000..15bbab2
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/EnumFreeSlotPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+/**
+ * Under the big-object setting, there could be multiple variable size free slots to use.
+ * In that case, we need to decide which free slot to give back to caller.
+ */
+public enum EnumFreeSlotPolicy {
+    /**
+     * Choose the minimum size frame
+     */
+    SMALLEST_FIT,
+    /**
+     * Choose the latest used frame if it is big enough
+     */
+    LAST_FIT,
+    /**
+     * Choose the largest size frame
+     */
+    BIGGEST_FIT,
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirst.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirst.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirst.java
new file mode 100644
index 0000000..6f5587f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirst.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.dataflow.std.structures.IResetableComparable;
+import org.apache.hyracks.dataflow.std.structures.IResetableComparableFactory;
+import org.apache.hyracks.dataflow.std.structures.MaxHeap;
+
+class FrameFreeSlotBiggestFirst implements IFrameFreeSlotPolicy {
+    private static final int INVALID = -1;
+    private static final int INITIAL_FRAME_NUM = 10;
+
+    protected class SpaceEntryFactory implements IResetableComparableFactory {
+        @Override
+        public IResetableComparable createResetableComparable() {
+            return new SpaceEntry();
+        }
+    }
+
+    protected class SpaceEntry implements IResetableComparable<SpaceEntry> {
+        private int space;
+        private int id;
+
+        SpaceEntry() {
+            space = INVALID;
+            id = INVALID;
+        }
+
+        @Override
+        public int compareTo(SpaceEntry o) {
+            if (o.space != space) {
+                if (o.space == INVALID) {
+                    return 1;
+                }
+                if (space == INVALID) {
+                    return -1;
+                }
+                return space < o.space ? -1 : 1;
+            }
+            return 0;
+        }
+
+        @Override
+        public void reset(SpaceEntry other) {
+            space = other.space;
+            id = other.id;
+        }
+
+        void reset(int space, int id) {
+            this.space = space;
+            this.id = id;
+        }
+    }
+
+    private MaxHeap heap;
+    private SpaceEntry tempEntry;
+
+    public FrameFreeSlotBiggestFirst(int initialCapacity) {
+        heap = new MaxHeap(new SpaceEntryFactory(), initialCapacity);
+        tempEntry = new SpaceEntry();
+    }
+
+    public FrameFreeSlotBiggestFirst() {
+        this(INITIAL_FRAME_NUM);
+    }
+
+    @Override
+    public int popBestFit(int tobeInsertedSize) {
+        if (!heap.isEmpty()) {
+            heap.peekMax(tempEntry);
+            if (tempEntry.space >= tobeInsertedSize) {
+                heap.getMax(tempEntry);
+                return tempEntry.id;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public void pushNewFrame(int frameID, int freeSpace) {
+        tempEntry.reset(freeSpace, frameID);
+        heap.insert(tempEntry);
+    }
+
+    @Override
+    public void reset() {
+        heap.reset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFit.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFit.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFit.java
new file mode 100644
index 0000000..819ff80
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFit.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.util.Arrays;
+
+class FrameFreeSlotLastFit implements IFrameFreeSlotPolicy {
+    private static int INITIAL_CAPACITY = 10;
+
+    private class FrameSpace {
+        private int frameId;
+        private int freeSpace;
+
+        FrameSpace(int frameId, int freeSpace) {
+            reset(frameId, freeSpace);
+        }
+
+        void reset(int frameId, int freeSpace) {
+            this.frameId = frameId;
+            this.freeSpace = freeSpace;
+        }
+    }
+
+    private FrameSpace[] frameSpaces;
+    private int size;
+
+    public FrameFreeSlotLastFit(int initialFrameNumber) {
+        frameSpaces = new FrameSpace[initialFrameNumber];
+        size = 0;
+    }
+
+    public FrameFreeSlotLastFit() {
+        this(INITIAL_CAPACITY);
+    }
+
+    @Override
+    public int popBestFit(int tobeInsertedSize) {
+        for (int i = size - 1; i >= 0; i--) {
+            if (frameSpaces[i].freeSpace >= tobeInsertedSize) {
+                FrameSpace ret = frameSpaces[i];
+                System.arraycopy(frameSpaces, i + 1, frameSpaces, i, size - i - 1);
+                frameSpaces[--size] = ret;
+                return ret.frameId;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public void pushNewFrame(int frameID, int freeSpace) {
+        if (size >= frameSpaces.length) {
+            frameSpaces = Arrays.copyOf(frameSpaces, size * 2);
+        }
+        if (frameSpaces[size] == null) {
+            frameSpaces[size++] = new FrameSpace(frameID, freeSpace);
+        } else {
+            frameSpaces[size++].reset(frameID, freeSpace);
+        }
+    }
+
+    @Override
+    public void reset() {
+        size = 0;
+        for (int i = frameSpaces.length - 1; i >= 0; i--) {
+            frameSpaces[i] = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotPolicyFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotPolicyFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotPolicyFactory.java
new file mode 100644
index 0000000..d4267ab
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotPolicyFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+public class FrameFreeSlotPolicyFactory {
+    public static IFrameFreeSlotPolicy createFreeSlotPolicy(EnumFreeSlotPolicy policy, int initialNumberFrames) {
+        switch (policy) {
+            case SMALLEST_FIT:
+                return new FrameFreeSlotSmallestFit();
+            case BIGGEST_FIT:
+                return new FrameFreeSlotBiggestFirst(initialNumberFrames);
+            case LAST_FIT:
+            default:
+                return new FrameFreeSlotLastFit(initialNumberFrames);
+        }
+    }
+
+    public static IFrameFreeSlotPolicy createFreeSlotPolicy(EnumFreeSlotPolicy policy) {
+        switch (policy) {
+            case SMALLEST_FIT:
+                return new FrameFreeSlotSmallestFit();
+            case BIGGEST_FIT:
+                return new FrameFreeSlotBiggestFirst();
+            case LAST_FIT:
+            default:
+                return new FrameFreeSlotLastFit();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotSmallestFit.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotSmallestFit.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotSmallestFit.java
new file mode 100644
index 0000000..ada6752
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotSmallestFit.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeMap;
+
+class FrameFreeSlotSmallestFit implements IFrameFreeSlotPolicy {
+
+    private TreeMap<Integer, LinkedList<Integer>> freeSpaceIndex;
+
+    public FrameFreeSlotSmallestFit() {
+        freeSpaceIndex = new TreeMap<>();
+    }
+
+    @Override
+    public int popBestFit(int tobeInsertedSize) {
+        Map.Entry<Integer, LinkedList<Integer>> entry = freeSpaceIndex.ceilingEntry(tobeInsertedSize);
+        if (entry == null) {
+            return -1;
+        }
+        int id = entry.getValue().removeFirst();
+        if (entry.getValue().isEmpty()) {
+            freeSpaceIndex.remove(entry.getKey());
+        }
+        return id;
+    }
+
+    @Override
+    public void pushNewFrame(int frameID, int freeSpace) {
+        Map.Entry<Integer, LinkedList<Integer>> entry = freeSpaceIndex.ceilingEntry(freeSpace);
+        if (entry == null || entry.getKey() != freeSpace) {
+            LinkedList<Integer> linkedList = new LinkedList<>();
+            linkedList.add(frameID);
+            freeSpaceIndex.put(freeSpace, linkedList);
+        } else {
+            entry.getValue().add(frameID);
+        }
+    }
+
+    @Override
+    public void reset() {
+        freeSpaceIndex.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
new file mode 100644
index 0000000..39426c1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+public interface IDeallocatableFramePool extends IFramePool {
+
+    void deAllocateBuffer(ByteBuffer buffer);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeletableTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeletableTupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeletableTupleBufferManager.java
new file mode 100644
index 0000000..71d08b2
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeletableTupleBufferManager.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public interface IDeletableTupleBufferManager extends ITupleBufferManager {
+
+    void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException;
+}



Mime
View raw message