asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [11/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.
Date Thu, 18 Jun 2015 04:22:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
index 8877df4..ccf7276 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
@@ -26,7 +26,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.counters.GenericCounter;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,7 +45,7 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
     private final int[] groupFields;
     private final FrameTupleAccessor accessor0;
     private final FrameTupleAccessor accessor1;
-    private final ByteBuffer copyFrame;
+    private final IFrame copyFrame;
     private final IBinaryComparator[] comparators;
     private final KVIterator kvi;
     private final Reducer<K2, V2, K3, V3> reducer;
@@ -53,7 +55,7 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
 
     private boolean first;
     private boolean groupStarted;
-    private List<ByteBuffer> group;
+    private List<IFrame> group;
     private int bPtr;
     private FrameTupleAppender fta;
     private Counter keyCounter;
@@ -66,10 +68,10 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
         this.ctx = ctx;
         this.helper = helper;
         this.groupFields = groupFields;
-        accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        copyFrame = ctx.allocateFrame();
-        accessor1.reset(copyFrame);
+        accessor0 = new FrameTupleAccessor(recordDescriptor);
+        accessor1 = new FrameTupleAccessor(recordDescriptor);
+        copyFrame = new VSizeFrame(ctx);
+        accessor1.reset(copyFrame.getBuffer());
         comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -79,17 +81,17 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
         this.taId = taId;
         this.taskAttemptContext = taskAttemptContext;
 
-        kvi = new KVIterator(ctx, helper, recordDescriptor);
+        kvi = new KVIterator(helper, recordDescriptor);
     }
 
     @Override
     public void open() throws HyracksDataException {
         first = true;
         groupStarted = false;
-        group = new ArrayList<ByteBuffer>();
+        group = new ArrayList<>();
         bPtr = 0;
-        group.add(ctx.allocateFrame());
-        fta = new FrameTupleAppender(ctx.getFrameSize());
+        group.add(new VSizeFrame(ctx));
+        fta = new FrameTupleAppender();
         keyCounter = new GenericCounter();
         valueCounter = new GenericCounter();
     }
@@ -104,6 +106,7 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
                 first = false;
             } else {
                 if (i == 0) {
+                    accessor1.reset(copyFrame.getBuffer());
                     switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
                 } else {
                     switchGroupIfRequired(accessor0, i - 1, accessor0, i);
@@ -111,20 +114,21 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
             }
             accumulate(accessor0, i);
         }
-        FrameUtils.copy(buffer, copyFrame);
+        copyFrame.ensureFrameSize(buffer.capacity());
+        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
     }
 
     private void accumulate(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
         if (!fta.append(accessor, tIndex)) {
             ++bPtr;
             if (group.size() <= bPtr) {
-                group.add(ctx.allocateFrame());
+                group.add(new VSizeFrame(ctx));
             }
             fta.reset(group.get(bPtr), true);
             if (!fta.append(accessor, tIndex)) {
                 throw new HyracksDataException("Record size ("
                         + (accessor.getTupleEndOffset(tIndex) - accessor.getTupleStartOffset(tIndex))
-                        + ") larger than frame size (" + group.get(bPtr).capacity() + ")");
+                        + ") larger than frame size (" + group.get(bPtr).getBuffer().capacity() + ")");
             }
         }
     }
@@ -137,7 +141,7 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
         }
     }
 
