asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [12/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.
Date Thu, 18 Jun 2015 04:22:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 98b8e79..8a3c35d 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -1,164 +1,121 @@
 /*
  * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 package edu.uci.ics.hyracks.dataflow.common.comm.io;
 
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
 
-public class FrameTupleAppender {
-    private final int frameSize;
+public class FrameTupleAppender extends AbstractFrameAppender implements IFrameTupleAppender {
 
-    private ByteBuffer buffer;
-
-    private int tupleCount;
-
-    private int tupleDataEndOffset;
-
-    private int numberOfFields = -1;
-    private int currentField = 0;
-    private int lastFieldEndOffset = 0;
-
-    public FrameTupleAppender(int frameSize) {
-        this.frameSize = frameSize;
+    public FrameTupleAppender() {
     }
 
-    public FrameTupleAppender(int frameSize, int numberOfFields) {
-        this.frameSize = frameSize;
-        this.numberOfFields = numberOfFields;
+    public FrameTupleAppender(IFrame frame) throws HyracksDataException {
+        reset(frame, true);
     }
 
-    public void reset(ByteBuffer buffer, boolean clear) {
-        this.buffer = buffer;
-        if (clear) {
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
-            tupleCount = 0;
-            tupleDataEndOffset = 0;
-        } else {
-            tupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
-            tupleDataEndOffset = tupleCount == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize)
-                    - tupleCount * 4);
-        }
+    public FrameTupleAppender(IFrame frame, boolean clear) throws HyracksDataException {
+        reset(frame, clear);
     }
 
-    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) {
-        if (tupleDataEndOffset + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
+        if (canHoldNewTuple(fieldSlots.length, length)) {
             for (int i = 0; i < fieldSlots.length; ++i) {
-                buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldSlots[i]);
             }
-            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + fieldSlots.length * 4, length);
+            System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldSlots.length * 4, length);
             tupleDataEndOffset += fieldSlots.length * 4 + length;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(getBuffer().array(),
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            IntSerDeUtils
+                    .putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
-    public boolean append(byte[] bytes, int offset, int length) {
-        if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
-            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset, length);
+    public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException {
+        if (canHoldNewTuple(0, length)) {
+            System.arraycopy(bytes, offset, getBuffer().array(), tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array,
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            IntSerDeUtils
+                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
-    public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) {
-        if (tupleDataEndOffset + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+    public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
+            throws HyracksDataException {
+        if (canHoldNewTuple(fieldSlots.length, length)) {
             int effectiveSlots = 0;
             for (int i = 0; i < fieldSlots.length; ++i) {
                 if (fieldSlots[i] > 0) {
-                    buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
+                    IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldSlots[i]);
                     effectiveSlots++;
                 }
             }
-            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + effectiveSlots * 4, length);
+            System.arraycopy(bytes, offset, array, tupleDataEndOffset + effectiveSlots * 4, length);
             tupleDataEndOffset += effectiveSlots * 4 + length;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array,
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            IntSerDeUtils
+                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
-    public boolean appendField(byte[] bytes, int offset, int length) {
-        if (numberOfFields < 0) {
-            throw new IllegalStateException("unintialized number of fields " + numberOfFields);
-        }
-        int currentTupleDataStart = tupleDataEndOffset + numberOfFields * 4 + lastFieldEndOffset;
-        if (currentTupleDataStart + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
-            System.arraycopy(bytes, offset, buffer.array(), currentTupleDataStart, length);
-            lastFieldEndOffset = lastFieldEndOffset + length;
-            buffer.putInt(tupleDataEndOffset + currentField * 4, lastFieldEndOffset);
-            if (++currentField == numberOfFields) {
-                tupleDataEndOffset += numberOfFields * 4 + lastFieldEndOffset;
-                buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
-                ++tupleCount;
-                buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
-
-                //reset for the next tuple
-                currentField = 0;
-                lastFieldEndOffset = 0;
-            }
-            return true;
-        } else {
-            //reset for the next tuple
-            currentField = 0;
-            lastFieldEndOffset = 0;
-            return false;
-        }
-    }
-
-    public boolean appendField(IFrameTupleAccessor fta, int tIndex, int fIndex) {
-        if (numberOfFields < 0) {
-            throw new IllegalStateException("unintialized number of fields " + numberOfFields);
-        }
-        int startOffset = fta.getTupleStartOffset(tIndex);
-        int fStartOffset = fta.getFieldStartOffset(tIndex, fIndex);
-        int fLen = fta.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
-        return appendField(fta.getBuffer().array(), startOffset + fta.getFieldSlotsLength() + fStartOffset, fLen);
-    }
-
-    public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
+    public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
+            throws HyracksDataException {
         int length = tEndOffset - tStartOffset;
-        if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+        if (canHoldNewTuple(0, length)) {
             ByteBuffer src = tupleAccessor.getBuffer();
-            System.arraycopy(src.array(), tStartOffset, buffer.array(), tupleDataEndOffset, length);
+            System.arraycopy(src.array(), tStartOffset, array, tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array,
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            IntSerDeUtils
+                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
-    public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) {
+    public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
         int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
         int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
         return append(tupleAccessor, tStartOffset, tEndOffset);
     }
 
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1,
+            int tIndex1) throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
@@ -167,7 +124,7 @@ public class FrameTupleAppender {
         int endOffset1 = accessor1.getTupleEndOffset(tIndex1);
         int length1 = endOffset1 - startOffset1;
 
-        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+        if (canHoldNewTuple(0, length0 + length1)) {
             ByteBuffer src0 = accessor0.getBuffer();
             ByteBuffer src1 = accessor1.getBuffer();
             int slotsLen0 = accessor0.getFieldSlotsLength();
@@ -175,28 +132,31 @@ public class FrameTupleAppender {
             int dataLen0 = length0 - slotsLen0;
             int dataLen1 = length1 - slotsLen1;
             // Copy slots from accessor0 verbatim
-            System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
+            System.arraycopy(src0.array(), startOffset0, array, tupleDataEndOffset, slotsLen0);
             // Copy slots from accessor1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
             for (int i = 0; i < slotsLen1 / 4; ++i) {
-                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, src1.getInt(startOffset1 + i * 4) + dataLen0);
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4,
+                        src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), tupleDataEndOffset + slotsLen0
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
                     + slotsLen1, dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), tupleDataEndOffset + slotsLen0
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
                     + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array,
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            IntSerDeUtils
+                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
     public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1,
-            int offset1, int dataLen1) {
+            int offset1, int dataLen1) throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
@@ -204,33 +164,36 @@ public class FrameTupleAppender {
         int slotsLen1 = fieldSlots1.length * 4;
         int length1 = slotsLen1 + dataLen1;
 
-        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+        if (canHoldNewTuple(0, length0 + length1)) {
             ByteBuffer src0 = accessor0.getBuffer();
             int slotsLen0 = accessor0.getFieldSlotsLength();
             int dataLen0 = length0 - slotsLen0;
             // Copy slots from accessor0 verbatim
-            System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
+            System.arraycopy(src0.array(), startOffset0, array, tupleDataEndOffset, slotsLen0);
             // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
             for (int i = 0; i < fieldSlots1.length; ++i) {
-                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4,
+                        (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), tupleDataEndOffset + slotsLen0
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
                     + slotsLen1, dataLen0);
             // Copy bytes1
-            System.arraycopy(bytes1, offset1, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4
-                    + dataLen0, dataLen1);
+            System.arraycopy(bytes1, offset1, array,
+                    tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array,
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            IntSerDeUtils
+                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
     public boolean appendConcat(int[] fieldSlots0, byte[] bytes0, int offset0, int dataLen0,
-            IFrameTupleAccessor accessor1, int tIndex1) {
+            IFrameTupleAccessor accessor1, int tIndex1) throws HyracksDataException {
         int slotsLen0 = fieldSlots0.length * 4;
         int length0 = slotsLen0 + dataLen0;
 
@@ -238,40 +201,45 @@ public class FrameTupleAppender {
         int endOffset1 = accessor1.getTupleEndOffset(tIndex1);
         int length1 = endOffset1 - startOffset1;
 
-        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+        if (canHoldNewTuple(0, length0 + length1)) {
             ByteBuffer src1 = accessor1.getBuffer();
             int slotsLen1 = accessor1.getFieldSlotsLength();
             int dataLen1 = length1 - slotsLen1;
             // Copy fieldSlots0 verbatim
             for (int i = 0; i < fieldSlots0.length; ++i) {
-                buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots0[i]);
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldSlots0[i]);
             }
             // Copy slots from accessor1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
             for (int i = 0; i < slotsLen1 / 4; ++i) {
-                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, src1.getInt(startOffset1 + i * 4) + dataLen0);
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4,
+                        src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy bytes0
-            System.arraycopy(bytes0, offset0, buffer.array(), tupleDataEndOffset + slotsLen0 + slotsLen1, dataLen0);
+            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), tupleDataEndOffset + slotsLen0
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
                     + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array,
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            IntSerDeUtils
+                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
-    public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) {
+    public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields)
+            throws HyracksDataException {
         int fTargetSlotsLength = fields.length * 4;
         int length = fTargetSlotsLength;
         for (int i = 0; i < fields.length; ++i) {
             length += (accessor.getFieldEndOffset(tIndex, fields[i]) - accessor.getFieldStartOffset(tIndex, fields[i]));
         }
 
-        if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+        if (canHoldNewTuple(0, length)) {
             int fSrcSlotsLength = accessor.getFieldSlotsLength();
             int tStartOffset = accessor.getTupleStartOffset(tIndex);
 
@@ -281,26 +249,21 @@ public class FrameTupleAppender {
                 int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
                 int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
                         - accessor.getFieldStartOffset(tIndex, fields[i]);
-                System.arraycopy(accessor.getBuffer().array(), fSrcStart, buffer.array(), tupleDataEndOffset
+                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array, tupleDataEndOffset
                         + fTargetSlotsLength + fStartOffset, fLen);
                 fEndOffset += fLen;
-                buffer.putInt(tupleDataEndOffset + i * 4, fEndOffset);
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fEndOffset);
                 fStartOffset = fEndOffset;
             }
             tupleDataEndOffset += length;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array,
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            IntSerDeUtils
+                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
-    public int getTupleCount() {
-        return tupleCount;
-    }
-
-    public ByteBuffer getBuffer() {
-        return buffer;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderAccessor.java
new file mode 100644
index 0000000..b29c8d1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderAccessor.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.comm.io;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+
+public class FrameTupleAppenderAccessor extends FrameTupleAppender implements IFrameTupleAccessor {
+    private int tupleCountOffset;
+    private final RecordDescriptor recordDescriptor;
+
+    public FrameTupleAppenderAccessor(RecordDescriptor recordDescriptor) {
+        super();
+        this.recordDescriptor = recordDescriptor;
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        throw new IllegalAccessError("should not call this function");
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        int offset = tupleIndex == 0 ?
+                FrameConstants.TUPLE_START_OFFSET :
+                IntSerDeUtils.getInt(getBuffer().array(), tupleCountOffset - 4 * tupleIndex);
+        return offset;
+    }
+
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return getTupleStartOffset(tupleIndex) + getFieldSlotsLength() + getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return IntSerDeUtils.getInt(getBuffer().array(), tupleCountOffset - 4 * (tupleIndex + 1));
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return fIdx == 0 ? 0 : IntSerDeUtils.getInt(getBuffer().array(),
+                getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return IntSerDeUtils.getInt(getBuffer().array(), getTupleStartOffset(tupleIndex) + fIdx * 4);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleLength(int tupleIndex) {
+        return getTupleEndOffset(tupleIndex) - getTupleStartOffset(tupleIndex);
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return getFieldCount() * 4;
+    }
+
+    public void prettyPrint() {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        int tc = getTupleCount();
+        System.err.println("TC: " + tc);
+        for (int i = 0; i < tc; ++i) {
+            prettyPrint(i, bbis, dis);
+        }
+    }
+
+    protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis) {
+        System.err.print("tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
+        for (int j = 0; j < getFieldCount(); ++j) {
+            System.err.print("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
+            System.err.print("{");
+            bbis.setByteBuffer(getBuffer(),
+                    getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
+            try {
+                System.err.print(recordDescriptor.getFields()[j].deserialize(dis));
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            }
+            System.err.print("}");
+        }
+        System.err.println();
+    }
+
+    public void prettyPrint(int tid) {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        prettyPrint(tid, bbis, dis);
+    }
+
+    @Override
+    public int getFieldCount() {
+        return recordDescriptor.getFieldCount();
+    }
+
+    @Override
+    public void reset(IFrame frame, boolean clear) throws HyracksDataException {
+        super.reset(frame, clear);
+        this.tupleCountOffset = FrameHelper.getTupleCountOffset(frame.getFrameSize());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
index 2de4256..b8e7450 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
@@ -15,9 +15,9 @@
 
 package edu.uci.ics.hyracks.dataflow.common.comm.io;
 
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -26,18 +26,15 @@ import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
  * This class wraps the calls of FrameTupleAppender and
  * allows user to not worry about flushing full frames.
  * TODO(yingyib): cleanup existing usage of FrameTupleAppender.
- * 
+ *
  * @author yingyib
  */
 public class FrameTupleAppenderWrapper {
-    private final FrameTupleAppender frameTupleAppender;
-    private final ByteBuffer outputFrame;
+    private final IFrameTupleAppender frameTupleAppender;
     private final IFrameWriter outputWriter;
 
-    public FrameTupleAppenderWrapper(FrameTupleAppender frameTupleAppender, ByteBuffer outputFrame,
-            IFrameWriter outputWriter) {
+    public FrameTupleAppenderWrapper(IFrameTupleAppender frameTupleAppender, IFrameWriter outputWriter) {
         this.frameTupleAppender = frameTupleAppender;
-        this.outputFrame = outputFrame;
         this.outputWriter = outputWriter;
     }
 
@@ -46,9 +43,7 @@ public class FrameTupleAppenderWrapper {
     }
 
     public void flush() throws HyracksDataException {
-        if (frameTupleAppender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(outputFrame, outputWriter);
-        }
+        frameTupleAppender.flush(outputWriter, true);
     }
 
     public void close() throws HyracksDataException {
@@ -59,81 +54,42 @@ public class FrameTupleAppenderWrapper {
         outputWriter.fail();
     }
 
-    public void reset(ByteBuffer buffer, boolean clear) {
+    public void reset(IFrame buffer, boolean clear) throws HyracksDataException {
         frameTupleAppender.reset(buffer, clear);
     }
 
     public void appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
             throws HyracksDataException {
-        if (!frameTupleAppender.append(fieldSlots, bytes, offset, length)) {
-            FrameUtils.flushFrame(outputFrame, outputWriter);
-            frameTupleAppender.reset(outputFrame, true);
-            if (!frameTupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length)) {
-                throw new HyracksDataException("The output cannot be fit into a frame.");
-            }
-        }
+        FrameUtils.appendSkipEmptyFieldToWriter(outputWriter, frameTupleAppender,
+                fieldSlots, bytes, offset, length);
     }
 
     public void append(byte[] bytes, int offset, int length) throws HyracksDataException {
-        if (!frameTupleAppender.append(bytes, offset, length)) {
-            FrameUtils.flushFrame(outputFrame, outputWriter);
-            frameTupleAppender.reset(outputFrame, true);
-            if (!frameTupleAppender.append(bytes, offset, length)) {
-                throw new HyracksDataException("The output cannot be fit into a frame.");
-            }
-        }
+        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, bytes, offset, length);
     }
 
-    public void append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) throws HyracksDataException {
-        if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
-            FrameUtils.flushFrame(outputFrame, outputWriter);
-            frameTupleAppender.reset(outputFrame, true);
-            if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
-                throw new HyracksDataException("The output cannot be fit into a frame.");
-            }
-        }
+    public void append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
+            throws HyracksDataException {
+        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, tupleAccessor, tStartOffset, tEndOffset);
     }
 
     public void append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
