asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [14/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.
Date Thu, 18 Jun 2015 04:22:31 GMT
VariableSizeFrame(VSizeFrame) support for Hyracks.

This patch replaces Frame/Accessor/Appender with the new API, which
supports BigObject.
The ExternalSorter/TopKSorter/ExternalGroupSorter
have been implemented to support big objects.

The Groupby and Join operators should also work with BigObject, but they
will break the memory budget when they encounter a big object. I will fix
the memory problem later in a separate CR.

The design of the frame allocation is described here:
https://docs.google.com/presentation/d/15h9iQf5OYsgGZoQTbGHkj1yS2G9q2fd0s1lDAD1EJq0/edit?usp=sharing

Suggested review order:
Patch 12: It includes all of the sorting operators.
Patch 13: It applies the new IFrame API to all of the Hyracks code.
Patch 14: Some bug fixes to pass all Asterix's tests.
Patch 15: Skip it!
Patch 16: Some bug fixes for Asterix's tests in the small-frame setting.
Later Patch: address the comments

Change-Id: I2e08692078683f6f2cf17387e39037ad851fc05b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/234
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/0d87a57f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/0d87a57f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/0d87a57f

Branch: refs/heads/master
Commit: 0d87a57f7439eca69e6dae73f117747b4ea51746
Parents: 0e5d531
Author: JavierJia <jianfeng.jia@gmail.com>
Authored: Wed Jun 17 20:07:38 2015 -0700
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Wed Jun 17 20:45:14 2015 -0700

----------------------------------------------------------------------
 ...estedPlansAccumulatingAggregatorFactory.java |   6 +-
 .../NestedPlansRunningAggregatorFactory.java    |  27 +-
 ...eInputOneOutputOneFieldFramePushRuntime.java |  41 ++
 ...actOneInputOneOutputOneFramePushRuntime.java |  96 +--
 ...AbstractOneInputOneOutputRuntimeFactory.java |   2 +-
 .../MicroPreClusteredGroupRuntimeFactory.java   |  61 +-
 .../operators/meta/SubplanRuntimeFactory.java   |  14 +-
 .../sort/InMemorySortRuntimeFactory.java        |  13 +-
 .../std/EmptyTupleSourceRuntimeFactory.java     |  10 +-
 .../std/NestedTupleSourceRuntimeFactory.java    |   4 +-
 .../PartitioningSplitOperatorDescriptor.java    |  28 +-
 .../operators/std/SinkWriterRuntime.java        |   4 +-
 .../std/StreamSelectRuntimeFactory.java         |  49 +-
 .../ics/hyracks/api/comm/FixedSizeFrame.java    |  59 ++
 .../ics/hyracks/api/comm/FrameConstants.java    |  31 +-
 .../uci/ics/hyracks/api/comm/FrameHelper.java   |  61 +-
 .../edu/uci/ics/hyracks/api/comm/IFrame.java    |  60 ++
 .../ics/hyracks/api/comm/IFrameAppender.java    |  55 ++
 .../hyracks/api/comm/IFrameFieldAppender.java   |  47 ++
 .../uci/ics/hyracks/api/comm/IFrameReader.java  |   8 +-
 .../hyracks/api/comm/IFrameTupleAccessor.java   |  25 +-
 .../hyracks/api/comm/IFrameTupleAppender.java   |  39 +
 .../uci/ics/hyracks/api/comm/IFrameWriter.java  |  11 +-
 .../hyracks/api/comm/NoShrinkVSizeFrame.java    |  31 +
 .../uci/ics/hyracks/api/comm/VSizeFrame.java    |  76 ++
 .../api/context/IHyracksCommonContext.java      |  10 +-
 .../api/context/IHyracksFrameMgrContext.java    |  39 +
 .../api/dataset/IHyracksDatasetReader.java      |   5 +-
 .../client/dataset/DatasetClientContext.java    |  21 +-
 .../client/dataset/HyracksDatasetReader.java    |  27 +-
 .../comm/channels/NetworkInputChannel.java      |   2 +-
 .../comm/channels/NetworkOutputChannel.java     |  52 +-
 .../comm/channels/ReadBufferFactory.java        |   2 +-
 .../edu/uci/ics/hyracks/control/nc/Joblet.java  |  35 +-
 .../edu/uci/ics/hyracks/control/nc/Task.java    |  24 +-
 .../nc/dataset/DatasetPartitionWriter.java      |   2 +-
 .../hyracks/control/nc/dataset/ResultState.java |   3 +
 .../ics/hyracks/control/nc/io/IOManager.java    |  11 +-
 .../MaterializedPartitionInputChannel.java      |   3 +
 .../control/nc/partitions/PartitionManager.java |   4 +-
 .../ConnectorReceiverProfilingFrameReader.java  |   7 +-
 .../nc/resources/memory/FrameManager.java       |  85 +++
 hyracks/hyracks-dataflow-common/pom.xml         |   6 +
 .../common/comm/io/AbstractFrameAppender.java   |  99 +++
 .../common/comm/io/ArrayTupleBuilder.java       |   1 +
 .../dataflow/common/comm/io/FrameConstants.java |  23 -
 .../common/comm/io/FrameDeserializer.java       |   5 +-
 .../comm/io/FrameDeserializingDataReader.java   |  19 +-
 .../comm/io/FrameDeserializingDataWriter.java   |   2 +-
 .../common/comm/io/FrameFixedFieldAppender.java | 108 +++
 .../comm/io/FrameFixedFieldTupleAppender.java   | 130 ++++
 .../common/comm/io/FrameOutputStream.java       |  24 +-
 .../common/comm/io/FrameTupleAccessor.java      | 101 ++-
 .../common/comm/io/FrameTupleAppender.java      | 237 +++---
 .../comm/io/FrameTupleAppenderAccessor.java     | 131 ++++
 .../comm/io/FrameTupleAppenderWrapper.java      |  80 +--
 .../comm/io/ResultFrameTupleAccessor.java       |  72 +-
 .../common/comm/io/SerializingDataWriter.java   |  32 +-
 .../dataflow/common/comm/util/FrameUtils.java   | 260 ++++++-
 .../dataflow/common/io/RunFileReader.java       |  28 +-
 .../dataflow/common/util/IntSerDeUtils.java     |   6 +
 .../FrameFixedFieldTupleAppenderTest.java       | 215 ++++++
 .../hadoop/HadoopReadOperatorDescriptor.java    |  21 +-
 .../dataflow/hadoop/mapreduce/HadoopHelper.java |   2 +-
 .../dataflow/hadoop/mapreduce/KVIterator.java   |  20 +-
 .../mapreduce/MapperOperatorDescriptor.java     |  50 +-
 .../dataflow/hadoop/mapreduce/ReduceWriter.java |  32 +-
 .../hadoop/mapreduce/ShuffleFrameReader.java    |  83 ++-
 hyracks/hyracks-dataflow-std/pom.xml            |   6 +
 .../std/collectors/InputChannelFrameReader.java |  69 +-
 .../NonDeterministicChannelReader.java          |  27 +-
 .../collectors/NonDeterministicFrameReader.java |  30 +-
 .../std/collectors/SortMergeFrameReader.java    |  17 +-
 .../LocalityAwarePartitionDataWriter.java       |  34 +-
 .../std/connectors/PartitionDataWriter.java     |  42 +-
 .../file/DelimitedDataTupleParserFactory.java   |  26 +-
 .../file/PlainFileWriterOperatorDescriptor.java |   9 +-
 .../std/group/HashSpillableTableFactory.java    |  63 +-
 .../dataflow/std/group/ISpillableTable.java     |   4 +-
 .../ExternalGroupBuildOperatorNodePushable.java |   2 +-
 .../ExternalGroupMergeOperatorNodePushable.java |  63 +-
 .../std/group/hash/GroupingHashTable.java       |  45 +-
 .../HashGroupBuildOperatorNodePushable.java     |   2 +-
 .../PreclusteredGroupOperatorNodePushable.java  |   8 -
 .../preclustered/PreclusteredGroupWriter.java   |  29 +-
 .../sort/ExternalSortGroupByRunGenerator.java   |  98 +--
 .../sort/ExternalSortGroupByRunMerger.java      | 144 +---
 .../sort/SortGroupByOperatorDescriptor.java     | 255 ++-----
 .../join/GraceHashJoinOperatorNodePushable.java |  20 +-
 ...hJoinPartitionBuildOperatorNodePushable.java |  20 +-
 .../join/HybridHashJoinOperatorDescriptor.java  | 106 +--
 .../dataflow/std/join/InMemoryHashJoin.java     |  52 +-
 .../InMemoryHashJoinOperatorDescriptor.java     |  26 +-
 .../dataflow/std/join/NestedLoopJoin.java       |  96 +--
 .../join/NestedLoopJoinOperatorDescriptor.java  |  22 +-
 .../std/join/OptimizedHybridHashJoin.java       | 122 ++--
 ...timizedHybridHashJoinOperatorDescriptor.java | 130 ++--
 ...ConstantTupleSourceOperatorNodePushable.java |  10 +-
 .../std/misc/LimitOperatorDescriptor.java       |  12 +-
 .../std/misc/MaterializerTaskState.java         |   8 +-
 .../misc/MaterializingOperatorDescriptor.java   |   9 +-
 .../std/misc/SplitOperatorDescriptor.java       |   4 +-
 .../result/ResultWriterOperatorDescriptor.java  |  19 +-
 .../dataflow/std/sort/AbstractFrameSorter.java  | 186 +++++
 .../std/sort/AbstractSortRunGenerator.java      |  77 ++
 .../sort/AbstractSorterOperatorDescriptor.java  | 197 +++++
 .../hyracks/dataflow/std/sort/BSTMemMgr.java    | 717 -------------------
 .../hyracks/dataflow/std/sort/BSTNodeUtil.java  | 233 ------
 .../sort/ExternalSortOperatorDescriptor.java    | 211 ++----
 .../std/sort/ExternalSortRunGenerator.java      | 127 ++--
 .../std/sort/ExternalSortRunMerger.java         | 435 +++++------
 .../dataflow/std/sort/FrameSorterMergeSort.java | 172 +----
 .../dataflow/std/sort/FrameSorterQuickSort.java | 157 +---
 .../dataflow/std/sort/HeapSortRunGenerator.java |  99 +++
 .../std/sort/HybridTopKSortRunGenerator.java    | 109 +++
 .../hyracks/dataflow/std/sort/IFrameSorter.java |  31 +-
 .../dataflow/std/sort/IMemoryManager.java       |  88 ---
 .../dataflow/std/sort/IRunGenerator.java        |   3 +-
 .../dataflow/std/sort/ISelectionTree.java       |  90 ---
 .../ics/hyracks/dataflow/std/sort/ISorter.java  |  33 +
 .../hyracks/dataflow/std/sort/ITupleSorter.java |  26 +
 .../sort/InMemorySortOperatorDescriptor.java    |  27 +-
 ...OptimizedExternalSortOperatorDescriptor.java | 218 ------
 .../sort/OptimizedExternalSortRunGenerator.java | 283 --------
 ...imizedExternalSortRunGeneratorWithLimit.java | 436 -----------
 .../std/sort/RunAndMaxFrameSizePair.java        |  32 +
 .../std/sort/RunMergingFrameReader.java         | 115 +--
 .../uci/ics/hyracks/dataflow/std/sort/Slot.java |  81 ---
 .../hyracks/dataflow/std/sort/SortMinHeap.java  | 293 --------
 .../dataflow/std/sort/SortMinMaxHeap.java       | 448 ------------
 .../std/sort/TopKSorterOperatorDescriptor.java  |  62 ++
 .../dataflow/std/sort/TupleSorterHeapSort.java  | 269 +++++++
 .../sort/buffermanager/EnumFreeSlotPolicy.java  |  22 +
 .../FrameFreeSlotBiggestFirst.java              |  97 +++
 .../buffermanager/FrameFreeSlotLastFit.java     |  81 +++
 .../buffermanager/FrameFreeSlotSmallestFit.java |  59 ++
 .../sort/buffermanager/IFrameBufferManager.java |  68 ++
 .../buffermanager/IFrameFreeSlotPolicy.java     |  44 ++
 .../std/sort/buffermanager/IFramePool.java      |  48 ++
 .../buffermanager/ITupleBufferAccessor.java     |  36 +
 .../sort/buffermanager/ITupleBufferManager.java |  42 ++
 .../VariableFrameMemoryManager.java             | 132 ++++
 .../sort/buffermanager/VariableFramePool.java   | 200 ++++++
 .../VariableTupleMemoryManager.java             | 203 ++++++
 .../sort/util/DeletableFrameTupleAppender.java  | 244 +++++++
 .../std/sort/util/GroupFrameAccessor.java       | 170 +++++
 .../dataflow/std/sort/util/GroupVSizeFrame.java |  46 ++
 .../IAppendDeletableFrameTupleAccessor.java     |  72 ++
 .../dataflow/std/structures/AbstractHeap.java   | 156 ++++
 .../hyracks/dataflow/std/structures/IHeap.java  |  44 ++
 .../dataflow/std/structures/IMaxHeap.java       |  43 ++
 .../dataflow/std/structures/IMinHeap.java       |  42 ++
 .../dataflow/std/structures/IMinMaxHeap.java    |  18 +
 .../dataflow/std/structures/IResetable.java     |  20 +
 .../std/structures/IResetableComparable.java    |  19 +
 .../structures/IResetableComparableFactory.java |  20 +
 .../dataflow/std/structures/MaxHeap.java        |  63 ++
 .../dataflow/std/structures/MinHeap.java        |  62 ++
 .../dataflow/std/structures/MinMaxHeap.java     | 217 ++++++
 .../std/structures/SerializableHashTable.java   |   2 +-
 .../dataflow/std/structures/TuplePointer.java   |  43 +-
 .../util/DeserializedOperatorNodePushable.java  |   2 +-
 .../ics/hyracks/dataflow/std/util/MathUtil.java |  50 ++
 .../dataflow/std/util/ReferenceEntry.java       |  14 +-
 .../std/util/ReferencedPriorityQueue.java       |  31 +-
 .../ics/hyracks/dataflow/std/sort/Utility.java  |  23 +
 .../dataflow/std/sort/buffermanager/Common.java |  26 +
 .../FrameFreeSlotBestFitUsingTreeMapTest.java   |  60 ++
 .../FrameFreeSlotBiggestFirstTest.java          |  70 ++
 .../buffermanager/FrameFreeSlotLastFitTest.java |  86 +++
 .../buffermanager/VariableFramePoolTest.java    | 216 ++++++
 .../VariableFramesMemoryManagerTest.java        | 170 +++++
 .../VariableTupleMemoryManagerTest.java         | 230 ++++++
 .../util/DeletableFrameTupleAppenderTest.java   | 233 ++++++
 .../std/structures/AbstracHeapTest.java         |  86 +++
 .../dataflow/std/structures/MaxHeapTest.java    |  99 +++
 .../dataflow/std/structures/MinHeapTest.java    | 102 +++
 .../dataflow/std/structures/MinMaxHeapTest.java | 109 +++
 .../ics/hyracks/dataflow/std/util/MathTest.java |  40 ++
 .../btree/helper/DataGenOperatorDescriptor.java |  12 +-
 .../hyracks-integration-tests/.gitignore        |   3 +
 .../comm/SerializationDeserializationTest.java  |  20 +-
 .../integration/AbstractIntegrationTest.java    |  37 +-
 .../AbstractMultiNCIntegrationTest.java         |  21 +-
 .../integration/OptimizedSortMergeTest.java     |  31 +-
 .../integration/VSizeFrameSortMergeTest.java    | 118 +++
 .../tests/unit/AbstractRunGeneratorTest.java    | 279 ++++++++
 .../unit/ExternalSortRunGeneratorTest.java      |  32 +
 .../tests/unit/HeapSortRunGeneratorTest.java    |  37 +
 .../tests/unit/HybridSortRunGenerator.java      |  30 +
 .../tests/unit/RunMergingFrameReaderTest.java   | 409 +++++++++++
 .../tests/unit/TopKRunGeneratorTest.java        | 208 ++++++
 .../examples/text/WordTupleParserFactory.java   |  19 +-
 .../tpch-example/tpchclient/pom.xml             |   2 +-
 .../hyracks/examples/tpch/client/Common.java    |  83 +++
 .../ics/hyracks/examples/tpch/client/Join.java  | 320 +++++++++
 .../ics/hyracks/examples/tpch/client/Main.java  | 362 ----------
 .../ics/hyracks/examples/tpch/client/Sort.java  | 165 +++++
 .../dataflow/HDFSWriteOperatorDescriptor.java   |   2 +-
 .../hdfs/lib/TextKeyValueParserFactory.java     |  18 +-
 .../dataflow/HDFSWriteOperatorDescriptor.java   |   2 +-
 .../BTreeUpdateSearchOperatorNodePushable.java  |  11 +-
 .../IndexBulkLoadOperatorNodePushable.java      |   2 +-
 ...xInsertUpdateDeleteOperatorNodePushable.java |  13 +-
 .../IndexSearchOperatorNodePushable.java        |  34 +-
 ...eIndexDiskOrderScanOperatorNodePushable.java |  22 +-
 .../TreeIndexStatsOperatorNodePushable.java     |  13 +-
 ...xInsertUpdateDeleteOperatorNodePushable.java |  27 +-
 .../BinaryTokenizerOperatorNodePushable.java    |  31 +-
 ...InvertedIndexSearchOperatorNodePushable.java |   2 +
 .../ondisk/FixedSizeFrameTupleAccessor.java     |  10 +
 .../ondisk/OnDiskInvertedIndex.java             |  12 +-
 .../search/AbstractTOccurrenceSearcher.java     |  26 +-
 .../search/PartitionedTOccurrenceSearcher.java  |   9 +-
 .../lsm/invertedindex/search/SearchResult.java  |   4 +-
 .../search/TOccurrenceSearcher.java             |   4 +-
 .../hyracks/test/support/TestJobletContext.java |  20 +-
 .../hyracks/test/support/TestTaskContext.java   |  21 +-
 .../storage/am/btree/BTreeStatsTest.java        |  11 +-
 .../storage/am/btree/FieldPrefixNSMTest.java    |  11 +-
 220 files changed, 10258 insertions(+), 6056 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index af4bff2..618768c 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -50,7 +50,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
 
-        final AggregatorOutput outputWriter = new AggregatorOutput(ctx.getFrameSize(), subplans, keyFieldIdx.length,
+        final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length,
                 decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
@@ -173,7 +173,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
         private ArrayTupleBuilder tb;
         private AlgebricksPipeline[] subplans;
 
-        public AggregatorOutput(int frameSize, AlgebricksPipeline[] subplans, int numKeys, int numDecors) {
+        public AggregatorOutput(AlgebricksPipeline[] subplans, int numKeys, int numDecors) {
             this.subplans = subplans;
             // this.keyFieldIndexes = keyFieldIndexes;
             int totalAggFields = 0;
@@ -187,7 +187,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
 
             this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
             for (int i = 0; i < inputRecDesc.length; i++) {
-                tAccess[i] = new FrameTupleAccessor(frameSize, inputRecDesc[i]);
+                tAccess[i] = new FrameTupleAccessor(inputRecDesc[i]);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index c8dc852..b7e736e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -23,6 +23,7 @@ import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -67,10 +68,6 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
 
         final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
 
-        final ByteBuffer outputFrame = ctx.allocateFrame();
-        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-        outputAppender.reset(outputFrame, true);
-
         return new IAggregatorDescriptor() {
 
             @Override
@@ -167,7 +164,6 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
         private final ArrayTupleBuilder gbyTb;
         private final AlgebricksPipeline[] subplans;
         private final IFrameWriter outputWriter;
-        private final ByteBuffer outputFrame;
         private final FrameTupleAppender outputAppender;
 
         public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
@@ -188,12 +184,10 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
 
             this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
             for (int i = 0; i < inputRecDesc.length; i++) {
-                tAccess[i] = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc[i]);
+                tAccess[i] = new FrameTupleAccessor(inputRecDesc[i]);
             }
 
-            this.outputFrame = ctx.allocateFrame();
-            this.outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-            this.outputAppender.reset(outputFrame, true);
+            this.outputAppender = new FrameTupleAppender(new VSizeFrame(ctx));
         }
 
         @Override
@@ -221,23 +215,14 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
                 for (int f = 0; f < w; f++) {
                     tb.addField(accessor, tIndex, f);
                 }
-                if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    FrameUtils.flushFrame(outputFrame, outputWriter);
-                    outputAppender.reset(outputFrame, true);
-                    if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        throw new HyracksDataException(
-                                "Failed to write a running aggregation result into an empty frame: possibly the size of the result is too large.");
-                    }
-                }
+                FrameUtils.appendToWriter(outputWriter, outputAppender, tb.getFieldEndOffsets(),
+                        tb.getByteArray(), 0, tb.getSize());
             }
         }
 
         @Override
         public void close() throws HyracksDataException {
-            if (outputAppender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(outputFrame, outputWriter);
-                outputAppender.reset(outputFrame, true);
-            }
+            outputAppender.flush(outputWriter, true);
         }
 
         public void setInputIdx(int inputIdx) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFieldFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFieldFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFieldFramePushRuntime.java
