asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [13/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.
Date Thu, 18 Jun 2015 04:22:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksFrameMgrContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksFrameMgrContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksFrameMgrContext.java
new file mode 100644
index 0000000..d7a18bc
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksFrameMgrContext.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.context;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IHyracksFrameMgrContext {
+    int getInitialFrameSize();
+
+    // TODO: to be deleted
+    ByteBuffer allocateFrame() throws HyracksDataException;
+
+    ByteBuffer allocateFrame(int bytes) throws HyracksDataException;
+
+    ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newSizeInBytes, boolean copyOldData)
+            throws HyracksDataException;
+
+    /**
+     * The caller should call this method to return the pre-allocated frames.
+     *
+     * @param bytes the total size, in bytes, of the frames being returned
+     */
+    void deallocateFrames(int bytes);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
index d2c2ee3..dec0bfd 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
@@ -14,13 +14,12 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IHyracksDatasetReader {
     public Status getResultStatus();
 
-    public int read(ByteBuffer buffer) throws HyracksDataException;
+    public int read(IFrame frame) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
index 97ce664..9975fe3 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -14,36 +14,21 @@
  */
 package edu.uci.ics.hyracks.client.dataset;
 
-import java.nio.ByteBuffer;
-
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.control.nc.resources.memory.FrameManager;
 
-public class DatasetClientContext implements IHyracksCommonContext {
+public class DatasetClientContext extends FrameManager implements IHyracksCommonContext {
     private final int frameSize;
 
     public DatasetClientContext(int frameSize) {
+        super(frameSize);
         this.frameSize = frameSize;
     }
 
     @Override
-    public int getFrameSize() {
-        return frameSize;
-    }
-
-    @Override
     public IIOManager getIOManager() {
         return null;
     }
 
-    @Override
-    public ByteBuffer allocateFrame() {
-        return ByteBuffer.allocate(frameSize);
-    }
-
-    @Override
-    public void deallocateFrames(int frameCount) {
-        // TODO Auto-generated method stub
-        
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 062b5bf..34c4d6e 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
@@ -64,7 +66,8 @@ public class HyracksDatasetReader implements IHyracksDatasetReader {
     private static int NUM_READ_BUFFERS = 1;
 
     public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection,
-            ClientNetworkManager netManager, DatasetClientContext datasetClientCtx, JobId jobId, ResultSetId resultSetId)
+            ClientNetworkManager netManager, DatasetClientContext datasetClientCtx, JobId jobId,
+            ResultSetId resultSetId)
             throws Exception {
         this.datasetDirectoryServiceConnection = datasetDirectoryServiceConnection;
         this.netManager = netManager;
@@ -119,7 +122,8 @@ public class HyracksDatasetReader implements IHyracksDatasetReader {
     }
 
     @Override
-    public int read(ByteBuffer buffer) throws HyracksDataException {
+    public int read(IFrame frame) throws HyracksDataException {
+        frame.reset();
         ByteBuffer readBuffer;
         int readSize = 0;
 
@@ -129,7 +133,7 @@ public class HyracksDatasetReader implements IHyracksDatasetReader {
             }
         }
 
-        while (readSize <= 0
+        while (readSize < frame.getFrameSize()
                 && !((lastReadPartition == knownRecords.length - 1) && isPartitionReadComplete(lastMonitor))) {
             waitForNextFrame(lastMonitor);
             if (isPartitionReadComplete(lastMonitor)) {
@@ -142,14 +146,23 @@ public class HyracksDatasetReader implements IHyracksDatasetReader {
                 readBuffer = resultChannel.getNextBuffer();
                 lastMonitor.notifyFrameRead();
                 if (readBuffer != null) {
-                    buffer.put(readBuffer);
-                    buffer.flip();
-                    readSize = buffer.limit();
-                    resultChannel.recycleBuffer(readBuffer);
+                    if (readSize <=0) {
+                        int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
+                        frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+                        frame.getBuffer().clear();
+                        frame.getBuffer().put(readBuffer);
+                        resultChannel.recycleBuffer(readBuffer);
+                        readSize = frame.getBuffer().position();
+                    } else {
+                        frame.getBuffer().put(readBuffer);
+                        resultChannel.recycleBuffer(readBuffer);
+                        readSize = frame.getBuffer().position();
+                    }
                 }
             }
         }
 
+        frame.getBuffer().flip();
         return readSize;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
index c5cb7d0..87fa23c 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
@@ -95,7 +95,7 @@ public class NetworkInputChannel implements IInputChannel {
         }
         ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
         ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
-        ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers, ctx.getFrameSize());
+        ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers, ctx.getInitialFrameSize());
         ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
         writeBuffer.putLong(partitionId.getJobId().getId());
         writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
index b573b73..5ac0a47 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
@@ -54,32 +54,36 @@ public class NetworkOutputChannel implements IFrameWriter {
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         ByteBuffer destBuffer = null;
-        synchronized (this) {
-            while (true) {
-                if (aborted) {
-                    throw new HyracksDataException("Connection has been aborted");
-                }
-                destBuffer = emptyStack.poll();
-                if (destBuffer == null && allocateCounter < nBuffers) {
-                    destBuffer = ByteBuffer.allocateDirect(frameSize);
-                    allocateCounter++;
-                }
-                if (destBuffer != null) {
-                    break;
-                }
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
+        int startPos = 0;
+        do {
+            synchronized (this) {
+                while (true) {
+                    if (aborted) {
+                        throw new HyracksDataException("Connection has been aborted");
+                    }
+                    destBuffer = emptyStack.poll();
+                    if (destBuffer == null && allocateCounter < nBuffers) {
+                        destBuffer = ByteBuffer.allocateDirect(frameSize);
+                        allocateCounter++;
+                    }
+                    if (destBuffer != null) {
+                        break;
+                    }
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
                 }
             }
-        }
-        buffer.position(0);
-        buffer.limit(destBuffer.capacity());
-        destBuffer.clear();
-        destBuffer.put(buffer);
-        destBuffer.flip();
-        ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
+            buffer.position(startPos);
+            startPos = Math.min(startPos + destBuffer.capacity(), buffer.capacity());
+            buffer.limit(startPos);
+            destBuffer.clear();
+            destBuffer.put(buffer);
+            destBuffer.flip();
+            ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
+        } while (startPos < buffer.capacity());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java
index c59398c..641b7bf 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java
@@ -30,7 +30,7 @@ public class ReadBufferFactory implements IBufferFactory {
 
     public ReadBufferFactory(int limit, IHyracksCommonContext ctx) {
         this.limit = limit;
-        this.frameSize = ctx.getFrameSize();
+        this.frameSize = ctx.getInitialFrameSize();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 89c5b75..c6e49e1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -56,6 +56,7 @@ import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import edu.uci.ics.hyracks.control.nc.resources.memory.FrameManager;
 
 public class Joblet implements IHyracksJobletContext, ICounterContext {
     private static final Logger LOGGER = Logger.getLogger(Joblet.class.getName());
@@ -88,7 +89,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
 
     private final IJobletEventListener jobletEventListener;
 
-    private final int frameSize;
+    private final FrameManager frameManager;
 
     private final AtomicLong memoryAllocation;
 
@@ -102,7 +103,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
         this.appCtx = appCtx;
         this.deploymentId = deploymentId;
         this.jobId = jobId;
-        this.frameSize = acg.getFrameSize();
+        this.frameManager = new FrameManager(acg.getFrameSize());
         memoryAllocation = new AtomicLong();
         this.acg = acg;
         partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
@@ -222,23 +223,33 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
     }
 
     ByteBuffer allocateFrame() throws HyracksDataException {
-        if (appCtx.getMemoryManager().allocate(frameSize)) {
-            memoryAllocation.addAndGet(frameSize);
-            return ByteBuffer.allocate(frameSize);
-        }
+        return frameManager.allocateFrame();
+    }
+
+    ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
+        if (appCtx.getMemoryManager().allocate(bytes)) {
+            memoryAllocation.addAndGet(bytes);
+            return frameManager.allocateFrame(bytes);
+       }
         throw new HyracksDataException("Unable to allocate frame: Not enough memory");
     }
 
-    public void deallocateFrames(int nFrames) {
-        memoryAllocation.addAndGet(nFrames * frameSize);
-        appCtx.getMemoryManager().deallocate(nFrames * frameSize);
+    ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData)
+            throws HyracksDataException {
+        return frameManager.reallocateFrame(usedBuffer, newFrameSizeInBytes, copyOldData);
+    }
+
+    void deallocateFrames(int bytes) {
+        memoryAllocation.addAndGet(bytes);
+        appCtx.getMemoryManager().deallocate(bytes);
+        frameManager.deallocateFrames(bytes);
     }
 
-    final int getFrameSize() {
-        return frameSize;
+    public final int getFrameSize() {
+        return frameManager.getInitialFrameSize();
     }
 
-    IIOManager getIOManager() {
+    public IIOManager getIOManager() {
         return appCtx.getRootContext().getIOManager();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 3014024..387ab04 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
@@ -120,12 +121,23 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
     }
 
     @Override
-    public void deallocateFrames(int frameCount) {
-        joblet.deallocateFrames(frameCount);
+    public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
+        return joblet.allocateFrame(bytes);
     }
 
     @Override
-    public int getFrameSize() {
+    public ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newSizeInBytes, boolean copyOldData)
+            throws HyracksDataException {
+        return joblet.reallocateFrame(usedBuffer, newSizeInBytes, copyOldData);
+    }
+
+    @Override
+    public void deallocateFrames(int bytes) {
+        joblet.deallocateFrames(bytes);
+    }
+
+    @Override
+    public int getInitialFrameSize() {
         return joblet.getFrameSize();
     }
 
@@ -317,12 +329,12 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
                 try {
                     writer.open();
                     try {
-                        ByteBuffer buffer = allocateFrame();
-                        while (reader.nextFrame(buffer)) {
+                        VSizeFrame frame = new VSizeFrame(this);
+                        while( reader.nextFrame(frame)){
                             if (aborted) {
                                 return;
                             }
-                            buffer.flip();
+                            ByteBuffer buffer = frame.getBuffer();
                             writer.nextFrame(buffer);
                             buffer.compact();
                         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index d3a6fb5..673a319 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -67,7 +67,7 @@ public class DatasetPartitionWriter implements IFrameWriter {
 
         resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
         resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIOManager(), fileFactory,
-                ctx.getFrameSize());
+                ctx.getInitialFrameSize());
     }
 
     public ResultState getResultState() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index c24034d..9ea6ddb 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -190,6 +190,9 @@ public class ResultState implements IStateObject {
                     initReadFileHandle();
                 }
                 readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+                if (readSize < 0){
+                    throw new HyracksDataException("Premature end of file");
+                }
             }
 
             if (readSize < buffer.capacity()) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
index 239b6aa..b3f3e2a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
@@ -103,6 +103,15 @@ public class IOManager implements IIOManager {
         }
     }
 
+    /**
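+     * Reads from the file into {@code data}, starting at {@code offset}. Please do check the return value of this read!
+     *
+     * @param fHandle the handle of the file to read from
+     * @param offset the position in the file at which the read starts
+     * @param data the buffer that receives the bytes read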
+     * @return The number of bytes read, possibly zero, or -1 if the given offset is greater than or equal to the file's current size
+     * @throws HyracksDataException
+     */
     @Override
     public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
         try {
@@ -111,7 +120,7 @@ public class IOManager implements IIOManager {
             while (remaining > 0) {
                 int len = ((FileHandle) fHandle).getFileChannel().read(data, offset);
                 if (len < 0) {
-                    return -1;
+                    return n == 0 ? -1 : n;
                 }
                 remaining -= len;
                 offset += len;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 816e345..b52a8a1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -114,6 +114,9 @@ public class MaterializedPartitionInputChannel implements IInputChannel {
                 ByteBuffer destFrame = emptyQueue.poll();
                 buffer.position(0);
                 buffer.limit(buffer.capacity());
+                if (destFrame.capacity() < buffer.capacity()){
+                    throw new HyracksDataException("should never happen");
+                }
                 destFrame.clear();
                 destFrame.put(buffer);
                 fullQueue.add(destFrame);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index b209cc1..4f5d60f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -64,7 +64,7 @@ public class PartitionManager {
              */
             NetworkOutputChannel writer = partitionRequests.remove(pid);
             if (writer != null) {
-                writer.setFrameSize(partition.getTaskContext().getFrameSize());
+                writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
                 partition.writeTo(writer);
                 if (!partition.isReusable()) {
                     return;
@@ -116,7 +116,7 @@ public class PartitionManager {
             List<IPartition> pList = availablePartitionMap.get(partitionId);
             if (pList != null && !pList.isEmpty()) {
                 IPartition partition = pList.get(0);
-                writer.setFrameSize(partition.getTaskContext().getFrameSize());
+                writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
                 partition.writeTo(writer);
                 if (!partition.isReusable()) {
                     availablePartitionMap.remove(partitionId);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
index e7102e1..c9fc237 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
@@ -14,8 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.nc.profiling;
 
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -44,8 +43,8 @@ public class ConnectorReceiverProfilingFrameReader implements IFrameReader {
     }
 
     @Override
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        boolean status = reader.nextFrame(buffer);
+    public boolean nextFrame(IFrame frame) throws HyracksDataException {
+        boolean status = reader.nextFrame(frame);
         if (status) {
             frameCounter.update(1);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/FrameManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/FrameManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/FrameManager.java
new file mode 100644
index 0000000..fd71d8b
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/FrameManager.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.resources.memory;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.context.IHyracksFrameMgrContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * {@link IHyracksFrameMgrContext} that hands out heap {@link ByteBuffer}s. Every frame's capacity is a multiple
+ * of the minimum frame size, and that multiple is written into the buffer with
+ * {@link FrameHelper#serializeFrameSize}.
+ */
+public class FrameManager implements IHyracksFrameMgrContext {
+
+    private final int minFrameSize;
+
+    /**
+     * @param minFrameSize
+     *            size in bytes of the smallest frame; all other frame sizes must be multiples of it
+     */
+    public FrameManager(int minFrameSize) {
+        this.minFrameSize = minFrameSize;
+    }
+
+    /** @return the minimum frame size this manager was constructed with */
+    @Override
+    public int getInitialFrameSize() {
+        return minFrameSize;
+    }
+
+    /** Allocates a frame of exactly the minimum frame size. */
+    @Override
+    public ByteBuffer allocateFrame() throws HyracksDataException {
+        return allocateFrame(minFrameSize);
+    }
+
+    /**
+     * Allocates a frame of {@code bytes} bytes and records its size (in minimum-frame units) in the buffer.
+     *
+     * @param bytes
+     *            requested capacity; must be a positive multiple of the minimum frame size
+     * @return the new buffer, cleared so that its position is 0 and its limit equals its capacity
+     * @throws HyracksDataException
+     *             if {@code bytes} is not a positive multiple of the minimum frame size, or spans more than
+     *             {@link FrameConstants#MAX_NUM_MINFRAME} minimum-size frames
+     */
+    @Override
+    public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
+        if (bytes <= 0 || bytes % minFrameSize != 0) {
+            throw new HyracksDataException("The size should be a positive integral multiple of the default frame size");
+        }
+        // Validate before allocating so that a rejected request never reserves memory.
+        if (bytes / minFrameSize > FrameConstants.MAX_NUM_MINFRAME) {
+            throw new HyracksDataException("Unable to allocate frame of size bigger than MinFrameSize * "
+                    + FrameConstants.MAX_NUM_MINFRAME);
+        }
+        ByteBuffer buffer = ByteBuffer.allocate(bytes);
+        FrameHelper.serializeFrameSize(buffer, (byte) (bytes / minFrameSize));
+        return (ByteBuffer) buffer.clear();
+    }
+
+    /**
+     * Replaces {@code tobeDeallocate} with a frame of {@code newSizeInBytes} bytes.
+     *
+     * @param tobeDeallocate
+     *            the frame being replaced; its position and limit are modified when data is copied
+     * @param newSizeInBytes
+     *            capacity of the replacement, subject to the same rules as {@link #allocateFrame(int)}
+     * @param copyOldData
+     *            if true, as much of the old content as fits is copied and the old position (capped at the
+     *            copied length) is carried over to the new buffer
+     * @return the replacement frame
+     * @throws HyracksDataException
+     *             if {@code newSizeInBytes} is rejected by {@link #allocateFrame(int)}
+     */
+    @Override
+    public ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newSizeInBytes, boolean copyOldData)
+            throws HyracksDataException {
+        if (!copyOldData) {
+            deallocateFrames(tobeDeallocate.capacity());
+            return allocateFrame(newSizeInBytes);
+        }
+        // allocateFrame() has already validated newSizeInBytes, so no second size check is needed here.
+        ByteBuffer buffer = allocateFrame(newSizeInBytes);
+        int limit = Math.min(newSizeInBytes, tobeDeallocate.capacity());
+        int pos = Math.min(limit, tobeDeallocate.position());
+        tobeDeallocate.position(0);
+        tobeDeallocate.limit(limit);
+        buffer.put(tobeDeallocate);
+        buffer.position(pos);
+        // The copy starts at offset 0 and overwrites what allocateFrame() stamped there, so re-stamp the new size.
+        FrameHelper.serializeFrameSize(buffer, (byte) (newSizeInBytes / minFrameSize));
+        // Release the old frame on this path too, mirroring the non-copy path above.
+        deallocateFrames(tobeDeallocate.capacity());
+        return buffer;
+    }
+
+    /** Currently a no-op; see the TODO below. */
+    @Override
+    public void deallocateFrames(int bytes) {
+        //TODO make a global memory manager to allocate and deallocate the frames.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/pom.xml b/hyracks/hyracks-dataflow-common/pom.xml
index 9f965a3..3f2936d 100644
--- a/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks/hyracks-dataflow-common/pom.xml
@@ -57,6 +57,12 @@
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-data-std</artifactId>
   		<version>0.2.16-SNAPSHOT</version>
+    </dependency>
+   	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-control-nc</artifactId>
+  		<version>0.2.16-SNAPSHOT</version>
+        <scope>test</scope>
   	</dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
new file mode 100644
index 0000000..1f501aa
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+
+/**
+ * Shared bookkeeping for appenders that write into an {@link IFrame}: tracks how many tuples the frame holds and
+ * where the tuple data currently ends, and knows how to flush the frame to an {@link IFrameWriter}.
+ */
+public class AbstractFrameAppender implements IFrameAppender {
+    protected IFrame frame;
+    protected byte[] array; // backing array of getBuffer(), cached to make byte access a little cheaper
+
+    protected int tupleCount;
+    protected int tupleDataEndOffset;
+
+    /**
+     * Binds this appender to {@code frame}.
+     *
+     * @param clear
+     *            if true the frame is reset and treated as empty; otherwise the tuple count and the data end
+     *            offset are read back from the frame's existing content
+     */
+    @Override
+    public void reset(IFrame frame, boolean clear) throws HyracksDataException {
+        this.frame = frame;
+        if (clear) {
+            this.frame.reset();
+        }
+        reset(getBuffer(), clear);
+    }
+
+    /**
+     * Checks that the current data end, plus the space needed for one more tuple, plus {@code SIZE_LEN} bytes
+     * per tuple already stored, does not pass the tuple-count offset of the frame.
+     */
+    protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
+        int usedAfterAppend = tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, tupleLength)
+                + tupleCount * FrameConstants.SIZE_LEN;
+        return usedAfterAppend <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
+    }
+
+    /** Re-caches the backing array, then either marks the frame empty or reloads the counters from it. */
+    private void reset(ByteBuffer buffer, boolean clear) {
+        array = buffer.array();
+        int tupleCountOffset = FrameHelper.getTupleCountOffset(frame.getFrameSize());
+        if (clear) {
+            IntSerDeUtils.putInt(array, tupleCountOffset, 0);
+            tupleCount = 0;
+            tupleDataEndOffset = FrameConstants.TUPLE_START_OFFSET;
+            return;
+        }
+        tupleCount = IntSerDeUtils.getInt(array, tupleCountOffset);
+        if (tupleCount == 0) {
+            tupleDataEndOffset = FrameConstants.TUPLE_START_OFFSET;
+        } else {
+            // the slot of the last tuple holds the offset at which the tuple data ends
+            tupleDataEndOffset = IntSerDeUtils.getInt(array,
+                    tupleCountOffset - tupleCount * FrameConstants.SIZE_LEN);
+        }
+    }
+
+    @Override
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return frame.getBuffer();
+    }
+
+    /**
+     * Hands the frame's buffer to {@code outWriter} if it holds at least one tuple.
+     *
+     * @param clearFrame
+     *            if true the frame is reset afterwards and the appender starts over with an empty frame
+     */
+    @Override
+    public void flush(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        getBuffer().clear();
+        if (getTupleCount() > 0) {
+            outWriter.nextFrame(getBuffer());
+        }
+        if (!clearFrame) {
+            return;
+        }
+        frame.reset();
+        reset(getBuffer(), true);
+    }
+
+    /**
+     * Decides whether a tuple can be appended. When the frame holds no tuples yet but is too small, the frame is
+     * asked to grow to a size that can store the tuple and is then re-initialized as empty.
+     */
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength)) {
+            return true;
+        }
+        if (tupleCount != 0) {
+            return false;
+        }
+        frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength, frame.getMinSize()));
+        reset(frame.getBuffer(), true);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
index 9223740..53c5eb3 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
@@ -17,6 +17,7 @@ package edu.uci.ics.hyracks.dataflow.common.comm.io;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameConstants.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameConstants.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameConstants.java
deleted file mode 100644
index 43538d7..0000000
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameConstants.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.common.comm.io;
-
-public interface FrameConstants {
-    public static final int SIZE_LEN = 4;
-
-    public static final boolean DEBUG_FRAME_IO = false;
-
-    public static final int FRAME_FIELD_MAGIC = 0x12345678;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializer.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializer.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializer.java
index 556ecbb..b1052e7 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializer.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializer.java
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
@@ -41,11 +42,11 @@ public class FrameDeserializer {
 
     private ByteBuffer buffer;
 
-    public FrameDeserializer(int frameSize, RecordDescriptor recordDescriptor) {
+    public FrameDeserializer(RecordDescriptor recordDescriptor) {
         this.bbis = new ByteBufferInputStream();
         this.di = new DataInputStream(bbis);
         this.recordDescriptor = recordDescriptor;
-        frameTupleAccessor = new FrameTupleAccessor(frameSize, recordDescriptor);
+        frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
     }
 
     public void reset(ByteBuffer buffer) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataReader.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataReader.java
index b94a219..2dcf84c 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataReader.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataReader.java
@@ -14,16 +14,16 @@
  */
 package edu.uci.ics.hyracks.dataflow.common.comm.io;
 
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class FrameDeserializingDataReader implements IOpenableDataReader<Object[]> {
-    private final ByteBuffer buffer;
+    private final IFrame frame;
 
     private boolean eos;
 
@@ -35,16 +35,15 @@ public class FrameDeserializingDataReader implements IOpenableDataReader<Object[
 
     public FrameDeserializingDataReader(IHyracksTaskContext ctx, IFrameReader frameReader,
             RecordDescriptor recordDescriptor) throws HyracksDataException {
-        buffer = ctx.allocateFrame();
+        this.frame = new VSizeFrame(ctx);
         this.frameReader = frameReader;
-        this.frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), recordDescriptor);
+        this.frameDeserializer = new FrameDeserializer(recordDescriptor);
     }
 
     @Override
     public void open() throws HyracksDataException {
         frameReader.open();
-        buffer.clear();
-        buffer.flip();
+        frame.reset();
         eos = false;
         first = true;
     }
@@ -64,11 +63,11 @@ public class FrameDeserializingDataReader implements IOpenableDataReader<Object[
             if (!first && !frameDeserializer.done()) {
                 return frameDeserializer.deserializeRecord();
             }
-            buffer.clear();
-            if (!frameReader.nextFrame(buffer)) {
+            frame.reset();
+            if (!frameReader.nextFrame(frame)) {
                 eos = true;
             } else {
-                frameDeserializer.reset(buffer);
+                frameDeserializer.reset(frame.getBuffer());
             }
             first = false;
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
index b3a72d7..8855de5 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
@@ -29,7 +29,7 @@ public class FrameDeserializingDataWriter implements IFrameWriter {
     public FrameDeserializingDataWriter(IHyracksTaskContext ctx, IOpenableDataWriter<Object[]> writer,
             RecordDescriptor recordDescriptor) {
         this.writer = writer;
-        this.frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), recordDescriptor);
+        this.frameDeserializer = new FrameDeserializer(recordDescriptor);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
new file mode 100644
index 0000000..2db3e44
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.comm.io;
+
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameFieldAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+
+/**
+ * Appender that assembles tuples of a fixed number of fields one field at a time. If a field does not fit while
+ * a tuple is only partly written, the fields written so far are saved and copied back into the frame by the next
+ * {@link #flush(IFrameWriter, boolean)} that clears the frame.
+ */
+public class FrameFixedFieldAppender extends AbstractFrameAppender implements IFrameFieldAppender {
+    private final int fieldCount;
+    private int lastFieldEndOffset; // end of the last appended field, relative to the end of the field slots
+    private int currentField;
+    private int leftOverSize;
+    private byte[] cachedLeftOverFields;
+
+    /** @param numberFields number of fields in every tuple built by this appender */
+    public FrameFixedFieldAppender(int numberFields) {
+        this.fieldCount = numberFields;
+        this.lastFieldEndOffset = 0;
+        this.currentField = 0;
+        this.leftOverSize = 0;
+    }
+
+    @Override
+    public void reset(IFrame frame, boolean clear) throws HyracksDataException {
+        super.reset(frame, clear);
+        lastFieldEndOffset = 0;
+        currentField = 0;
+        leftOverSize = 0;
+    }
+
+    /**
+     * Flushes like the base class and, when the frame was cleared, restores the saved fields of an unfinished
+     * tuple at the current data end of the fresh frame.
+     */
+    @Override
+    public void flush(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        super.flush(outWriter, clearFrame);
+        if (clearFrame && leftOverSize > 0) {
+            if (!canHoldNewTuple(0, leftOverSize)) {
+                throw new HyracksDataException(
+                        "The given frame can not be extended to insert the leftover data from the last record");
+            }
+            System.arraycopy(cachedLeftOverFields, 0, array, tupleDataEndOffset, leftOverSize);
+            leftOverSize = 0;
+        }
+    }
+
+    /**
+     * Appends the next field of the tuple under construction; the tuple is committed once its last field arrives.
+     *
+     * @return false if the field does not fit, in which case any fields already written for the tuple are saved
+     */
+    @Override
+    public boolean appendField(byte[] bytes, int offset, int length) throws HyracksDataException {
+        if (canHoldNewTuple(fieldCount, lastFieldEndOffset + length)) {
+            int fieldSlotsLength = fieldCount * FrameConstants.SIZE_LEN;
+            int currentFieldDataStart = tupleDataEndOffset + fieldSlotsLength + lastFieldEndOffset;
+            System.arraycopy(bytes, offset, array, currentFieldDataStart, length);
+            lastFieldEndOffset = lastFieldEndOffset + length;
+            IntSerDeUtils.putInt(array, tupleDataEndOffset + currentField * FrameConstants.SIZE_LEN,
+                    lastFieldEndOffset);
+            if (++currentField == fieldCount) {
+                tupleDataEndOffset += fieldSlotsLength + lastFieldEndOffset;
+                int tupleCountOffset = FrameHelper.getTupleCountOffset(frame.getFrameSize());
+                IntSerDeUtils.putInt(array, tupleCountOffset - FrameConstants.SIZE_LEN * (tupleCount + 1),
+                        tupleDataEndOffset);
+                ++tupleCount;
+                IntSerDeUtils.putInt(array, tupleCountOffset, tupleCount);
+
+                //reset for the next tuple
+                currentField = 0;
+                lastFieldEndOffset = 0;
+            }
+            return true;
+        } else {
+            if (currentField > 0) {
+                copyLeftOverData();
+            }
+            return false;
+        }
+    }
+
+    /** Saves the field slots and the field data written so far for the unfinished tuple. */
+    private void copyLeftOverData() {
+        leftOverSize = lastFieldEndOffset + fieldCount * FrameConstants.SIZE_LEN;
+        if (cachedLeftOverFields == null || cachedLeftOverFields.length < leftOverSize) {
+            cachedLeftOverFields = new byte[leftOverSize];
+        }
+        System.arraycopy(array, tupleDataEndOffset, cachedLeftOverFields, 0, leftOverSize);
+    }
+
+    @Override
+    public boolean appendField(IFrameTupleAccessor fta, int tIndex, int fIndex) throws HyracksDataException {
+        int startOffset = fta.getTupleStartOffset(tIndex);
+        int fStartOffset = fta.getFieldStartOffset(tIndex, fIndex);
+        int fLen = fta.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
+        return appendField(fta.getBuffer().array(), startOffset + fta.getFieldSlotsLength() + fStartOffset, fLen);
+    }
+
+    /** @return true while a tuple has been started but not all of its fields have been appended */
+    public boolean hasLeftOverFields() {
+        return currentField != 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameFixedFieldTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameFixedFieldTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameFixedFieldTupleAppender.java
new file mode 100644
index 0000000..74289c6
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameFixedFieldTupleAppender.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameFieldAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This appender can appendTuple and appendField but at the expense of additional checks.
+ * Please use this Field/Tuple mixed appender only if you don't know the sequence of append functions.
+ * Use {@link FrameFixedFieldAppender} if you only want to appendFields,
+ * and {@link FrameTupleAppender} if you only want to appendTuples.
+ */
+public class FrameFixedFieldTupleAppender implements IFrameTupleAppender, IFrameFieldAppender {
+
+    private final FrameFixedFieldAppender fieldAppender;
+    private final FrameTupleAppender tupleAppender;
+    private IFrame sharedFrame;
+    private IFrameAppender lastAppender;
+
+    /** @param numFields number of fields of the tuples built through the field-wise append methods */
+    public FrameFixedFieldTupleAppender(int numFields) {
+        tupleAppender = new FrameTupleAppender();
+        fieldAppender = new FrameFixedFieldAppender(numFields);
+        lastAppender = tupleAppender;
+    }
+
+    /**
+     * Makes {@code appender} the active delegate. On a switch the newly active delegate is reset against the
+     * shared frame without clearing it, so it continues from the frame's current content.
+     *
+     * @throws HyracksDataException
+     *             if switching away from the field appender while it still has a partly appended tuple
+     */
+    private void resetAppenderIfNecessary(IFrameAppender appender) throws HyracksDataException {
+        if (lastAppender == appender) {
+            return;
+        }
+        if (lastAppender == fieldAppender && fieldAppender.hasLeftOverFields()) {
+            throw new HyracksDataException("The previous appended fields haven't been flushed yet.");
+        }
+        appender.reset(sharedFrame, false);
+        lastAppender = appender;
+    }
+
+    @Override
+    public boolean appendField(byte[] bytes, int offset, int length) throws HyracksDataException {
+        resetAppenderIfNecessary(fieldAppender);
+        return fieldAppender.appendField(bytes, offset, length);
+    }
+
+    @Override
+    public boolean appendField(IFrameTupleAccessor accessor, int tid, int fid) throws HyracksDataException {
+        resetAppenderIfNecessary(fieldAppender);
+        return fieldAppender.appendField(accessor, tid, fid);
+    }
+
+    @Override
+    public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
+        resetAppenderIfNecessary(tupleAppender);
+        return tupleAppender.append(tupleAccessor, tIndex);
+    }
+
+    @Override
+    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
+        resetAppenderIfNecessary(tupleAppender);
+        return tupleAppender.append(fieldSlots, bytes, offset, length);
+    }
+
+    @Override
+    public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException {
+        resetAppenderIfNecessary(tupleAppender);
+        return tupleAppender.append(bytes, offset, length);
+    }
+
+    @Override
+    public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
+            throws HyracksDataException {
+        resetAppenderIfNecessary(tupleAppender);
+        return tupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length);
+    }
+
+    @Override
+    public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
+            throws HyracksDataException {
+        resetAppenderIfNecessary(tupleAppender);
+        return tupleAppender.append(tupleAccessor, tStartOffset, tEndOffset);
+    }
+
+    @Override
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        resetAppenderIfNecessary(tupleAppender);
+        return tupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1);
+    }
+
+    @Override
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1,
+            int offset1, int dataLen1) throws HyracksDataException {
+        resetAppenderIfNecessary(tupleAppender);
+        return tupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1);
+    }
+
+    @Override
+    public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields)
+            throws HyracksDataException {
+        resetAppenderIfNecessary(tupleAppender);
+        return tupleAppender.appendProjection(accessor, tIndex, fields);
+    }
+
+    /** Remembers {@code frame} as the shared frame and resets both delegates against it. */
+    @Override
+    public void reset(IFrame frame, boolean clear) throws HyracksDataException {
+        sharedFrame = frame;
+        tupleAppender.reset(sharedFrame, clear);
+        fieldAppender.reset(sharedFrame, clear);
+    }
+
+    @Override
+    public int getTupleCount() {
+        return lastAppender.getTupleCount();
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return lastAppender.getBuffer();
+    }
+
+    /** Flushes through the delegate that was used last. */
+    @Override
+    public void flush(IFrameWriter outWriter, boolean clear) throws HyracksDataException {
+        lastAppender.flush(outWriter, clear);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
index 1810674..90f3d25 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
@@ -14,28 +14,26 @@
  */
 package edu.uci.ics.hyracks.dataflow.common.comm.io;
 
-import java.nio.ByteBuffer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 
 public class FrameOutputStream extends ByteArrayAccessibleOutputStream {
     private static final Logger LOGGER = Logger.getLogger(FrameOutputStream.class.getName());
 
     private final FrameTupleAppender frameTupleAppender;
 
-    public FrameOutputStream(int frameSize) {
-        super(frameSize);
-        this.frameTupleAppender = new FrameTupleAppender(frameSize);
+    public FrameOutputStream(int initialStreamCapacity) {
+        super(initialStreamCapacity); // sizes only the backing stream; frames are bound later via reset()
+        this.frameTupleAppender = new FrameTupleAppender();
     }
 
-    public void reset(ByteBuffer buffer, boolean clear) {
-        if (clear) {
-            buffer.clear();
-        }
-        frameTupleAppender.reset(buffer, clear);
+    public void reset(IFrame frame, boolean clear) throws HyracksDataException {
+        frameTupleAppender.reset(frame, clear);
     }
 
     public int getTupleCount() {
@@ -46,7 +44,7 @@ public class FrameOutputStream extends ByteArrayAccessibleOutputStream {
         return tupleCount;
     }
 
-    public boolean appendTuple() {
+    public boolean appendTuple() throws HyracksDataException {
         if (LOGGER.isLoggable(Level.FINEST)) {
             LOGGER.finest("appendTuple(): tuple size: " + count);
         }
@@ -54,4 +52,8 @@ public class FrameOutputStream extends ByteArrayAccessibleOutputStream {
         count = 0;
         return appended;
     }
+
+    public void flush(IFrameWriter writer) throws HyracksDataException {
+        frameTupleAppender.flush(writer, true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index 6edd647..600e0b4 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -17,6 +17,7 @@ package edu.uci.ics.hyracks.dataflow.common.comm.io;
 import java.io.DataInputStream;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
 import edu.uci.ics.hyracks.api.comm.FrameHelper;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -26,29 +27,33 @@ import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
 
 /**
  * FrameTupleCursor is used to navigate over tuples in a Frame. A frame is
- * formatted with tuple data concatenated starting at offset 0, one tuple after
- * another. Offset FS - 4 holds an int indicating the number of tuples (N) in
+ * formatted with tuple data concatenated starting at offset 1, one tuple after
+ * another. The first byte indicates how big the frame is, so the maximum frame size is 255 * initialFrameSetting.
+ * Offset FS - 4 holds an int indicating the number of tuples (N) in
  * the frame. FS - ((i + 1) * 4) for i from 0 to N - 1 holds an int indicating
  * the offset of the (i + 1)^th tuple. Every tuple is organized as a sequence of
  * ints indicating the end of each field in the tuple relative to the end of the
  * field slots.
- *
- * @author vinayakb
  */
-public final class FrameTupleAccessor implements IFrameTupleAccessor {
-    private final int frameSize;
+public class FrameTupleAccessor implements IFrameTupleAccessor {
+    private int tupleCountOffset;
     private final RecordDescriptor recordDescriptor;
-
     private ByteBuffer buffer;
+    private int start;
 
-    public FrameTupleAccessor(int frameSize, RecordDescriptor recordDescriptor) {
-        this.frameSize = frameSize;
+    public FrameTupleAccessor(RecordDescriptor recordDescriptor) {
         this.recordDescriptor = recordDescriptor;
     }
 
     @Override
     public void reset(ByteBuffer buffer) {
+        reset(buffer, 0, buffer.limit());
+    }
+
+    public void reset(ByteBuffer buffer, int start, int length) {
         this.buffer = buffer;
+        this.start = start;
+        this.tupleCountOffset = start + FrameHelper.getTupleCountOffset(length);
     }
 
     @Override
@@ -58,28 +63,39 @@ public final class FrameTupleAccessor implements IFrameTupleAccessor {
 
     @Override
     public int getTupleCount() {
-        return IntSerDeUtils.getInt(buffer.array(), FrameHelper.getTupleCountOffset(frameSize));
+        return IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
     }
 
     @Override
     public int getTupleStartOffset(int tupleIndex) {
-        return tupleIndex == 0 ? 0 : IntSerDeUtils.getInt(buffer.array(), FrameHelper.getTupleCountOffset(frameSize)
-                - 4 * tupleIndex);
+        int offset = tupleIndex == 0 ?
+                FrameConstants.TUPLE_START_OFFSET :
+                IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
+        return start + offset;
+    }
+
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return getTupleStartOffset(tupleIndex) + getFieldSlotsLength() + getFieldStartOffset(tupleIndex, fIdx);
     }
 
     @Override
     public int getTupleEndOffset(int tupleIndex) {
-        return IntSerDeUtils.getInt(buffer.array(), FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
+        return start + IntSerDeUtils
+                .getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * (tupleIndex + 1));
     }
 
     @Override
     public int getFieldStartOffset(int tupleIndex, int fIdx) {
-        return fIdx == 0 ? 0 : IntSerDeUtils.getInt(buffer.array(), getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+        return fIdx == 0 ?
+                0 :
+                IntSerDeUtils
+                        .getInt(buffer.array(), getTupleStartOffset(tupleIndex) + (fIdx - 1) * FrameConstants.SIZE_LEN);
     }
 
     @Override
     public int getFieldEndOffset(int tupleIndex, int fIdx) {
-        return IntSerDeUtils.getInt(buffer.array(), getTupleStartOffset(tupleIndex) + fIdx * 4);
+        return IntSerDeUtils.getInt(buffer.array(), getTupleStartOffset(tupleIndex) + fIdx * FrameConstants.SIZE_LEN);
     }
 
     @Override
@@ -88,34 +104,57 @@ public final class FrameTupleAccessor implements IFrameTupleAccessor {
     }
 
     @Override
+    public int getTupleLength(int tupleIndex) {
+        return getTupleEndOffset(tupleIndex) - getTupleStartOffset(tupleIndex);
+    }
+
+    @Override
     public int getFieldSlotsLength() {
-        return getFieldCount() * 4;
+        return getFieldCount() * FrameConstants.SIZE_LEN;
     }
 
-    public void prettyPrint() {
+    public void prettyPrint(String prefix) {
         ByteBufferInputStream bbis = new ByteBufferInputStream();
         DataInputStream dis = new DataInputStream(bbis);
         int tc = getTupleCount();
-        System.err.println("TC: " + tc);
+        StringBuilder sb = new StringBuilder();
+        sb.append(prefix).append("TC: " + tc).append("\n");
         for (int i = 0; i < tc; ++i) {
-            System.err.print(i + ":(" + getTupleStartOffset(i) + ", " + getTupleEndOffset(i) + ")[");
-            for (int j = 0; j < getFieldCount(); ++j) {
-                System.err.print(j + ":(" + getFieldStartOffset(i, j) + ", " + getFieldEndOffset(i, j) + ") ");
-                System.err.print("{");
-                bbis.setByteBuffer(buffer, getTupleStartOffset(i) + getFieldSlotsLength() + getFieldStartOffset(i, j));
-                try {
-                    System.err.print(recordDescriptor.getFields()[j].deserialize(dis));
-                } catch (HyracksDataException e) {
-                    e.printStackTrace();
-                }
-                System.err.print("}");
+            prettyPrint(i, bbis, dis, sb);
+        }
+        System.err.println(sb.toString());
+    }
+
+    public void prettyPrint() {
+        prettyPrint("");
+    }
+
+    protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) {
+        sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
+        for (int j = 0; j < getFieldCount(); ++j) {
+            sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
+            sb.append("{");
+            bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
+            try {
+                sb.append(recordDescriptor.getFields()[j].deserialize(dis));
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
             }
-            System.err.println("]");
+            sb.append("}");
         }
+        sb.append("\n");
+    }
+
+    public void prettyPrint(int tid) {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        StringBuilder sb = new StringBuilder();
+        prettyPrint(tid, bbis, dis, sb);
+        System.err.println(sb.toString());
     }
 
     @Override
     public int getFieldCount() {
         return recordDescriptor.getFieldCount();
     }
-}
\ No newline at end of file
+}


Mime
View raw message