-        if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
-            FrameUtils.flushFrame(outputFrame, outputWriter);
-            frameTupleAppender.reset(outputFrame, true);
-            if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
-                throw new HyracksDataException("The output cannot be fit into a frame.");
-            }
-        }
+        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, tupleAccessor, tIndex);
     }
 
     public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
             throws HyracksDataException {
-        if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
-            FrameUtils.flushFrame(outputFrame, outputWriter);
-            frameTupleAppender.reset(outputFrame, true);
-            if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
-                throw new HyracksDataException("The output cannot be fit into a frame.");
-            }
-        }
+        FrameUtils.appendConcatToWriter(outputWriter, frameTupleAppender, accessor0, tIndex0, accessor1, tIndex1);
     }
 
     public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
             int dataLen1) throws HyracksDataException {
-        if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
-            FrameUtils.flushFrame(outputFrame, outputWriter);
-            frameTupleAppender.reset(outputFrame, true);
-            if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
-                throw new HyracksDataException("The output cannot be fit into a frame.");
-            }
-        }
+        FrameUtils.appendConcatToWriter(outputWriter, frameTupleAppender, accessor0, tIndex0,
+                fieldSlots1, bytes1, offset1, dataLen1);
     }
 
     public void appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) throws HyracksDataException {
-        if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
-            FrameUtils.flushFrame(outputFrame, outputWriter);
-            frameTupleAppender.reset(outputFrame, true);
-            if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
-                throw new HyracksDataException("The output cannot be fit into a frame.");
-            }
-        }
+        FrameUtils.appendProjectionToWriter(outputWriter, frameTupleAppender, accessor, tIndex, fields);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
index 471b1ef..0995564 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
@@ -15,79 +15,23 @@
 package edu.uci.ics.hyracks.dataflow.common.comm.io;
 
 import java.io.DataInputStream;