new file mode 100644
index 0000000..44c1736
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFieldFramePushRuntime.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameFieldAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public abstract class AbstractOneInputOneOutputOneFieldFramePushRuntime
+        extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+    @Override
+    protected IFrameTupleAppender getTupleAppender() {
+        return (FrameFixedFieldTupleAppender) appender;
+    }
+
+    protected IFrameFieldAppender getFieldAppender() {
+        return (FrameFixedFieldTupleAppender) appender;
+    }
+
+    protected final void initAccessAppendFieldRef(IHyracksTaskContext ctx) throws HyracksDataException {
+        frame = new VSizeFrame(ctx);
+        appender = new FrameFixedFieldTupleAppender(inputRecordDesc.getFieldCount());
+        appender.reset(frame, true);
+        tAccess = new FrameTupleAccessor(inputRecordDesc);
+        tRef = new FrameTupleReference();
+    }
+
+    protected void appendField(byte[] array, int start, int length) throws HyracksDataException {
+        FrameUtils.appendFieldToWriter(writer, getFieldAppender(), array, start, length);
+    }
+
+    protected void appendField(IFrameTupleAccessor accessor, int tid, int fid) throws HyracksDataException {
+        FrameUtils.appendFieldToWriter(writer, getFieldAppender(), accessor, tid, fid);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 81052d6..ec4e039 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -14,8 +14,11 @@
  */
 package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
 
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -26,42 +29,53 @@ import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 
 public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends AbstractOneInputOneOutputPushRuntime {
 
-    protected FrameTupleAppender appender;
-    protected ByteBuffer frame;
+    protected IFrameAppender appender;
+    protected IFrame frame;
     protected FrameTupleAccessor tAccess;
     protected FrameTupleReference tRef;
 
+    protected final void initAccessAppend(IHyracksTaskContext ctx) throws HyracksDataException {
+        frame = new VSizeFrame(ctx);
+        appender = new FrameTupleAppender(frame);
+        tAccess = new FrameTupleAccessor(inputRecordDesc);
+    }
+
+    protected final void initAccessAppendRef(IHyracksTaskContext ctx) throws HyracksDataException {
+        initAccessAppend(ctx);
+        tRef = new FrameTupleReference();
+    }
+
     @Override
     public void close() throws HyracksDataException {
         flushIfNotFailed();
         writer.close();
-        appender.reset(frame, true);
+    }
+
+    protected void flushAndReset() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            appender.flush(writer, true);
+        }
     }
 
     protected void flushIfNotFailed() throws HyracksDataException {
         if (!failed) {
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(frame, writer);
-            }
+            flushAndReset();
         }
     }
 