-    private void groupInit() {
+    private void groupInit() throws HyracksDataException {
         groupStarted = true;
         bPtr = 0;
         fta.reset(group.get(0), true);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
index 295fe1f..6708e17 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -22,8 +22,11 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.NoShrinkVSizeFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -34,26 +37,29 @@ import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
 
 public class ShuffleFrameReader implements IFrameReader {
     private final IHyracksTaskContext ctx;
     private final NonDeterministicChannelReader channelReader;
     private final HadoopHelper helper;
     private final RecordDescriptor recordDescriptor;
+    private final IFrame vframe;
     private List<RunFileWriter> runFileWriters;
+    private List<Integer> runFileMaxFrameSize;
     private RunFileReader reader;
 
     public ShuffleFrameReader(IHyracksTaskContext ctx, NonDeterministicChannelReader channelReader,
             MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
         this.ctx = ctx;
         this.channelReader = channelReader;
-        helper = new HadoopHelper(mConfig);
+        this.helper = new HadoopHelper(mConfig);
         this.recordDescriptor = helper.getMapOutputRecordDescriptor();
+        this.vframe = new NoShrinkVSizeFrame(ctx);
     }
 
     @Override
@@ -61,21 +67,28 @@ public class ShuffleFrameReader implements IFrameReader {
         channelReader.open();
         int nSenders = channelReader.getSenderPartitionCount();
         runFileWriters = new ArrayList<RunFileWriter>();
+        runFileMaxFrameSize = new ArrayList<>();
         RunInfo[] infos = new RunInfo[nSenders];
-        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        IInputChannel[] channels = channelReader.getChannels();
+        FrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptor);
         while (true) {
             int entry = channelReader.findNextSender();
             if (entry < 0) {
                 break;
             }
             RunInfo info = infos[entry];
-            IInputChannel channel = channels[entry];
-            ByteBuffer netBuffer = channel.getNextBuffer();
-            accessor.reset(netBuffer);
+            ByteBuffer netBuffer = channelReader.getNextBuffer(entry);
+            netBuffer.clear();
+            int nBlocks = FrameHelper.deserializeNumOfMinFrame(netBuffer);
+
+            if (nBlocks > 1) {
+                netBuffer = getCompleteBuffer(nBlocks, netBuffer, entry);
+            }
+
+            accessor.reset(netBuffer, 0, netBuffer.limit());
             int nTuples = accessor.getTupleCount();
             for (int i = 0; i < nTuples; ++i) {
-                int tBlockId = IntegerPointable.getInteger(accessor.getBuffer().array(), FrameUtils.getAbsoluteFieldStartOffset(accessor, i, HadoopHelper.BLOCKID_FIELD_INDEX));
+                int tBlockId = IntegerPointable.getInteger(accessor.getBuffer().array(),
+                        accessor.getAbsoluteFieldStartOffset(i, HadoopHelper.BLOCKID_FIELD_INDEX));
                 if (info == null) {
                     info = new RunInfo();
                     info.reset(tBlockId);
@@ -86,7 +99,10 @@ public class ShuffleFrameReader implements IFrameReader {
                 }
                 info.write(accessor, i);
             }
-            channel.recycleBuffer(netBuffer);
+
+            if (nBlocks == 1) {
+                channelReader.recycleBuffer(entry, netBuffer);
+            }
         }
         for (int i = 0; i < infos.length; ++i) {
             RunInfo info = infos[i];
@@ -94,7 +110,6 @@ public class ShuffleFrameReader implements IFrameReader {
                 info.close();
             }
         }
-        infos = null;
 
         FileReference outFile = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
         int framesLimit = helper.getSortFrameLimit(ctx);
@@ -103,22 +118,40 @@ public class ShuffleFrameReader implements IFrameReader {
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        List<IFrameReader> runs = new LinkedList<IFrameReader>();
-        for (RunFileWriter rfw : runFileWriters) {
-            runs.add(rfw.createReader());
+        List<RunAndMaxFrameSizePair> runs = new LinkedList<>();
+        for (int i = 0; i < runFileWriters.size(); i++) {
+            runs.add(new RunAndMaxFrameSizePair(runFileWriters.get(i).createReader(), runFileMaxFrameSize.get(i)));
         }
         RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
-        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators, null,
-                recordDescriptor, framesLimit, rfw);
+        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 },
+                comparators, null, recordDescriptor, framesLimit, rfw);
         merger.process();
 
         reader = rfw.createReader();
         reader.open();
     }
 
+    private ByteBuffer getCompleteBuffer(int nBlocks, ByteBuffer netBuffer, int entry) throws HyracksDataException {
+        vframe.reset();
+        vframe.ensureFrameSize(vframe.getMinSize() * nBlocks);
+        FrameUtils.copyWholeFrame(netBuffer, vframe.getBuffer());
+        channelReader.recycleBuffer(entry, netBuffer);
+        for (int i = 1; i < nBlocks; ++i) {
+            netBuffer = channelReader.getNextBuffer(entry);
+            netBuffer.clear();
+            vframe.getBuffer().put(netBuffer);
+            channelReader.recycleBuffer(entry, netBuffer);
+        }
+        if (vframe.getBuffer().hasRemaining()) { // bigger frame
+            FrameHelper.clearRemainingFrame(vframe.getBuffer(), vframe.getBuffer().position());
+        }
+        vframe.getBuffer().flip();
+        return vframe.getBuffer();
+    }
+
     @Override
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        return reader.nextFrame(buffer);
+    public boolean nextFrame(IFrame frame) throws HyracksDataException {
+        return reader.nextFrame(frame);
     }
 
     @Override
@@ -127,20 +160,22 @@ public class ShuffleFrameReader implements IFrameReader {
     }
 
     private class RunInfo {
-        private final ByteBuffer buffer;
+        private final IFrame buffer;
         private final FrameTupleAppender fta;
 
         private FileReference file;
         private RunFileWriter rfw;
         private int blockId;
+        private int maxFrameSize = ctx.getInitialFrameSize();
 
         public RunInfo() throws HyracksDataException {
-            buffer = ctx.allocateFrame();
-            fta = new FrameTupleAppender(ctx.getFrameSize());
+            buffer = new VSizeFrame(ctx);
+            fta = new FrameTupleAppender();
         }
 
         public void reset(int blockId) throws HyracksDataException {
             this.blockId = blockId;
+            this.maxFrameSize = ctx.getInitialFrameSize();
             fta.reset(buffer, true);
             try {
                 file = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
@@ -165,15 +200,15 @@ public class ShuffleFrameReader implements IFrameReader {
             flush();
             rfw.close();
             runFileWriters.add(rfw);
+            runFileMaxFrameSize.add(maxFrameSize);
         }
 
         private void flush() throws HyracksDataException {
             if (fta.getTupleCount() <= 0) {
                 return;
             }
-            buffer.limit(buffer.capacity());
-            buffer.position(0);
-            rfw.nextFrame(buffer);
+            maxFrameSize = buffer.getFrameSize() > maxFrameSize ? buffer.getFrameSize() : maxFrameSize;
+            rfw.nextFrame((ByteBuffer) buffer.getBuffer().clear());
             fta.reset(buffer, true);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml
index c4b23fd..5eadbca 100644
--- a/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks/hyracks-dataflow-std/pom.xml
@@ -66,5 +66,11 @@
   		<groupId>commons-io</groupId>
   		<artifactId>commons-io</artifactId>
   	</dependency>
+      <dependency>
+          <groupId>edu.uci.ics.hyracks</groupId>
+          <artifactId>hyracks-control-nc</artifactId>
+          <version>0.2.16-SNAPSHOT</version>
+          <scope>test</scope>
+      </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 607a817..12ff8e8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -18,6 +18,8 @@ import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -42,27 +44,58 @@ public class InputChannelFrameReader implements IFrameReader, IInputChannelMonit
     public void open() throws HyracksDataException {
     }
 
-    @Override
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        synchronized (this) {
-            while (!failed && !eos && availableFrames <= 0) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-            if (failed) {
-                throw new HyracksDataException("Failure occurred on input");
+    private synchronized boolean canGetNextBuffer() throws HyracksDataException {
+        while (!failed && !eos && availableFrames <= 0) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
             }
-            if (availableFrames <= 0 && eos) {
-                return false;
+        }
+        if (failed) {
+            throw new HyracksDataException("Failure occurred on input");
+        }
+        if (availableFrames <= 0 && eos) {
+            return false;
+        }
+        --availableFrames;
+        return true;
+    }
+
+    /**
+     * This implementation works under the truth that one Channel is never shared by two readers.
+     * More precisely, one channel only has exact one reader and one writer side.
+     *
+     * @param frame outputFrame
+     * @return {@code true} if succeed to read the data from the channel to the {@code frame}.
+     * Otherwise return {@code false} if the end of stream is reached.
+     * @throws HyracksDataException
+     */
+    @Override
+    public boolean nextFrame(IFrame frame) throws HyracksDataException {
+        if (!canGetNextBuffer()) {
+            return false;
+        }
+        frame.reset();
+        ByteBuffer srcFrame = channel.getNextBuffer();
+        int nBlocks = FrameHelper.deserializeNumOfMinFrame(srcFrame);
+        frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+        FrameUtils.copyWholeFrame(srcFrame, frame.getBuffer());
+        channel.recycleBuffer(srcFrame);
+
+        for (int i = 1; i < nBlocks; ++i) {
+            if (!canGetNextBuffer()) {
+                throw new HyracksDataException(
+                        "InputChannelReader is waiting for the new frames, but the input stream is finished");
             }
-            --availableFrames;
+            srcFrame = channel.getNextBuffer();
+            frame.getBuffer().put(srcFrame);
+            channel.recycleBuffer(srcFrame);
+        }
+        if (frame.getBuffer().hasRemaining()) { // bigger frame
+            FrameHelper.clearRemainingFrame(frame.getBuffer(), frame.getBuffer().position());
         }
-        ByteBuffer srcBuffer = channel.getNextBuffer();
-        FrameUtils.copy(srcBuffer, buffer);
-        channel.recycleBuffer(srcBuffer);
+        frame.getBuffer().flip();
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index 7f447c6..0c25d54 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.collectors;
 
+import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -67,12 +68,27 @@ public class NonDeterministicChannelReader implements IInputChannelMonitor, IPar
         return nSenderPartitions;
     }
 
-    public void open() throws HyracksDataException {
-        lastReadSender = -1;
+    public synchronized ByteBuffer getNextBuffer(int index) throws HyracksDataException {
+        while ((availableFrameCounts[index] <= 0)) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        if (--availableFrameCounts[index] == 0) {
+            frameAvailability.clear(index);
+        }
+        return channels[index].getNextBuffer();
+
     }
 
-    public IInputChannel[] getChannels() {
-        return channels;
+    public void recycleBuffer(int index, ByteBuffer frame) {
+        channels[index].recycleBuffer(frame);
+    }
+
+    public void open() throws HyracksDataException {
+        lastReadSender = -1;
     }
 
     public synchronized int findNextSender() throws HyracksDataException {
@@ -83,9 +99,6 @@ public class NonDeterministicChannelReader implements IInputChannelMonitor, IPar
             }
             if (lastReadSender >= 0) {
                 assert availableFrameCounts[lastReadSender] > 0;
-                if (--availableFrameCounts[lastReadSender] == 0) {
-                    frameAvailability.clear(lastReadSender);
-                }
                 return lastReadSender;
             }
             if (!failSenders.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
index e107cfa..6dd6972 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
@@ -16,7 +16,8 @@ package edu.uci.ics.hyracks.dataflow.std.collectors;
 
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -34,16 +35,27 @@ public class NonDeterministicFrameReader implements IFrameReader {
     }
 
     @Override
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    public boolean nextFrame(IFrame frame) throws HyracksDataException {
         int index = channelReader.findNextSender();
-        if (index >= 0) {
-            IInputChannel[] channels = channelReader.getChannels();
-            ByteBuffer srcFrame = channels[index].getNextBuffer();
-            FrameUtils.copy(srcFrame, buffer);
-            channels[index].recycleBuffer(srcFrame);
-            return true;
+        if (index < 0) {
+            return false;
         }
-        return false;
+        frame.reset();
+        ByteBuffer srcFrame = channelReader.getNextBuffer(index);
+        int nBlocks = FrameHelper.deserializeNumOfMinFrame(srcFrame);
+        frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+        FrameUtils.copyWholeFrame(srcFrame, frame.getBuffer());
+        channelReader.recycleBuffer(index, srcFrame);
+        for (int i = 1; i < nBlocks; ++i) {
+            srcFrame = channelReader.getNextBuffer(index);
+            frame.getBuffer().put(srcFrame);
+            channelReader.recycleBuffer(index, srcFrame);
+        }
+        if (frame.getBuffer().hasRemaining()) { // bigger frame
+            FrameHelper.clearRemainingFrame(frame.getBuffer(), frame.getBuffer().position());
+        }
+        frame.getBuffer().flip();
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
index 2dda9cc..125d07a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
@@ -14,11 +14,12 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.collectors;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
@@ -54,13 +55,13 @@ public class SortMergeFrameReader implements IFrameReader {
     @Override
     public void open() throws HyracksDataException {
         if (maxConcurrentMerges >= nSenders) {
-            List<ByteBuffer> inFrames = new ArrayList<ByteBuffer>();
+            List<IFrame> inFrames = new ArrayList<>(nSenders);
             for (int i = 0; i < nSenders; ++i) {
-                inFrames.add(ByteBuffer.allocate(ctx.getFrameSize()));
+                inFrames.add(new VSizeFrame(ctx));
             }
-            List<IFrameReader> batch = new ArrayList<IFrameReader>();
+            List<IFrameReader> batch = new ArrayList<IFrameReader>(nSenders);
             pbm.getNextBatch(batch, nSenders);
-            merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames, sortFields,
+            merger = new RunMergingFrameReader(ctx, batch, inFrames, sortFields,
                     comparators, nmkComputer, recordDescriptor);
         } else {
             // multi level merge.
@@ -70,10 +71,8 @@ public class SortMergeFrameReader implements IFrameReader {
     }
 
     @Override
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        buffer.position(buffer.capacity());
-        buffer.limit(buffer.capacity());
-        return merger.nextFrame(buffer);
+    public boolean nextFrame(IFrame frame) throws HyracksDataException {
+        return merger.nextFrame(frame);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index dcca28e..edf4cc9 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -17,19 +17,22 @@ package edu.uci.ics.hyracks.dataflow.std.connectors;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class LocalityAwarePartitionDataWriter implements IFrameWriter {
 
     private final IFrameWriter[] pWriters;
-    private final FrameTupleAppender[] appenders;
+    private final IFrameTupleAppender[] appenders;
     private final FrameTupleAccessor tupleAccessor;
     private final ITuplePartitionComputer tpc;
 
@@ -38,17 +41,17 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
             ILocalityMap localityMap, int senderIndex) throws HyracksDataException {
         int[] consumerPartitions = localityMap.getConsumers(senderIndex, nConsumerPartitions);
         pWriters = new IFrameWriter[consumerPartitions.length];
-        appenders = new FrameTupleAppender[consumerPartitions.length];
+        appenders = new IFrameTupleAppender[consumerPartitions.length];
         for (int i = 0; i < consumerPartitions.length; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(consumerPartitions[i]);
-                appenders[i] = new FrameTupleAppender(ctx.getFrameSize());
-                appenders[i].reset(ctx.allocateFrame(), true);
+                appenders[i] = new FrameTupleAppender();
+                appenders[i].reset(new VSizeFrame(ctx), true);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
         }
-        tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
         this.tpc = tpc;
     }
 
@@ -61,7 +64,6 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
     public void open() throws HyracksDataException {
         for (int i = 0; i < pWriters.length; ++i) {
             pWriters[i].open();
-            appenders[i].reset(appenders[i].getBuffer(), true);
         }
     }
 
@@ -77,15 +79,7 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
         int tupleCount = tupleAccessor.getTupleCount();
         for (int i = 0; i < tupleCount; ++i) {
             int h = pWriters.length == 1 ? 0 : tpc.partition(tupleAccessor, i, pWriters.length);
-            FrameTupleAppender appender = appenders[h];
-            if (!appender.append(tupleAccessor, i)) {
-                ByteBuffer appenderBuffer = appender.getBuffer();
-                flushFrame(appenderBuffer, pWriters[h]);
-                appender.reset(appenderBuffer, true);
-                if (!appender.append(tupleAccessor, i)) {
-                    throw new HyracksDataException("Record size (" + (tupleAccessor.getTupleEndOffset(i) - tupleAccessor.getTupleStartOffset(i)) + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
-                }
-            }
+            FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i);
         }
     }
 
@@ -101,12 +95,6 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
         }
     }
 
-    private void flushFrame(ByteBuffer buffer, IFrameWriter frameWriter) throws HyracksDataException {
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-        frameWriter.nextFrame(buffer);
-    }
-
     /*
      * (non-Javadoc)
      * 
@@ -115,9 +103,7 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
     @Override
     public void close() throws HyracksDataException {
         for (int i = 0; i < pWriters.length; ++i) {
-            if (appenders[i].getTupleCount() > 0) {
-                flushFrame(appenders[i].getBuffer(), pWriters[i]);
-            }
+            appenders[i].flush(pWriters[i], true);
             pWriters[i].close();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index ea586fc..74f16d1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -19,12 +19,14 @@ import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class PartitionDataWriter implements IFrameWriter {
     private final int consumerPartitionCount;
@@ -33,7 +35,7 @@ public class PartitionDataWriter implements IFrameWriter {
     private final FrameTupleAccessor tupleAccessor;
     private final ITuplePartitionComputer tpc;
     private final IHyracksTaskContext ctx;
-    private boolean allocated = false;
+    private boolean allocatedFrame = false;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
@@ -43,12 +45,12 @@ public class PartitionDataWriter implements IFrameWriter {
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(i);
-                appenders[i] = new FrameTupleAppender(ctx.getFrameSize());
+                appenders[i] = new FrameTupleAppender();
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
         }
-        tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
         this.tpc = tpc;
         this.ctx = ctx;
     }
@@ -56,21 +58,13 @@ public class PartitionDataWriter implements IFrameWriter {
     @Override
     public void close() throws HyracksDataException {
         for (int i = 0; i < pWriters.length; ++i) {
-            if (allocated) {
-                if (appenders[i].getTupleCount() > 0) {
-                    flushFrame(appenders[i].getBuffer(), pWriters[i]);
-                }
+            if (allocatedFrame) {
+                appenders[i].flush(pWriters[i], true);
             }
             pWriters[i].close();
         }
     }
 
-    private void flushFrame(ByteBuffer buffer, IFrameWriter frameWriter) throws HyracksDataException {
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-        frameWriter.nextFrame(buffer);
-    }
-
     @Override
     public void open() throws HyracksDataException {
         for (int i = 0; i < pWriters.length; ++i) {
@@ -80,34 +74,22 @@ public class PartitionDataWriter implements IFrameWriter {
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        if (!allocated) {
+        if (!allocatedFrame) {
             allocateFrames();
-            allocated = true;
+            allocatedFrame = true;
         }
         tupleAccessor.reset(buffer);
         int tupleCount = tupleAccessor.getTupleCount();
         for (int i = 0; i < tupleCount; ++i) {
             int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
-            FrameTupleAppender appender = appenders[h];
-            if (!appender.append(tupleAccessor, i)) {
-                ByteBuffer appenderBuffer = appender.getBuffer();
-                flushFrame(appenderBuffer, pWriters[h]);
-                appender.reset(appenderBuffer, true);
-                if (!appender.append(tupleAccessor, i)) {
-                    throw new HyracksDataException("Record size ("
-                            + (tupleAccessor.getTupleEndOffset(i) - tupleAccessor.getTupleStartOffset(i))
-                            + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
-                }
-            }
+            FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i);
+
         }
     }
 
-    /**
-     * @throws HyracksDataException
-     */
     private void allocateFrames() throws HyracksDataException {
         for (int i = 0; i < appenders.length; ++i) {
-            appenders[i].reset(ctx.allocateFrame(), true);
+            appenders[i].reset(new VSizeFrame(ctx), true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 5be1eab..765e223 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -18,11 +18,10 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.Reader;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -41,7 +40,8 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
         this(fieldParserFactories, fieldDelimiter, '\"');
     }
 
-    public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter, char quote) {
+    public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter,
+            char quote) {
         this.valueParserFactories = fieldParserFactories;
         this.fieldDelimiter = fieldDelimiter;
         this.quote = quote;
@@ -57,8 +57,8 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
                     for (int i = 0; i < valueParserFactories.length; ++i) {
                         valueParsers[i] = valueParserFactories[i].createValueParser();
                     }
-                    ByteBuffer frame = ctx.allocateFrame();
-                    FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+                    IFrame frame = new VSizeFrame(ctx);
+                    FrameTupleAppender appender = new FrameTupleAppender();
                     appender.reset(frame, true);
                     ArrayTupleBuilder tb = new ArrayTupleBuilder(valueParsers.length);
                     DataOutput dos = tb.getDataOutput();
@@ -80,18 +80,10 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
                             valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
                             tb.addFieldEndOffset();
                         }
-                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                            FrameUtils.flushFrame(frame, writer);
-                            appender.reset(frame, true);
-                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                throw new HyracksDataException("Record size (" + tb.getSize()
-                                        + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
-                            }
-                        }
-                    }
-                    if (appender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(frame, writer);
+                        FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                                tb.getSize());
                     }
+                    appender.flush(writer, true);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 99f5a5f..7b2e8a0 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -37,7 +37,7 @@ import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodeP
 public class PlainFileWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
@@ -45,11 +45,6 @@ public class PlainFileWriterOperatorDescriptor extends AbstractSingleActivityOpe
 
     private String delim;
 
-    /**
-     * @param spec
-     * @param inputArity
-     * @param outputArity
-     */
     public PlainFileWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvider,
             String delim) {
         super(spec, 1, 0);
@@ -74,7 +69,7 @@ public class PlainFileWriterOperatorDescriptor extends AbstractSingleActivityOpe
         // Output files
         final FileSplit[] splits = fileSplitProvider.getFileSplits();
         // Frame accessor
-        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
         // Record descriptor
         final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index a4970ea..4d62fa0 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -14,11 +14,12 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -33,7 +34,6 @@ import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
 import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
@@ -76,10 +76,8 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
         }
 
         RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
-        final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
-                internalRecordDescriptor);
-        final FrameTupleAccessor storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(),
-                internalRecordDescriptor);
+        final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(internalRecordDescriptor);
+        final FrameTupleAccessor storedKeysAccessor2 = new FrameTupleAccessor(internalRecordDescriptor);
 
         final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
@@ -118,14 +116,14 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
 
             private int lastBufIndex;
 
-            private ByteBuffer outputFrame;
+            private IFrame outputFrame;
             private FrameTupleAppender outputAppender;
 
-            private FrameTupleAppender stateAppender = new FrameTupleAppender(ctx.getFrameSize());
+            private FrameTupleAppender stateAppender = new FrameTupleAppender();
 
             private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);
             private final TuplePointer storedTuplePointer = new TuplePointer();
-            private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+            private final List<IFrame> frames = new ArrayList<>();
 
             /**
              * A tuple is "pointed" to by 3 entries in the tPointers array. [0]
@@ -153,7 +151,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                         table.getTuplePointer(entry, offset, storedTuplePointer);
                         int fIndex = storedTuplePointer.frameIndex;
                         int tIndex = storedTuplePointer.tupleIndex;
-                        storedKeysAccessor1.reset(frames.get(fIndex));
+                        storedKeysAccessor1.reset(frames.get(fIndex).getBuffer());
                         int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
                         int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
                         int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
@@ -191,7 +189,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                     table.getTuplePointer(entry, offset++, storedTuplePointer);
                     if (storedTuplePointer.frameIndex < 0)
                         break;
-                    storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
+                    storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex).getBuffer());
                     int c = ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex);
                     if (c == 0) {
                         foundGroup = true;
@@ -232,7 +230,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
             }
 
             @Override
-            public List<ByteBuffer> getFrames() {
+            public List<IFrame> getFrames() {
                 return frames;
             }
 
@@ -244,11 +242,11 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
             @Override
             public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
                 if (outputFrame == null) {
-                    outputFrame = ctx.allocateFrame();
+                    outputFrame = new VSizeFrame(ctx);
                 }
 
                 if (outputAppender == null) {
-                    outputAppender = new FrameTupleAppender(outputFrame.capacity());
+                    outputAppender = new FrameTupleAppender();
                 }
 
                 outputAppender.reset(outputFrame, true);
@@ -265,7 +263,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                             int bIndex = storedTuplePointer.frameIndex;
                             int tIndex = storedTuplePointer.tupleIndex;
 
-                            storedKeysAccessor1.reset(frames.get(bIndex));
+                            storedKeysAccessor1.reset(frames.get(bIndex).getBuffer());
 
                             outputTupleBuilder.reset();
                             for (int k = 0; k < storedKeys.length; k++) {
@@ -285,8 +283,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
 
                             if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                     outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                                FrameUtils.flushFrame(outputFrame, writer);
-                                outputAppender.reset(outputFrame, true);
+                                outputAppender.flush(writer, true);
                                 if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                         outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                                     throw new HyracksDataException(
@@ -296,10 +293,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
 
                         } while (true);
                     }
-                    if (outputAppender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(outputFrame, writer);
-                        outputAppender.reset(outputFrame, true);
-                    }
+                    outputAppender.flush(writer, true);
                     aggregator.close();
                     return;
                 }
@@ -311,8 +305,8 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                     int frameIndex = storedTuplePointer.frameIndex;
                     int tupleIndex = storedTuplePointer.tupleIndex;
                     // Get the frame containing the value
-                    ByteBuffer buffer = frames.get(frameIndex);
-                    storedKeysAccessor1.reset(buffer);
+                    IFrame buffer = frames.get(frameIndex);
+                    storedKeysAccessor1.reset(buffer.getBuffer());
 
                     outputTupleBuilder.reset();
                     for (int k = 0; k < storedKeys.length; k++) {
@@ -332,18 +326,14 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
 
                     if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                             outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                        FrameUtils.flushFrame(outputFrame, writer);
-                        outputAppender.reset(outputFrame, true);
+                        outputAppender.flush(writer, true);
                         if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                 outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                             throw new HyracksDataException("The output item is too large to be fit into a frame.");
                         }
                     }
                 }
-                if (outputAppender.getTupleCount() > 0) {
-                    FrameUtils.flushFrame(outputFrame, writer);
-                    outputAppender.reset(outputFrame, true);
-                }
+                outputAppender.flush(writer, true);
                 aggregator.close();
             }
 
@@ -372,19 +362,14 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
 
                 if (frames.size() < framesLimit) {
                     // Insert a new frame
-                    ByteBuffer frame = ctx.allocateFrame();
-                    frame.position(0);
-                    frame.limit(frame.capacity());
+                    IFrame frame = new VSizeFrame(ctx);
                     frames.add(frame);
                     stateAppender.reset(frame, true);
                     lastBufIndex = frames.size() - 1;
                 } else {
                     // Reuse an old frame
                     lastBufIndex++;
-                    ByteBuffer frame = frames.get(lastBufIndex);
-                    frame.position(0);
-                    frame.limit(frame.capacity());
-                    stateAppender.reset(frame, true);
+                    stateAppender.reset(frames.get(lastBufIndex), true);
                 }
                 return true;
             }
@@ -398,7 +383,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                 table.getTuplePointer(mTable, mRow, storedTuplePointer);
                 int mFrame = storedTuplePointer.frameIndex;
                 int mTuple = storedTuplePointer.tupleIndex;
-                storedKeysAccessor1.reset(frames.get(mFrame));
+                storedKeysAccessor1.reset(frames.get(mFrame).getBuffer());
 
                 int a = offset;
                 int b = a;
@@ -416,7 +401,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                             table.getTuplePointer(bTable, bRow, storedTuplePointer);
                             int bFrame = storedTuplePointer.frameIndex;
                             int bTuple = storedTuplePointer.tupleIndex;
-                            storedKeysAccessor2.reset(frames.get(bFrame));
+                            storedKeysAccessor2.reset(frames.get(bFrame).getBuffer());
                             cmp = ftpcTuple.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
                         }
                         if (cmp > 0) {
@@ -438,7 +423,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                             table.getTuplePointer(cTable, cRow, storedTuplePointer);
                             int cFrame = storedTuplePointer.frameIndex;
                             int cTuple = storedTuplePointer.tupleIndex;
-                            storedKeysAccessor2.reset(frames.get(cFrame));
+                            storedKeysAccessor2.reset(frames.get(cFrame).getBuffer());
                             cmp = ftpcTuple.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
                         }
                         if (cmp < 0) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
index 6ac2a6d..b0c7e4d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
@@ -14,9 +14,9 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -29,7 +29,7 @@ public interface ISpillableTable {
 
     public int getFrameCount();
 
-    public List<ByteBuffer> getFrames();
+    public List<IFrame> getFrames();
 
     public void sortFrames() throws HyracksDataException;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 8e9e8b8..e683e47 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -61,7 +61,7 @@ class ExternalGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOpera
         this.spillableTableFactory = spillableTableFactory;
         this.inRecordDescriptor = inRecordDescriptor;
         this.outRecordDescriptor = outRecordDescriptor;
-        this.accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+        this.accessor = new FrameTupleAccessor(inRecordDescriptor);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index a55443c..9e3d4fc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -14,14 +14,15 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group.external;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -33,7 +34,7 @@ import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppenderAccessor;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -60,12 +61,12 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
     /**
      * Input frames, one for each run file.
      */
-    private List<ByteBuffer> inFrames;
+    private List<IFrame> inFrames;
     /**
      * Output frame.
      */
-    private ByteBuffer outFrame, writerFrame;
-    private final FrameTupleAppender outAppender;
+    private IFrame outFrame, writerFrame;
+    private final FrameTupleAppenderAccessor outAppender;
     private FrameTupleAppender writerAppender;
     private LinkedList<RunFileReader> runs;
     private ExternalGroupState aggState;
@@ -76,7 +77,6 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
     private int runFrameLimit = 1;
     private int[] currentFrameIndexInRun;
     private int[] currentRunFrames;
-    private final FrameTupleAccessor outFrameAccessor;
 
     ExternalGroupMergeOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
             IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nmkFactory, int[] keyFields,
@@ -108,8 +108,7 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
 
         tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
         this.ctx = ctx;
-        outAppender = new FrameTupleAppender(ctx.getFrameSize());
-        outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
+        outAppender = new FrameTupleAppenderAccessor(outRecordDescriptor);
         this.isOutputSorted = isOutputSorted;
         this.framesLimit = framesLimit;
         this.outRecordDescriptor = outRecordDescriptor;
@@ -132,10 +131,9 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
             } else {
                 aggState = null;
                 runs = new LinkedList<RunFileReader>(runs);
-                inFrames = new ArrayList<ByteBuffer>();
-                outFrame = ctx.allocateFrame();
+                inFrames = new ArrayList<>();
+                outFrame = new VSizeFrame(ctx);
                 outAppender.reset(outFrame, true);
-                outFrameAccessor.reset(outFrame);
                 while (runs.size() > 0) {
                     try {
                         doPass(runs);
@@ -160,7 +158,7 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
         boolean finalPass = false;
 
         while (inFrames.size() + 2 < framesLimit) {
-            inFrames.add(ctx.allocateFrame());
+            inFrames.add(new VSizeFrame(ctx));
         }
         int runNumber;
         if (runs.size() + 2 <= framesLimit) {
@@ -184,8 +182,8 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
             RunFileReader[] runFileReaders = new RunFileReader[runNumber];
             FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
             Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), outRecordDescriptor,
-                    runNumber, comparator, keyFields, nmkComputer);
+            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(runNumber, comparator, keyFields,
+                    nmkComputer);
             /**
              * current tuple index in each run
              */
@@ -203,8 +201,8 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
                 for (int j = 0; j < runFrameLimit; j++) {
                     int frameIndex = currentFrameIndexInRun[runIndex] + j;
                     if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
-                        tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
-                        tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                        tupleAccessors[frameIndex] = new FrameTupleAccessor(outRecordDescriptor);
+                        tupleAccessors[frameIndex].reset(inFrames.get(frameIndex).getBuffer());
                         currentRunFrames[runIndex]++;
                         if (j == 0)
                             setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
@@ -224,11 +222,11 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
                 ReferenceEntry top = topTuples.peek();
                 int tupleIndex = top.getTupleIndex();
                 int runIndex = topTuples.peek().getRunid();
-                FrameTupleAccessor fta = top.getAccessor();
+                IFrameTupleAccessor fta = top.getAccessor();
 
-                int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+                int currentTupleInOutFrame = outAppender.getTupleCount() - 1;
                 if (currentTupleInOutFrame < 0
-                        || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+                        || compareFrameTuples(fta, tupleIndex, outAppender, currentTupleInOutFrame) != 0) {
                     /**
                      * Initialize the first output record Reset the
                      * tuple builder
@@ -259,7 +257,7 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
                      * outFrame
                      */
 
-                    aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, aggregateState);
+                    aggregator.aggregate(fta, tupleIndex, outAppender, currentTupleInOutFrame, aggregateState);
 
                 }
                 tupleIndices[runIndex]++;
@@ -295,49 +293,42 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
         }
 
         if (writerFrame == null) {
-            writerFrame = ctx.allocateFrame();
+            writerFrame = new VSizeFrame(ctx);
         }
 
         if (writerAppender == null) {
-            writerAppender = new FrameTupleAppender(ctx.getFrameSize());
+            writerAppender = new FrameTupleAppender();
             writerAppender.reset(writerFrame, true);
         }
 
-        outFrameAccessor.reset(outFrame);
-
-        for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+        for (int i = 0; i < outAppender.getTupleCount(); i++) {
 
             finalTupleBuilder.reset();
 
             for (int k = 0; k < storedKeys.length; k++) {
-                finalTupleBuilder.addField(outFrameAccessor, i, storedKeys[k]);
+                finalTupleBuilder.addField(outAppender, i, storedKeys[k]);
             }
 
             if (isFinal) {
 
-                aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+                aggregator.outputFinalResult(finalTupleBuilder, outAppender, i, aggregateState);
 
             } else {
 
-                aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+                aggregator.outputPartialResult(finalTupleBuilder, outAppender, i, aggregateState);
             }
 
             if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
                     finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
-                FrameUtils.flushFrame(writerFrame, writer);
-                writerAppender.reset(writerFrame, true);
+                writerAppender.flush(writer, true);
                 if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
                         finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
                     throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
                 }
             }
         }
-        if (writerAppender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(writerFrame, writer);
-            writerAppender.reset(writerFrame, true);
-        }
+        writerAppender.flush(writer, true);
 
-        outAppender.reset(outFrame, true);
     }
 
     private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
@@ -377,7 +368,7 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
             for (int j = 0; j < runFrameLimit; j++) {
                 int frameIndex = currentFrameIndexInRun[runIndex] + j;
                 if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
-                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex).getBuffer());
                     existNext = true;
                     currentRunFrames[runIndex]++;
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
index 3c0eb2b..0102e65 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
@@ -19,7 +19,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -32,6 +34,7 @@ import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -40,7 +43,7 @@ class GroupingHashTable {
     /**
      * The pointers in the link store 3 int values for each entry in the
      * hashtable: (bufferIdx, tIndex, accumulatorIdx).
-     * 
+     *
      * @author vinayakb
      */
     private static class Link {
@@ -67,7 +70,7 @@ class GroupingHashTable {
     private static final int INIT_AGG_STATE_SIZE = 8;
     private final IHyracksTaskContext ctx;
 
-    private final List<ByteBuffer> buffers;
+    private final List<IFrame> buffers;
     private final Link[] table;
     /**
      * Aggregate states: a list of states for all groups maintained in the main
@@ -84,6 +87,7 @@ class GroupingHashTable {
     private final ITuplePartitionComputer tpc;
     private final IAggregatorDescriptor aggregator;
 
+    private final IFrame outputFrame;
     private final FrameTupleAppender appender;
 
     private final FrameTupleAccessor storedKeysAccessor;
@@ -96,7 +100,7 @@ class GroupingHashTable {
             throws HyracksDataException {
         this.ctx = ctx;
 
-        buffers = new ArrayList<ByteBuffer>();
+        buffers = new ArrayList<>();
         table = new Link[tableSize];
 
         keys = fields;
@@ -127,10 +131,10 @@ class GroupingHashTable {
         accumulatorSize = 0;
 
         RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
-        storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
+        storedKeysAccessor = new FrameTupleAccessor(storedKeysRecordDescriptor);
         lastBIndex = -1;
 
-        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender = new FrameTupleAppender();
 
         addNewBuffer();
 
@@ -140,14 +144,13 @@ class GroupingHashTable {
             stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
         }
         outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+        outputFrame = new VSizeFrame(ctx);
     }
 
     private void addNewBuffer() throws HyracksDataException {
-        ByteBuffer buffer = ctx.allocateFrame();
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-        buffers.add(buffer);
-        appender.reset(buffer, true);
+        VSizeFrame frame = new VSizeFrame(ctx);
+        buffers.add(frame);
+        appender.reset(frame, true);
         ++lastBIndex;
     }
 
@@ -161,7 +164,7 @@ class GroupingHashTable {
         for (int i = 0; i < link.size; i += 3) {
             int sbIndex = link.pointers[i];
             int stIndex = link.pointers[i + 1];
-            storedKeysAccessor.reset(buffers.get(sbIndex));
+            storedKeysAccessor.reset(buffers.get(sbIndex).getBuffer());
             int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
             if (c == 0) {
                 saIndex = link.pointers[i + 2];
@@ -206,8 +209,7 @@ class GroupingHashTable {
     }
 
     void write(IFrameWriter writer) throws HyracksDataException {
-        ByteBuffer buffer = ctx.allocateFrame();
-        appender.reset(buffer, true);
+        appender.reset(outputFrame, true);
 
         for (int i = 0; i < table.length; ++i) {
             Link link = table[i];
@@ -216,7 +218,7 @@ class GroupingHashTable {
                     int bIndex = link.pointers[j];
                     int tIndex = link.pointers[j + 1];
                     int aIndex = link.pointers[j + 2];
-                    ByteBuffer keyBuffer = buffers.get(bIndex);
+                    ByteBuffer keyBuffer = buffers.get(bIndex).getBuffer();
                     storedKeysAccessor.reset(keyBuffer);
 
                     // copy keys
@@ -228,22 +230,13 @@ class GroupingHashTable {
                     aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor, tIndex,
                             aggregateStates[aIndex]);
 
-                    if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
-                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                        writer.nextFrame(buffer);
-                        appender.reset(buffer, true);
-                        if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
-                                outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                            throw new HyracksDataException("Cannot write the aggregation output into a frame.");
-                        }
-                    }
+                    FrameUtils.appendSkipEmptyFieldToWriter(writer, appender, outputTupleBuilder.getFieldEndOffsets(),
+                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize());
 
                 }
             }
         }
-        if (appender.getTupleCount() != 0) {
-            writer.nextFrame(buffer);
-        }
+        appender.flush(writer, true);
     }
 
     void close() throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
index 8e49a9a..998d882 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
@@ -44,7 +44,7 @@ class HashGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOperatorN
             IAggregatorDescriptorFactory aggregatorFactory, int tableSize, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor) {
         this.ctx = ctx;
-        this.accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+        this.accessor = new FrameTupleAccessor(inRecordDescriptor);
         this.stateId = stateId;
         this.keys = keys;
         this.tpcf = tpcf;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 9ce70c1..4dbf03b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -21,8 +21,6 @@ import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
@@ -52,12 +50,6 @@ class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutpu
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        final ByteBuffer copyFrame = ctx.allocateFrame();
-        final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
-        copyFrameAccessor.reset(copyFrame);
-        ByteBuffer outFrame = ctx.allocateFrame();
-        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-        appender.reset(outFrame, true);
         pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
                 outRecordDescriptor, writer);
         pgw.open();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 45f0488..559dec4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -16,15 +16,17 @@ package edu.uci.ics.hyracks.dataflow.std.group.preclustered;
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -35,7 +37,7 @@ public class PreclusteredGroupWriter implements IFrameWriter {
     private final IBinaryComparator[] comparators;
     private final IAggregatorDescriptor aggregator;
     private final AggregateState aggregateState;
-    private final ByteBuffer copyFrame;
+    private final IFrame copyFrame;
     private final FrameTupleAccessor inFrameAccessor;
     private final FrameTupleAccessor copyFrameAccessor;
 
@@ -62,15 +64,15 @@ public class PreclusteredGroupWriter implements IFrameWriter {
         this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields,
                 groupFields, writer);
         this.aggregateState = aggregator.createAggregateStates();
-        copyFrame = ctx.allocateFrame();
-        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
-        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
-        copyFrameAccessor.reset(copyFrame);
+        copyFrame = new VSizeFrame(ctx);
+        inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
+        copyFrameAccessor = new FrameTupleAccessor(inRecordDesc);
+        copyFrameAccessor.reset(copyFrame.getBuffer());
 
-        ByteBuffer outFrame = ctx.allocateFrame();
-        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        VSizeFrame outFrame = new VSizeFrame(ctx);
+        FrameTupleAppender appender = new FrameTupleAppender();
         appender.reset(outFrame, true);
-        appenderWrapper = new FrameTupleAppenderWrapper(appender, outFrame, writer);
+        appenderWrapper = new FrameTupleAppenderWrapper(appender, writer);
 
         tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
     }
@@ -105,7 +107,9 @@ public class PreclusteredGroupWriter implements IFrameWriter {
 
             }
         }
-        FrameUtils.copy(buffer, copyFrame);
+        copyFrame.ensureFrameSize(buffer.capacity());
+        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
+        copyFrameAccessor.reset(copyFrame.getBuffer());
     }
 
     private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
@@ -145,9 +149,9 @@ public class PreclusteredGroupWriter implements IFrameWriter {
             throws HyracksDataException {
         for (int i = 0; i < comparators.length; ++i) {
             int fIdx = groupFields[i];
-            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int s1 = a1.getAbsoluteFieldStartOffset(t1Idx, fIdx);
             int l1 = a1.getFieldLength(t1Idx, fIdx);
-            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int s2 = a2.getAbsoluteFieldStartOffset(t2Idx, fIdx);
             int l2 = a2.getFieldLength(t2Idx, fIdx);
             if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
                 return false;
@@ -165,6 +169,7 @@ public class PreclusteredGroupWriter implements IFrameWriter {
     @Override
     public void close() throws HyracksDataException {
         if (!isFailed && !first) {
+            assert(copyFrameAccessor.getTupleCount() > 0);
             writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
             appenderWrapper.flush();
         }


Mime
View raw message