-import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
-public class ResultFrameTupleAccessor implements IFrameTupleAccessor {
+public class ResultFrameTupleAccessor extends FrameTupleAccessor {
 
-    private final int frameSize;
-    private ByteBuffer buffer;
-
-    public ResultFrameTupleAccessor(int frameSize) {
-        this.frameSize = frameSize;
-    }
-
-    @Override
-    public void reset(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    @Override
-    public ByteBuffer getBuffer() {
-        return buffer;
+    public ResultFrameTupleAccessor() {
+        super(null);
     }
 
     @Override
-    public int getTupleCount() {
-        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
-    }
-
-    @Override
-    public int getTupleStartOffset(int tupleIndex) {
-        return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
-    }
-
-    @Override
-    public int getTupleEndOffset(int tupleIndex) {
-        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
-    }
-
-    @Override
-    public int getFieldStartOffset(int tupleIndex, int fIdx) {
-        return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
-    }
-
-    @Override
-    public int getFieldEndOffset(int tupleIndex, int fIdx) {
-        return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
-    }
-
-    @Override
-    public int getFieldLength(int tupleIndex, int fIdx) {
-        return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getFieldSlotsLength() {
-        return getFieldCount() * 4;
-    }
-
-    public void prettyPrint() {
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream dis = new DataInputStream(bbis);
-        int tc = getTupleCount();
-        System.err.println("TC: " + tc);
-        for (int i = 0; i < tc; ++i) {
-            System.err.print(i + ":(" + getTupleStartOffset(i) + ", " + getTupleEndOffset(i) + ")[");
+    protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) {
+        sb.append(tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
 
-            bbis.setByteBuffer(buffer, getTupleStartOffset(i));
-            System.err.print(dis);
+        bbis.setByteBuffer(getBuffer(), getTupleStartOffset(tid));
+        sb.append(dis);
 
-            System.err.println("]");
-        }
+        sb.append("]\n");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
index 4392fd6..3feeb58 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
@@ -14,21 +14,20 @@
  */
 package edu.uci.ics.hyracks.dataflow.common.comm.io;
 
-import java.nio.ByteBuffer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
     private static final Logger LOGGER = Logger.getLogger(SerializingDataWriter.class.getName());
 
-    private final ByteBuffer buffer;
-
     private final ArrayTupleBuilder tb;
 
     private final RecordDescriptor recordDescriptor;
@@ -41,20 +40,17 @@ public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
 
     public SerializingDataWriter(IHyracksTaskContext ctx, RecordDescriptor recordDescriptor, IFrameWriter frameWriter)
             throws HyracksDataException {
-        buffer = ctx.allocateFrame();
         tb = new ArrayTupleBuilder(recordDescriptor.getFieldCount());
         this.recordDescriptor = recordDescriptor;
         this.frameWriter = frameWriter;
-        tupleAppender = new FrameTupleAppender(ctx.getFrameSize());
+        tupleAppender = new FrameTupleAppender(new VSizeFrame(ctx));
         open = false;
     }
 
     @Override
     public void open() throws HyracksDataException {
         frameWriter.open();
-        buffer.clear();
         open = true;
-        tupleAppender.reset(buffer, true);
     }
 
     @Override
@@ -62,9 +58,7 @@ public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
         if (!open) {
             throw new HyracksDataException("Closing SerializingDataWriter that has not been opened");
         }
-        if (tupleAppender.getTupleCount() > 0) {
-            flushFrame();
-        }
+        tupleAppender.flush(frameWriter, true);
         frameWriter.close();
         open = false;
     }
@@ -82,22 +76,8 @@ public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
             }
             tb.addField(recordDescriptor.getFields()[i], instance);
         }
-        if (!tupleAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            if (LOGGER.isLoggable(Level.FINEST)) {
-                LOGGER.finest("Flushing: position = " + buffer.position());
-            }
-            flushFrame();
-            tupleAppender.reset(buffer, true);
-            if (!tupleAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + buffer.capacity() + ")");
-            }
-        }
-    }
-
-    private void flushFrame() throws HyracksDataException {
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-        frameWriter.nextFrame(buffer);
+        FrameUtils.appendToWriter(frameWriter, tupleAppender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                tb.getSize());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
index 4387e68..e2a9e5f 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
@@ -16,32 +16,268 @@ package edu.uci.ics.hyracks.dataflow.common.comm.util;
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrameFieldAppender;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class FrameUtils {
-    public static void copy(ByteBuffer srcFrame, ByteBuffer destFrame) {
-        makeReadable(srcFrame);
+
+    public static void copyWholeFrame(ByteBuffer srcFrame, ByteBuffer destFrame) {
+        srcFrame.clear();
         destFrame.clear();
         destFrame.put(srcFrame);
     }
 
-    public static void makeReadable(ByteBuffer frame) {
-        frame.position(0);
-        frame.limit(frame.capacity());
+    public static void copyAndFlip(ByteBuffer srcFrame, ByteBuffer destFrame) {
+        srcFrame.position(0);
+        destFrame.clear();
+        destFrame.put(srcFrame);
+        destFrame.flip();
     }
 
     public static void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
         writer.nextFrame(buffer);
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
+        buffer.clear();
+    }
+
+    /**
+     * A util function to append the data to appender. If the appender buffer is full, it will directly flush
+     * to the given writer, which saves the detecting logic in the caller.
+     * It will return the bytes that have been flushed.
+     *
+     * @param writer
+     * @param frameTupleAppender
+     * @param fieldSlots
+     * @param bytes
+     * @param offset
+     * @param length
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendSkipEmptyFieldToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
+            int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!frameTupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length)) {
+            flushedBytes = frameTupleAppender.getBuffer().capacity();
+            frameTupleAppender.flush(writer, true);
+            if (!frameTupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+        return flushedBytes;
+    }
+
+    /**
+     * A util function to append the data to appender. If the appender buffer is full, it will directly flush
+     * to the given writer, which saves the detecting logic in the caller.
+     * It will return the bytes that have been flushed.
+     *
+     * @param writer
+     * @param frameTupleAppender
+     * @param bytes
+     * @param offset
+     * @param length
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender, byte[] bytes,
+            int offset, int length) throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!frameTupleAppender.append(bytes, offset, length)) {
+            flushedBytes = frameTupleAppender.getBuffer().capacity();
+            frameTupleAppender.flush(writer, true);
+            if (!frameTupleAppender.append(bytes, offset, length)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+        return flushedBytes;
+    }
+
+    /**
+     * @param writer
+     * @param frameTupleAppender
+     * @param tupleAccessor
+     * @param tStartOffset
+     * @param tEndOffset
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
+            IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
+            throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
+            flushedBytes = frameTupleAppender.getBuffer().capacity();
+            frameTupleAppender.flush(writer, true);
+            if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+        return flushedBytes;
+    }
+
+    /**
+     * @param writer
+     * @param frameTupleAppender
+     * @param tupleAccessor
+     * @param tIndex
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
+            IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+            flushedBytes = frameTupleAppender.getBuffer().capacity();
+            frameTupleAppender.flush(writer, true);
+            if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+        return flushedBytes;
+    }
+
+    /**
+     * @param writer
+     * @param tupleAppender
+     * @param fieldEndOffsets
+     * @param byteArray
+     * @param start
+     * @param size
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender tupleAppender,
+            int[] fieldEndOffsets, byte[] byteArray, int start, int size) throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!tupleAppender.append(fieldEndOffsets, byteArray, start, size)) {
+
+            flushedBytes = tupleAppender.getBuffer().capacity();
+            tupleAppender.flush(writer, true);
+
+            if (!tupleAppender.append(fieldEndOffsets, byteArray, start, size)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+        return flushedBytes;
+    }
+
+    /**
+     * @param writer
+     * @param frameTupleAppender
+     * @param accessor0
+     * @param tIndex0
+     * @param accessor1
+     * @param tIndex1
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendConcatToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
+            IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
+            flushedBytes = frameTupleAppender.getBuffer().capacity();
+            frameTupleAppender.flush(writer, true);
+            if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+        return flushedBytes;
+    }
+
+    /**
+     * @param writer
+     * @param frameTupleAppender
+     * @param accessor0
+     * @param tIndex0
+     * @param fieldSlots1
+     * @param bytes1
+     * @param offset1
+     * @param dataLen1
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendConcatToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
+            IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
+            int dataLen1) throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
+            flushedBytes = frameTupleAppender.getBuffer().capacity();
+            frameTupleAppender.flush(writer, true);
+            if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+        return flushedBytes;
+    }
+
+    /**
+     * @param writer
+     * @param frameTupleAppender
+     * @param accessor
+     * @param tIndex
+     * @param fields
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendProjectionToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
+            IFrameTupleAccessor accessor, int tIndex, int[] fields) throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
+            flushedBytes = frameTupleAppender.getBuffer().capacity();
+            frameTupleAppender.flush(writer, true);
+            if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+        return flushedBytes;
     }
 
-    public static int getAbsoluteFieldStartOffset(IFrameTupleAccessor accessor, int tuple, int field) {
-        return accessor.getTupleStartOffset(tuple) + accessor.getFieldSlotsLength()
-                + accessor.getFieldStartOffset(tuple, field);
+    /**
+     * @param writer
+     * @param appender
+     * @param array
+     * @param start
+     * @param length
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendFieldToWriter(IFrameWriter writer, IFrameFieldAppender appender, byte[] array,
+            int start, int length) throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!appender.appendField(array, start, length)) {
+            flushedBytes = appender.getBuffer().capacity();
+            appender.flush(writer, true);
+            if (!appender.appendField(array, start, length)) {
+                throw new HyracksDataException("Could not write frame: the size of the tuple is too long");
+            }
+        }
+        return flushedBytes;
     }
+
+    /**
+     * @param writer
+     * @param appender
+     * @param accessor
+     * @param tid
+     * @param fid
+     * @return the number of bytes that have been flushed, 0 if not get flushed.
+     * @throws HyracksDataException
+     */
+    public static int appendFieldToWriter(IFrameWriter writer, IFrameFieldAppender appender,
+            IFrameTupleAccessor accessor, int tid, int fid) throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!appender.appendField(accessor, tid, fid)) {
+            flushedBytes = appender.getBuffer().capacity();
+            appender.flush(writer, true);
+            if (!appender.appendField(accessor, tid, fid)) {
+                throw new HyracksDataException("Could not write frame: the size of the tuple is too long");
+            }
+        }
+        return flushedBytes;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
index 31cd25c..be40d9f 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
@@ -14,8 +14,8 @@
  */
 package edu.uci.ics.hyracks.dataflow.common.io;
 
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -43,12 +43,30 @@ public class RunFileReader implements IFrameReader {
     }
 
     @Override
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        buffer.clear();
+    public boolean nextFrame(IFrame frame) throws HyracksDataException {
         if (readPtr >= size) {
             return false;
         }
-        readPtr += ioManager.syncRead(handle, readPtr, buffer);
+        frame.reset();
+        int readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
+        if (readLength <= 0) {
+            throw new HyracksDataException("Premature end of file");
+        }
+        readPtr += readLength;
+        frame.ensureFrameSize(frame.getMinSize() * FrameHelper.deserializeNumOfMinFrame(frame.getBuffer()));
+        if (frame.getBuffer().hasRemaining()) {
+            if (readPtr < size) {
+                readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
+                if (readLength < 0) {
+                    throw new HyracksDataException("Premature end of file");
+                }
+                readPtr += readLength;
+            }
+            if (frame.getBuffer().hasRemaining()) { // file is vanished.
+                FrameHelper.clearRemainingFrame(frame.getBuffer(), frame.getBuffer().position());
+            }
+        }
+        frame.getBuffer().flip();
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
index 9faef09..4e2a985 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
@@ -22,4 +22,10 @@ public class IntSerDeUtils {
                 + ((bytes[offset + 3] & 0xff) << 0);
     }
 
+    public static void putInt(byte[] bytes, int offset, int value) {
+        bytes[offset++] = (byte) (value >> 24);
+        bytes[offset++] = (byte) (value >> 16);
+        bytes[offset++] = (byte) (value >> 8);
+        bytes[offset++] = (byte) (value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
new file mode 100644
index 0000000..ab9333c
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.comm.io.largeobject;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.control.nc.resources.memory.FrameManager;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameFixedFieldAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+
+/**
+ * Tests for {@link FrameFixedFieldAppender}: tuples are appended one field at a
+ * time into deliberately small (TEST_FRAME_SIZE) output frames, flushed to a
+ * validating writer, and checked byte-for-byte against the input tuples.
+ */
+public class FrameFixedFieldTupleAppenderTest {
+
+    static final int INPUT_BUFFER_SIZE = 4096;
+    static final int TEST_FRAME_SIZE = 256;
+
+    FrameFixedFieldAppender appender;
+    static ISerializerDeserializer[] fields = new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE,
+    };
+    static RecordDescriptor recordDescriptor = new RecordDescriptor(fields);
+    static ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptor.getFieldCount());
+
+    /**
+     * Writer that validates every flushed frame: each tuple received must be
+     * byte-identical to the corresponding tuple of the expected accessor, in order.
+     */
+    class SequentialDataVerifier implements IFrameWriter {
+
+        private final IFrameTupleAccessor accessor;
+        private IFrameTupleAccessor innerAccessor;
+        private int tid; // index of the next expected tuple in {@code accessor}
+
+        public SequentialDataVerifier(IFrameTupleAccessor accessor) {
+            this.accessor = accessor;
+            this.innerAccessor = new FrameTupleAccessor(recordDescriptor);
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            this.tid = 0;
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            innerAccessor.reset(buffer);
+            for (int i = 0; i < innerAccessor.getTupleCount(); ++i) {
+                validate(innerAccessor, i);
+            }
+        }
+
+        /** Compares received tuple {@code i} against expected tuple {@code tid}. */
+        private void validate(IFrameTupleAccessor innerAccessor, int i) {
+            assertTrue(tid < accessor.getTupleCount());
+            assertEquals(accessor.getTupleLength(tid), innerAccessor.getTupleLength(i));
+            assertArrayEquals(Arrays.copyOfRange(accessor.getBuffer().array(), accessor.getTupleStartOffset(tid),
+                            accessor.getTupleEndOffset(tid)),
+                    Arrays.copyOfRange(innerAccessor.getBuffer().array(), innerAccessor.getTupleStartOffset(i),
+                            innerAccessor.getTupleEndOffset(i)));
+            tid++;
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            // The appender must never fail the writer in these tests (requires -ea).
+            assert false;
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            // Every expected tuple must have been seen exactly once.
+            assertEquals(accessor.getTupleCount(), tid);
+        }
+    }
+
+    @Before
+    public void createAppender() throws HyracksDataException {
+        appender = new FrameFixedFieldAppender(fields.length);
+        FrameManager manager = new FrameManager(TEST_FRAME_SIZE);
+        IFrame frame = new VSizeFrame(manager);
+        appender.reset(frame, true);
+    }
+
+    /**
+     * Pushes every field of every tuple through the appender, flushing whenever
+     * the current frame fills up, then validates the flushed output.
+     *
+     * @throws HyracksDataException if a field still does not fit after a flush
+     */
+    private void testProcess(IFrameTupleAccessor accessor) throws HyracksDataException {
+        IFrameWriter writer = prepareValidator(accessor);
+        writer.open();
+        for (int tid = 0; tid < accessor.getTupleCount(); tid++) {
+            for (int fid = 0; fid < fields.length; fid++) {
+                if (!appender.appendField(accessor, tid, fid)) {
+                    appender.flush(writer, true);
+                    if (!appender.appendField(accessor, tid, fid)) {
+                        // A single field must always fit into a freshly flushed
+                        // (variable-size) frame; silently ignoring this would
+                        // drop data and let the test pass vacuously.
+                        throw new HyracksDataException("appendField failed even after flushing the frame");
+                    }
+                }
+            }
+        }
+        appender.flush(writer, true);
+        writer.close();
+    }
+
+    @Test
+    public void testAppendFieldShouldSucceed() throws HyracksDataException {
+        IFrameTupleAccessor accessor = prepareData(DATA_TYPE.NORMAL_RECORD);
+        testProcess(accessor);
+    }
+
+    @Test
+    public void testResetShouldWork() throws HyracksDataException {
+        testAppendFieldShouldSucceed();
+        appender.reset(new VSizeFrame(new FrameManager(TEST_FRAME_SIZE)), true);
+        testAppendFieldShouldSucceed();
+    }
+
+    private IFrameWriter prepareValidator(IFrameTupleAccessor accessor) throws HyracksDataException {
+        return new SequentialDataVerifier(accessor);
+    }
+
+    enum DATA_TYPE {
+        NORMAL_RECORD,
+        ONE_FIELD_LONG,
+        ONE_RECORD_LONG,
+    }
+
+    /**
+     * Fills an INPUT_BUFFER_SIZE frame with generated tuples of the given shape
+     * and returns an accessor over it. Generation stops at the first tuple that
+     * no longer fits.
+     */
+    private IFrameTupleAccessor prepareData(DATA_TYPE type) throws HyracksDataException {
+        IFrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptor);
+        IFrameTupleAppender appender = new FrameTupleAppender(
+                new VSizeFrame(new FrameManager(INPUT_BUFFER_SIZE)), true);
+        int i = 0;
+        do {
+            switch (type) {
+                case NORMAL_RECORD:
+                    makeATuple(tupleBuilder, i++);
+                    break;
+                case ONE_FIELD_LONG:
+                    makeASizeUpTuple(tupleBuilder, i++);
+                    break;
+                case ONE_RECORD_LONG:
+                    makeABigObjectTuple(tupleBuilder, i++);
+                    break;
+            }
+        } while (appender
+                .append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()));
+        accessor.reset(appender.getBuffer());
+        return accessor;
+    }
+
+    /** Builds a small 4-field tuple: (i, "i", -i, "-i"). */
+    private void makeATuple(ArrayTupleBuilder tupleBuilder, int i) throws HyracksDataException {
+        tupleBuilder.reset();
+        tupleBuilder.addField(fields[0], i);
+        tupleBuilder.addField(fields[1], String.valueOf(i));
+        tupleBuilder.addField(fields[2], -i);
+        tupleBuilder.addField(fields[3], String.valueOf(-i));
+    }
+
+    /** Returns a string of {@code length} copies of {@code ch}. */
+    private String makeALongString(int length, char ch) {
+        char[] array = new char[length];
+        Arrays.fill(array, ch);
+        return new String(array);
+    }
+
+    /**
+     * Builds a tuple whose second field grows exponentially with {@code i},
+     * capped at INPUT_BUFFER_SIZE. The shift amount is capped at 12 (1 << 12 ==
+     * 4096 == INPUT_BUFFER_SIZE) so the expression never overflows int, unlike
+     * Math.abs(1 << i) which goes negative at i == 31.
+     */
+    private void makeASizeUpTuple(ArrayTupleBuilder tupleBuilder, int i) throws HyracksDataException {
+        tupleBuilder.reset();
+        tupleBuilder.addField(fields[0], i);
+        tupleBuilder.addField(fields[1], makeALongString(Math.min(1 << Math.min(i, 12), INPUT_BUFFER_SIZE), (char) i));
+        tupleBuilder.addField(fields[2], -i);
+        tupleBuilder.addField(fields[3], String.valueOf(-i));
+    }
+
+    /** Builds a tuple with two moderately long string fields (grows with i). */
+    private void makeABigObjectTuple(ArrayTupleBuilder tupleBuilder, int i) throws HyracksDataException {
+        tupleBuilder.reset();
+        tupleBuilder.addField(fields[0], i);
+        tupleBuilder.addField(fields[1], makeALongString(Math.min(i * 20, TEST_FRAME_SIZE), (char) i));
+        tupleBuilder.addField(fields[2], -i);
+        tupleBuilder.addField(fields[3], makeALongString(Math.min(i * 20, TEST_FRAME_SIZE), (char) i));
+    }
+
+    @Test
+    public void testAppendLargeFieldShouldSucceed() throws HyracksDataException {
+        IFrameTupleAccessor accessor = prepareData(DATA_TYPE.ONE_FIELD_LONG);
+        testProcess(accessor);
+    }
+
+    @Test
+    public void testAppendSmallFieldButLargeObjectWithShouldSucceed() throws HyracksDataException {
+        IFrameTupleAccessor accessor = prepareData(DATA_TYPE.ONE_RECORD_LONG);
+        testProcess(accessor);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 9bce999..0c06769 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -29,7 +29,6 @@
 package edu.uci.ics.hyracks.dataflow.hadoop;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -45,6 +44,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
@@ -205,9 +205,7 @@ public class HadoopReadOperatorDescriptor extends AbstractSingleActivityOperator
 
                     key = hadoopRecordReader.createKey();
                     value = hadoopRecordReader.createValue();
-                    ByteBuffer outBuffer = ctx.allocateFrame();
-                    FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-                    appender.reset(outBuffer, true);
+                    FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
                     RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                             (Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
                             (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
@@ -223,18 +221,11 @@ public class HadoopReadOperatorDescriptor extends AbstractSingleActivityOperator
                                 case 1:
                                     tb.addField(outputRecordDescriptor.getFields()[1], value);
                             }
-                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                FrameUtils.flushFrame(outBuffer, writer);
-                                appender.reset(outBuffer, true);
-                                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                    throw new HyracksDataException("Record size (" + tb.getSize()
-                                            + ") larger than frame size (" + outBuffer.capacity() + ")");
-                                }
-                            }
-                        }
-                        if (appender.getTupleCount() > 0) {
-                            FrameUtils.flushFrame(outBuffer, writer);
+                            FrameUtils
+                                    .appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(),
+                                            0, tb.getSize());
                         }
+                        appender.flush(writer, true);
                     } catch (Exception e) {
                         writer.fail();
                         throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
index 5ac55ff..95f53df 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
@@ -203,7 +203,7 @@ public class HadoopHelper {
 
     public int getSortFrameLimit(IHyracksCommonContext ctx) {
         int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100);
-        return (int) (((long) sortMemory * 1024 * 1024) / ctx.getFrameSize());
+        return (int) (((long) sortMemory * 1024 * 1024) / ctx.getInitialFrameSize());
     }
 
     public Job getJob() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
index 0bbb21f..070fc88 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
@@ -15,44 +15,42 @@
 package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.util.Progress;
 
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class KVIterator implements RawKeyValueIterator {
     private final HadoopHelper helper;
     private FrameTupleAccessor accessor;
     private DataInputBuffer kBuffer;
     private DataInputBuffer vBuffer;
-    private List<ByteBuffer> buffers;
+    private List<IFrame> buffers;
     private int bSize;
     private int bPtr;
     private int tIdx;
     private boolean eog;
 
-    public KVIterator(IHyracksTaskContext ctx, HadoopHelper helper, RecordDescriptor recordDescriptor) {
+    public KVIterator(HadoopHelper helper, RecordDescriptor recordDescriptor) {
         this.helper = helper;
-        accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        accessor = new FrameTupleAccessor(recordDescriptor);
         kBuffer = new DataInputBuffer();
         vBuffer = new DataInputBuffer();
     }
 
-    void reset(List<ByteBuffer> buffers, int bSize) {
+    void reset(List<IFrame> buffers, int bSize) {
         this.buffers = buffers;
         this.bSize = bSize;
         bPtr = 0;
         tIdx = 0;
         eog = false;
         if (bSize > 0) {
-            accessor.reset(buffers.get(0));
+            accessor.reset(buffers.get(0).getBuffer());
             tIdx = -1;
         } else {
             eog = true;
@@ -83,14 +81,14 @@ public class KVIterator implements RawKeyValueIterator {
                     continue;
                 }
                 tIdx = -1;
-                accessor.reset(buffers.get(bPtr));
+                accessor.reset(buffers.get(bPtr).getBuffer());
                 continue;
             }
             kBuffer.reset(accessor.getBuffer().array(),
-                    FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, helper.KEY_FIELD_INDEX),
+                    accessor.getAbsoluteFieldStartOffset(tIdx, helper.KEY_FIELD_INDEX),
                     accessor.getFieldLength(tIdx, helper.KEY_FIELD_INDEX));
             vBuffer.reset(accessor.getBuffer().array(),
-                    FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, helper.VALUE_FIELD_INDEX),
+                    accessor.getAbsoluteFieldStartOffset(tIdx, helper.VALUE_FIELD_INDEX),
                     accessor.getFieldLength(tIdx, helper.VALUE_FIELD_INDEX));
             break;
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
index 2bef21a..bf6e2cf 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -29,7 +29,9 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -40,13 +42,12 @@ import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.MRContextUtil;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.MRContextUtil;
 
 public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
         extends AbstractSingleActivityOperatorDescriptor {
@@ -82,16 +83,15 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
 
         class SortingRecordWriter extends RecordWriter<K2, V2> {
             private final ArrayTupleBuilder tb;
-            private final ByteBuffer frame;
+            private final IFrame frame;
             private final FrameTupleAppender fta;
             private ExternalSortRunGenerator runGen;
             private int blockId;
 
             public SortingRecordWriter() throws HyracksDataException {
                 tb = new ArrayTupleBuilder(2);
-                frame = ctx.allocateFrame();
-                fta = new FrameTupleAppender(ctx.getFrameSize());
-                fta.reset(frame, true);
+                frame = new VSizeFrame(ctx);
+                fta = new FrameTupleAppender(frame);
             }
 
             public void initBlock(int blockId) throws HyracksDataException {
@@ -113,31 +113,29 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
                 value.write(dos);
                 tb.addFieldEndOffset();
                 if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    runGen.nextFrame(frame);
+                    runGen.nextFrame(frame.getBuffer());
                     fta.reset(frame, true);
                     if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                         throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
-                                + frame.capacity() + ")");
+                                + frame.getBuffer().capacity() + ")");
                     }
                 }
             }
 
             public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
                 if (fta.getTupleCount() > 0) {
-                    runGen.nextFrame(frame);
+                    runGen.nextFrame(frame.getBuffer());
                     fta.reset(frame, true);
                 }
                 runGen.close();
                 IFrameWriter delegatingWriter = new IFrameWriter() {
-                    private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-                    private final ByteBuffer outFrame = ctx.allocateFrame();
-                    private final FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(),
+                    private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
+                    private final FrameTupleAccessor fta = new FrameTupleAccessor(
                             helper.getMapOutputRecordDescriptorWithoutExtraFields());
                     private final ArrayTupleBuilder tb = new ArrayTupleBuilder(3);
 
                     @Override
                     public void open() throws HyracksDataException {
-                        appender.reset(outFrame, true);
                     }
 
                     @Override
@@ -155,8 +153,7 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
                             }
                             tb.addFieldEndOffset();
                             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                FrameUtils.flushFrame(outFrame, writer);
-                                appender.reset(outFrame, true);
+                                appender.flush(writer, true);
                                 if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                                     throw new IllegalStateException();
                                 }
@@ -166,9 +163,7 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
 
                     @Override
                     public void close() throws HyracksDataException {
-                        if (appender.getTupleCount() > 0) {
-                            FrameUtils.flushFrame(outFrame, writer);
-                        }
+                        appender.flush(writer, true);
                     }
 
                     @Override
@@ -183,12 +178,10 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
                     TaskAttemptContext ctaskAttemptContext = helper.createTaskAttemptContext(taId);
                     final IFrameWriter outputWriter = delegatingWriter;
                     RecordWriter<K2, V2> recordWriter = new RecordWriter<K2, V2>() {
-                        private final FrameTupleAppender fta = new FrameTupleAppender(ctx.getFrameSize());
-                        private final ByteBuffer buffer = ctx.allocateFrame();
+                        private final FrameTupleAppender fta = new FrameTupleAppender(new VSizeFrame(ctx));
                         private final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
 
                         {
-                            fta.reset(buffer, true);
                             outputWriter.open();
                         }
 
@@ -201,8 +194,7 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
                             value.write(dos);
                             tb.addFieldEndOffset();
                             if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                FrameUtils.flushFrame(buffer, outputWriter);
-                                fta.reset(buffer, true);
+                                fta.flush(outputWriter, true);
                                 if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                                     throw new IllegalStateException();
                                 }
@@ -211,10 +203,7 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
 
                         @Override
                         public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-                            if (fta.getTupleCount() > 0) {
-                                FrameUtils.flushFrame(buffer, outputWriter);
-                                outputWriter.close();
-                            }
+                            fta.flush(outputWriter, true);
                         }
                     };
                     delegatingWriter = new ReduceWriter<K2, V2, K2, V2>(ctx, helper,
@@ -226,7 +215,7 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
                 for (int i = 0; i < comparatorFactories.length; ++i) {
                     comparators[i] = comparatorFactories[i].createBinaryComparator();
                 }
-                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
+                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(),
                         runGen.getRuns(), new int[] { 0 }, comparators, null,
                         helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
                 merger.process();
@@ -253,8 +242,9 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
                                 Thread.currentThread().setContextClassLoader(ctxCL);
                             }
                             recordWriter.initBlock(blockId);
-                            Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil().createMapContext(conf, taId, recordReader,
-                                    recordWriter, null, null, split);
+                            Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil()
+                                    .createMapContext(conf, taId, recordReader,
+                                            recordWriter, null, null, split);
                             mapper.run(mCtx);
                             recordReader.close();
                             recordWriter.sortAndFlushBlock(writer);


Mime
View raw message