+    protected IFrameTupleAppender getTupleAppender() {
+        return (FrameTupleAppender) appender;
+    }
+
     protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
         appendToFrameFromTupleBuilder(tb, false);
     }
 
     protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb, boolean flushFrame) throws HyracksDataException {
-        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            FrameUtils.flushFrame(frame, writer);
-            appender.reset(frame, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new HyracksDataException(
-                        "Could not write frame: the size of the tuple is too long to be fit into a single frame. (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder)");
-            }
-        }
+        FrameUtils.appendToWriter(writer, getTupleAppender(), tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                tb.getSize());
         if (flushFrame) {
-            FrameUtils.flushFrame(frame, writer);
-            appender.reset(frame, true);
+            flushAndReset();
         }
     }
 
@@ -71,52 +85,18 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr
 
     protected void appendProjectionToFrame(int tIndex, int[] projectionList, boolean flushFrame)
             throws HyracksDataException {
-        if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
-            FrameUtils.flushFrame(frame, writer);
-            appender.reset(frame, true);
-            if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
-                throw new IllegalStateException(
-                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
-            }
-            return;
-        }
+        FrameUtils.appendProjectionToWriter(writer, getTupleAppender(), tAccess, tIndex, projectionList);
         if (flushFrame) {
-            FrameUtils.flushFrame(frame, writer);
-            appender.reset(frame, true);
+            flushAndReset();
         }
     }
 
     protected void appendTupleToFrame(int tIndex) throws HyracksDataException {
-        if (!appender.append(tAccess, tIndex)) {
-            FrameUtils.flushFrame(frame, writer);
-            appender.reset(frame, true);
-            if (!appender.append(tAccess, tIndex)) {
-                throw new IllegalStateException(
-                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendTupleToFrame).");
-            }
-        }
-    }
-
-    protected final void initAccessAppend(IHyracksTaskContext ctx) throws HyracksDataException {
-        // if (allocFrame) {
-        frame = ctx.allocateFrame();
-        appender = new FrameTupleAppender(ctx.getFrameSize());
-        appender.reset(frame, true);
-        // }
-        tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
-    }
-
-    protected final void initAccessAppendRef(IHyracksTaskContext ctx) throws HyracksDataException {
-        initAccessAppend(ctx);
-        tRef = new FrameTupleReference();
+        FrameUtils.appendToWriter(writer, getTupleAppender(), tAccess, tIndex);
     }
 
-    protected final void initAccessAppendFieldRef(IHyracksTaskContext ctx) throws HyracksDataException {
-        frame = ctx.allocateFrame();
-        appender = new FrameTupleAppender(ctx.getFrameSize(), inputRecordDesc.getFieldCount());
-        appender.reset(frame, true);
-        tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
-        tRef = new FrameTupleReference();
+    protected void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        FrameUtils.appendConcatToWriter(writer, getTupleAppender(), accessor0, tIndex0, accessor1, tIndex1);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
index 43270b6..1d31916 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -24,7 +24,7 @@ public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRu
 
     private static final long serialVersionUID = 1L;
 
-    protected int[] projectionList;
+    protected final int[] projectionList;
 
     public AbstractOneInputOneOutputRuntimeFactory(int[] projectionList) {
         this.projectionList = projectionList;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 2e1171c..3380d08 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -26,7 +26,6 @@ import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 
@@ -57,47 +56,37 @@ public class MicroPreClusteredGroupRuntimeFactory extends AbstractOneInputOneOut
     @Override
     public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws AlgebricksException {
-        try {
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
-            final ByteBuffer copyFrame = ctx.allocateFrame();
-            final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
-            copyFrameAccessor.reset(copyFrame);
-            ByteBuffer outFrame = ctx.allocateFrame();
-            final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-            appender.reset(outFrame, true);
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
 
-            return new AbstractOneInputOneOutputPushRuntime() {
+        return new AbstractOneInputOneOutputPushRuntime() {
 
-                private PreclusteredGroupWriter pgw;
+            private PreclusteredGroupWriter pgw;
 
-                @Override
-                public void open() throws HyracksDataException {
-                    pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
-                            outRecordDesc, writer);
-                    pgw.open();
-                }
+            @Override
+            public void open() throws HyracksDataException {
+                pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
+                        outRecordDesc, writer);
+                pgw.open();
+            }
 
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    pgw.nextFrame(buffer);
-                }
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                pgw.nextFrame(buffer);
+            }
 
-                @Override
-                public void fail() throws HyracksDataException {
-                    pgw.fail();
-                }
+            @Override
+            public void fail() throws HyracksDataException {
+                pgw.fail();
+            }
 
-                @Override
-                public void close() throws HyracksDataException {
-                    pgw.close();
-                }
-            };
-        } catch (HyracksDataException e) {
-            throw new AlgebricksException(e);
-        }
+            @Override
+            public void close() throws HyracksDataException {
+                pgw.close();
+            }
+        };
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index e5bede2..cf40669 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -33,7 +33,6 @@ import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -87,9 +86,7 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
             class TupleOuterProduct implements IFrameWriter {
 
                 private boolean smthWasWritten = false;
-                private IHyracksTaskContext hCtx = ctx;
-                private int frameSize = hCtx.getFrameSize();
-                private FrameTupleAccessor ta = new FrameTupleAccessor(frameSize,
+                private FrameTupleAccessor ta = new FrameTupleAccessor(
                         pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
                 private ArrayTupleBuilder tb = new ArrayTupleBuilder(nullWriters.length);
 
@@ -103,14 +100,7 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
                     ta.reset(buffer);
                     int nTuple = ta.getTupleCount();
                     for (int t = 0; t < nTuple; t++) {
-                        if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
-                            FrameUtils.flushFrame(frame, writer);
-                            appender.reset(frame, true);
-                            if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
-                                throw new HyracksDataException(
-                                        "Could not write frame: subplan result is larger than the single-frame limit.");
-                            }
-                        }
+                        appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t);
                     }
                     smthWasWritten = true;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 54ed192..d3751f5 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -25,6 +25,10 @@ import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterMergeSort;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
 
 public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -57,7 +61,10 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
             @Override
             public void open() throws HyracksDataException {
                 if (frameSorter == null) {
-                    frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory,
+                    IFrameBufferManager manager = new VariableFrameMemoryManager(
+                            new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
+                            new FrameFreeSlotLastFit());
+                    frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory,
                             comparatorFactories, outputRecordDesc);
                 }
                 frameSorter.reset();
@@ -76,8 +83,8 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
 
             @Override
             public void close() throws HyracksDataException {
-                frameSorter.sortFrames();
-                frameSorter.flushFrames(writer);
+                frameSorter.sort();
+                frameSorter.flush(writer);
                 writer.close();
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 745fdf6..35fcafc 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -14,16 +14,14 @@
  */
 package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
 
-import java.nio.ByteBuffer;
-
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSourcePushRuntime;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
 
@@ -41,18 +39,16 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
     public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException {
         return new AbstractOneInputSourcePushRuntime() {
 
-            private ByteBuffer frame = ctx.allocateFrame();
             private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
-            private FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+            private FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
 
             @Override
             public void open() throws HyracksDataException {
                 writer.open();
-                appender.reset(frame, true);
                 if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                     throw new IllegalStateException();
                 }
-                FrameUtils.flushFrame(frame, writer);
+                appender.flush(writer, true);
                 writer.close();
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 55dbcbb..8df87ab 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -21,7 +21,6 @@ import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
 
@@ -67,8 +66,7 @@ public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
         }
 
         public void forceFlush() throws HyracksDataException {
-            FrameUtils.flushFrame(frame, writer);
-            appender.reset(frame, true);
+            appender.flush(writer, true);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index d19dd34..1d2d27c 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -22,7 +22,9 @@ import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -63,15 +65,15 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
             throws HyracksDataException {
         return new AbstractUnaryInputOperatorNodePushable() {
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
-            private final ByteBuffer[] writeBuffers = new ByteBuffer[outputArity];
+            private final IFrame[] writeBuffers = new IFrame[outputArity];
             private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
             private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
             private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
                     0);
-            private final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inOutRecDesc);
+            private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
             private final FrameTupleReference frameTuple = new FrameTupleReference();
 
-            private final FrameTupleAppender tupleAppender = new FrameTupleAppender(ctx.getFrameSize());
+            private final FrameTupleAppender tupleAppender = new FrameTupleAppender();
             private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
             private final DataOutput tupleDos = tupleBuilder.getDataOutput();
 
@@ -80,9 +82,8 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
                 // Flush (possibly not full) buffers that have data, and close writers.
                 for (int i = 0; i < outputArity; i++) {
                     tupleAppender.reset(writeBuffers[i], false);
-                    if (tupleAppender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(writeBuffers[i], writers[i]);
-                    }
+                    // ? by JF why didn't clear the buffer ?
+                    tupleAppender.flush(writers[i], false);
                     writers[i].close();
                 }
             }
@@ -133,17 +134,8 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }
-                // Append to frame.
-                tupleAppender.reset(writeBuffers[outputIndex], false);
-                if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                        tupleBuilder.getSize())) {
-                    FrameUtils.flushFrame(writeBuffers[outputIndex], writers[outputIndex]);
-                    tupleAppender.reset(writeBuffers[outputIndex], true);
-                    if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                            tupleBuilder.getSize())) {
-                        throw new IllegalStateException();
-                    }
-                }
+                FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(),
+                        tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
             }
 
             @Override
@@ -153,7 +145,7 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
                 }
                 // Create write buffers.
                 for (int i = 0; i < outputArity; i++) {
-                    writeBuffers[i] = ctx.allocateFrame();
+                    writeBuffers[i] = new VSizeFrame(ctx);
                     // Make sure to clear all buffers, since we are reusing the tupleAppender.
                     tupleAppender.reset(writeBuffers[i], true);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index 37ad4a9..d26d090 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -41,7 +41,7 @@ public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
         this.ctx = ctx;
         this.printStream = printStream;
         this.inputRecordDesc = inputRecordDesc;
-        this.tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
     }
 
     public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
@@ -54,7 +54,7 @@ public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
     public void open() throws HyracksDataException {
         if (first) {
             first = false;
-            tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+            tAccess = new FrameTupleAccessor(inputRecordDesc);
             try {
                 writer.init();
             } catch (AlgebricksException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 17603e7..4ed33f4 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -22,6 +22,7 @@ import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFieldFramePushRuntime;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -31,26 +32,24 @@ import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
-    private IScalarEvaluatorFactory cond;
+    private final IScalarEvaluatorFactory cond;
 
-    private IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+    private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
 
-    private boolean retainNull;
+    private final boolean retainNull;
 
-    private int nullPlaceholderVariableIndex;
+    private final int nullPlaceholderVariableIndex;
 
-    private INullWriterFactory nullWriterFactory;
+    private final INullWriterFactory nullWriterFactory;
 
     /**
      * @param cond
-     * @param projectionList
-     *            if projectionList is null, then no projection is performed
+     * @param projectionList               if projectionList is null, then no projection is performed
      * @param retainNull
      * @param nullPlaceholderVariableIndex
      * @param nullWriterFactory
@@ -75,7 +74,7 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
         final IBinaryBooleanInspector bbi = binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx);
-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+        return new AbstractOneInputOneOutputOneFieldFramePushRuntime() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
             private IScalarEvaluator eval;
             private INullWriter nullWriter = null;
@@ -122,35 +121,11 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
                         }
                     } else {
                         if (retainNull) {
-                            //keep all field values as is except setting nullPlaceholderVariable field to null
-                            int i = 0;
-                            int tryCount = 0;
-                            while (true) {
-                                for (i = 0; i < tRef.getFieldCount(); i++) {
-                                    if (i == nullPlaceholderVariableIndex) {
-                                        if (!appender.appendField(nullTupleBuilder.getByteArray(), 0,
-                                                nullTupleBuilder.getSize())) {
-                                            FrameUtils.flushFrame(frame, writer);
-                                            appender.reset(frame, true);
-                                            break;
-                                        }
-                                    } else {
-                                        if (!appender.appendField(tAccess, t, i)) {
-                                            FrameUtils.flushFrame(frame, writer);
-                                            appender.reset(frame, true);
-                                            break;
-                                        }
-                                    }
-                                }
-
-                                if (i == tRef.getFieldCount()) {
-                                    break;
+                            for (int i = 0; i < tRef.getFieldCount(); i++) {
+                                if (i == nullPlaceholderVariableIndex) {
+                                    appendField(nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize());
                                 } else {
-                                    tryCount++;
-                                    if (tryCount == 2) {
-                                        throw new IllegalStateException(
-                                                "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime).");
-                                    }
+                                    appendField(tAccess, t, i);
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FixedSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FixedSizeFrame.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FixedSizeFrame.java
new file mode 100644
index 0000000..ee92084
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FixedSizeFrame.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FixedSizeFrame implements IFrame{
+
+    private final ByteBuffer buffer;
+
+    public FixedSizeFrame(ByteBuffer buffer){
+        this.buffer = buffer;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public void ensureFrameSize(int frameSize) throws HyracksDataException {
+        throw new HyracksDataException("FixedSizeFrame doesn't support capacity changes");
+    }
+
+    @Override
+    public void resize(int frameSize) throws HyracksDataException {
+        throw new HyracksDataException("FixedSizeFrame doesn't support capacity changes");
+    }
+
+    @Override
+    public int getFrameSize() {
+        return buffer.capacity();
+    }
+
+    @Override
+    public int getMinSize() {
+        return buffer.capacity() / FrameHelper.deserializeNumOfMinFrame(buffer, 0);
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        buffer.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
index 0dc97bc..176f23e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
@@ -15,9 +15,34 @@
 package edu.uci.ics.hyracks.api.comm;
 
 public interface FrameConstants {
-    public static final int SIZE_LEN = 4;
+    /**
+     * We use 4bytes to store the tupleCount at the end of the Frame.
+     */
+    int SIZE_LEN = 4;
 
-    public static final boolean DEBUG_FRAME_IO = false;
+    /**
+     * The offset of the frame_count, a single byte indicating how many initial frames the current frame contains.
+     * The actual frameSize = frame_count * initialFrameSize (given by the user).
+     */
+    int META_DATA_FRAME_COUNT_OFFSET = 0;
+
+    /**
+     * The start offset of the tuple data. The first byte is used to store the frame_count
+     */
+    int TUPLE_START_OFFSET = 1;
+
+    /**
+     * Since we use one byte to store the frame_count, the max frame_count is 255.
+     */
+    int MAX_NUM_MINFRAME = 255;
+
+    /**
+     * Indicate the total size of the meta data.
+     */
+    int META_DATA_LEN = SIZE_LEN  + TUPLE_START_OFFSET;
+
+    boolean DEBUG_FRAME_IO = false;
+
+    int FRAME_FIELD_MAGIC = 0x12345678;
 
-    public static final int FRAME_FIELD_MAGIC = 0x12345678;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
index a6774c7..2376d2e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
@@ -14,8 +14,65 @@
  */
 package edu.uci.ics.hyracks.api.comm;
 
+import java.nio.ByteBuffer;
+
 public class FrameHelper {
     public static int getTupleCountOffset(int frameSize) {
-        return frameSize - 4;
+        return frameSize - FrameConstants.SIZE_LEN;
+    }
+
+    /**
+     * The actual frameSize = frameCount * initialFrameSize.
+     * This method puts that frameCount into the first byte of the frame buffer.
+     * @param outputFrame
+     * @param numberOfMinFrame
+     */
+    public static void serializeFrameSize(ByteBuffer outputFrame, byte numberOfMinFrame) {
+        serializeFrameSize(outputFrame, 0, numberOfMinFrame);
+    }
+
+    public static void serializeFrameSize(ByteBuffer outputFrame, int start, byte numberOfMinFrame) {
+        outputFrame.array()[start + FrameConstants.META_DATA_FRAME_COUNT_OFFSET] = (byte) (numberOfMinFrame & 0xff);
+    }
+
+    public static byte deserializeNumOfMinFrame(ByteBuffer frame) {
+        return deserializeNumOfMinFrame(frame, 0);
+    }
+
+    public static byte deserializeNumOfMinFrame(ByteBuffer buffer, int start) {
+        return (byte) (buffer.array()[start + FrameConstants.META_DATA_FRAME_COUNT_OFFSET] & 0xff);
+    }
+
+    /**
+     * Adding one tuple requires:
+     * 4 bytes to store the tuple offset,
+     * 4 bytes * |fields| to store the relative offset of each field, and
+     * n bytes for the actual data.
+     * If the tupleLength already includes the field slots, set fieldCount = 0.
+     */
+    public static int calcSpaceInFrame(int fieldCount, int tupleLength) {
+        return 4 + fieldCount * 4 + tupleLength;
+    }
+
+    /**
+     * Calculate the frame size needed to store one tuple, rounded up to a multiple of minFrameSize (integer ceiling).
+     *
+     * @param fieldCount   please set fieldCount to 0 if the tupleLength includes the fields' length
+     * @param tupleLength
+     * @param minFrameSize
+     * @return
+     */
+    public static int calcAlignedFrameSizeToStore(int fieldCount, int tupleLength, int minFrameSize) {
+        assert fieldCount >= 0 && tupleLength >= 0 && minFrameSize > 0;
+        return (1 + (calcSpaceInFrame(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
+                * minFrameSize;
+    }
+
+    public static void clearRemainingFrame(ByteBuffer buffer, int position) {
+        buffer.array()[position] = 0;
+    }
+
+    public static boolean hasBeenCleared(ByteBuffer buffer, int position) {
+        return deserializeNumOfMinFrame(buffer, position) == 0;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrame.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrame.java
new file mode 100644
index 0000000..ccbbb0d
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrame.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrame {
+
+    ByteBuffer getBuffer();
+
+    /**
+     * Make sure the frame size is greater than or equal to the given size.
+     *
+     * @param frameSize
+     * @throws HyracksDataException
+     */
+    void ensureFrameSize(int frameSize) throws HyracksDataException;
+
+    /**
+     *
+     * Expand or shrink the inner buffer to make the size exactly equal to {@code frameSize}
+     * @param frameSize
+     */
+    void resize(int frameSize) throws HyracksDataException;
+
+    /**
+     * Return the size of frame in bytes
+     *
+     * @return
+     */
+    int getFrameSize();
+
+    /**
+     * Return the minimum frame size, which should be read from the configuration file given by the user
+     *
+     * @return
+     */
+    int getMinSize();
+
+    /**
+     * Reset the status of the buffer to prepare for the next round of read/write
+     */
+    void reset() throws HyracksDataException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameAppender.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameAppender.java
new file mode 100644
index 0000000..89b2bba
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameAppender.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrameAppender {
+    /**
+     * Reset to attach to a new frame.
+     *
+     * @param frame the new frame
+     * @param clear indicate whether we need to clear this new frame
+     * @throws HyracksDataException
+     */
+    void reset(IFrame frame, boolean clear) throws HyracksDataException;
+
+    /**
+     * Get the number of tuples in the current frame.
+     *
+     * @return
+     */
+    int getTupleCount();
+
+    /**
+     * Get the ByteBuffer which contains the frame data.
+     *
+     * @return
+     */
+    ByteBuffer getBuffer();
+
+    /**
+     * Flush the frame content to the given writer.
+     * Clear the inner buffer after flush if {@code clear} is <code>true</code>.
+     *
+     * @param outWriter the output writer
+     * @param clear     indicates whether to clear the inner frame after it has been flushed.
+     * @throws HyracksDataException
+     */
+    void flush(IFrameWriter outWriter, boolean clear) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameFieldAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameFieldAppender.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameFieldAppender.java
new file mode 100644
index 0000000..f66248f
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameFieldAppender.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The IFrameFieldAppender is used to append the data into frame field by field.
+ */
+public interface IFrameFieldAppender extends IFrameAppender {
+
+    /**
+     * Append the field stored in {@code bytes} into the current frame.
+     *
+     * @param bytes  the byte array that stores the field data
+     * @param offset the offset of the field data
+     * @param length the length of the field data
+     * @return true if the current frame has enough space to hold the field data, otherwise return false.
+     * @throws HyracksDataException
+     */
+    boolean appendField(byte[] bytes, int offset, int length) throws HyracksDataException;
+
+    /**
+     * Append the field of {@code fid} from the tuple {@code tid} whose information is stored in the {@code accessor}
+     * into the current frame.
+     *
+     * @param accessor tupleAccessor
+     * @param tid      tuple id in tupleAccessor
+     * @param fid      field id of the tuple {@code tid}
+     * @return true if the current frame has enough space to hold the field data, otherwise return false.
+     * @throws HyracksDataException
+     */
+    boolean appendField(IFrameTupleAccessor accessor, int tid, int fid) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
index c72782a..cd3c5ab 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
@@ -14,14 +14,12 @@
  */
 package edu.uci.ics.hyracks.api.comm;
 
-import java.nio.ByteBuffer;
-
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IFrameReader {
-    public void open() throws HyracksDataException;
+    void open() throws HyracksDataException;
 
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException;
+    boolean nextFrame(IFrame frame) throws HyracksDataException;
 
-    public void close() throws HyracksDataException;
+    void close() throws HyracksDataException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
index ee34add..130704f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
@@ -17,23 +17,28 @@ package edu.uci.ics.hyracks.api.comm;
 import java.nio.ByteBuffer;
 
 public interface IFrameTupleAccessor {
-    public int getFieldCount();
+    int getFieldCount();
 
-    public int getFieldSlotsLength();
+    int getFieldSlotsLength();
 
-    public int getFieldEndOffset(int tupleIndex, int fIdx);
+    int getFieldEndOffset(int tupleIndex, int fIdx);
 
-    public int getFieldStartOffset(int tupleIndex, int fIdx);
+    int getFieldStartOffset(int tupleIndex, int fIdx);
 
-    public int getFieldLength(int tupleIndex, int fIdx);
+    int getFieldLength(int tupleIndex, int fIdx);
 
-    public int getTupleEndOffset(int tupleIndex);
+    int getTupleLength(int tupleIndex);
 
-    public int getTupleStartOffset(int tupleIndex);
+    int getTupleEndOffset(int tupleIndex);
 
-    public int getTupleCount();
+    int getTupleStartOffset(int tupleIndex);
 
-    public ByteBuffer getBuffer();
+    int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx);
+
+    int getTupleCount();
+
+    ByteBuffer getBuffer();
+
+    void reset(ByteBuffer buffer);
 
-    public void reset(ByteBuffer buffer);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAppender.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAppender.java
new file mode 100644
index 0000000..4da2afc
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAppender.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrameTupleAppender extends IFrameAppender {
+
+    boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException;
+
+    boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException;
+
+    boolean append(byte[] bytes, int offset, int length) throws HyracksDataException;
+
+    boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException;
+
+    boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) throws HyracksDataException;
+
+    boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1,
+            int tIndex1) throws HyracksDataException;
+
+    boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1,
+            int offset1, int dataLen1) throws HyracksDataException;
+
+    boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
index 8e35dda..538759e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
@@ -45,7 +45,7 @@ import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
  * Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} is not called by the producer. So an exceptional
  * return from the {@link IFrameWriter#open()} call must clean up all partially
  * allocated resources.
- * 
+ *
  * @author vinayakb
  */
 public interface IFrameWriter {
@@ -56,9 +56,8 @@ public interface IFrameWriter {
 
     /**
      * Provide data to the stream of this {@link IFrameWriter}.
-     * 
-     * @param buffer
-     *            - Buffer containing data.
+     *
+     * @param buffer - Buffer containing data.
      * @throws HyracksDataException
      */
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
@@ -66,14 +65,14 @@ public interface IFrameWriter {
     /**
      * Indicate that a failure was encountered and the current stream is to be
      * aborted.
-     * 
+     *
      * @throws HyracksDataException
      */
     public void fail() throws HyracksDataException;
 
     /**
      * Close this {@link IFrameWriter} and give up all resources.
-     * 
+     *
      * @throws HyracksDataException
      */
     public void close() throws HyracksDataException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NoShrinkVSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NoShrinkVSizeFrame.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NoShrinkVSizeFrame.java
new file mode 100644
index 0000000..902ae75
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NoShrinkVSizeFrame.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.context.IHyracksFrameMgrContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class NoShrinkVSizeFrame extends VSizeFrame {
+    public NoShrinkVSizeFrame(IHyracksFrameMgrContext ctx) throws HyracksDataException {
+        super(ctx);
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        buffer.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/VSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/VSizeFrame.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/VSizeFrame.java
new file mode 100644
index 0000000..a5a7f19
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/VSizeFrame.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksFrameMgrContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Variable size frame. The buffer inside can be resized.
+ */
+public class VSizeFrame implements IFrame {
+
+    protected final int minFrameSize;
+    protected IHyracksFrameMgrContext ctx;
+    protected ByteBuffer buffer;
+
+    public VSizeFrame(IHyracksFrameMgrContext ctx) throws HyracksDataException {
+        this(ctx, ctx.getInitialFrameSize());
+    }
+
+    public VSizeFrame(IHyracksFrameMgrContext ctx, int frameSize) throws HyracksDataException {
+        this.minFrameSize = ctx.getInitialFrameSize();
+        this.ctx = ctx;
+        buffer = ctx.allocateFrame(frameSize);
+    }
+
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public void ensureFrameSize(int newSize) throws HyracksDataException {
+        if (newSize > getFrameSize()) {
+            buffer = ctx.reallocateFrame(buffer, newSize, true);
+        }
+    }
+
+    @Override
+    public void resize(int frameSize) throws HyracksDataException {
+        if (getFrameSize() != frameSize) {
+            buffer = ctx.reallocateFrame(buffer, frameSize, false);
+        }
+    }
+
+    @Override
+    public int getFrameSize() {
+        return buffer.capacity();
+    }
+
+    @Override
+    public int getMinSize() {
+        return minFrameSize;
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        resize(minFrameSize);
+        buffer.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index eddc4df..d60ff6e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -14,17 +14,9 @@
  */
 package edu.uci.ics.hyracks.api.context;
 
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 
-public interface IHyracksCommonContext {
-    public int getFrameSize();
+public interface IHyracksCommonContext extends IHyracksFrameMgrContext{
 
     public IIOManager getIOManager();
-
-    public ByteBuffer allocateFrame() throws HyracksDataException;
-    
-    public void deallocateFrames(int frameCount);
 }


Mime
View raw message