asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [10/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.
Date Thu, 18 Jun 2015 04:22:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
index e695828..d0a1146 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
@@ -14,11 +14,6 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group.sort;
 
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -31,20 +26,15 @@ import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
-import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterMergeSort;
-import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterQuickSort;
-import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
 
 /**
  * Group-by aggregation is pushed before run file generation.
- * 
+ *
  * @author yingyib
  */
-public class ExternalSortGroupByRunGenerator implements IFrameWriter {
-    private final IHyracksTaskContext ctx;
-    private final IFrameSorter frameSorter;
-    private final List<IFrameReader> runs;
-    private final int maxSortFrames;
+public class ExternalSortGroupByRunGenerator extends ExternalSortRunGenerator {
 
     private final int[] groupFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
@@ -52,86 +42,44 @@ public class ExternalSortGroupByRunGenerator implements IFrameWriter {
     private final RecordDescriptor inRecordDesc;
     private final RecordDescriptor outRecordDesc;
 
-    public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor recordDesc,
+    public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor inputRecordDesc,
             int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor outRecordDesc, Algorithm alg) throws HyracksDataException {
-        this.ctx = ctx;
-        if (alg == Algorithm.MERGE_SORT) {
-            frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
-                    recordDesc);
-        } else {
-            frameSorter = new FrameSorterQuickSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
-                    recordDesc);
-        }
-        this.runs = new LinkedList<IFrameReader>();
-        this.maxSortFrames = framesLimit - 1;
+        this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, firstKeyNormalizerFactory, comparatorFactories,
+                aggregatorFactory, outRecordDesc, alg, EnumFreeSlotPolicy.LAST_FIT);
+    }
+
+    public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor inputRecordDesc,
+            int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor outRecordDesc, Algorithm alg, EnumFreeSlotPolicy policy) throws HyracksDataException {
+        super(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, inputRecordDesc, alg, policy,
+                framesLimit);
+
         this.groupFields = groupFields;
         this.comparatorFactories = comparatorFactories;
         this.aggregatorFactory = aggregatorFactory;
-        this.inRecordDesc = recordDesc;
+        this.inRecordDesc = inputRecordDesc;
         this.outRecordDesc = outRecordDesc;
     }
 
     @Override
-    public void open() throws HyracksDataException {
-        runs.clear();
-        frameSorter.reset();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        if (frameSorter.getFrameCount() >= maxSortFrames) {
-            flushFramesToRun();
-        }
-        frameSorter.insertFrame(buffer);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (frameSorter.getFrameCount() > 0) {
-            if (runs.size() <= 0) {
-                frameSorter.sortFrames();
-            } else {
-                flushFramesToRun();
-            }
-        }
-    }
-
-    private void flushFramesToRun() throws HyracksDataException {
-        frameSorter.sortFrames();
+    protected RunFileWriter getRunFileWriter() throws HyracksDataException {
         FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
                 ExternalSortGroupByRunGenerator.class.getSimpleName());
-        RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
+        return new RunFileWriter(file, ctx.getIOManager());
+    }
 
+    @Override
+    protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException {
         //create group-by comparators
         IBinaryComparator[] comparators = new IBinaryComparator[Math
                 .min(groupFields.length, comparatorFactories.length)];
         for (int i = 0; i < comparators.length; i++) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory,
+        return new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory,
                 this.inRecordDesc, this.outRecordDesc, writer, true);
-        pgw.open();
-
-        try {
-            frameSorter.flushFrames(pgw);
-        } finally {
-            pgw.close();
-        }
-        frameSorter.reset();
-        runs.add(writer.createReader());
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-    }
-
-    public IFrameSorter getFrameSorter() {
-        return frameSorter;
-    }
-
-    public List<IFrameReader> getRuns() {
-        return runs;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
index 2a580d3..7de400d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
@@ -14,12 +14,8 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group.sort;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -27,39 +23,25 @@ import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
-import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
-import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import edu.uci.ics.hyracks.dataflow.std.sort.ISorter;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
 
 /**
  * Group-by aggregation is pushed into multi-pass merge of external sort.
- * 
+ *
  * @author yingyib
  */
-public class ExternalSortGroupByRunMerger {
+public class ExternalSortGroupByRunMerger extends ExternalSortRunMerger {
 
-    private final IHyracksTaskContext ctx;
-    private final List<IFrameReader> runs;
     private final RecordDescriptor inputRecordDesc;
     private final RecordDescriptor partialAggRecordDesc;
     private final RecordDescriptor outRecordDesc;
-    private final int framesLimit;
-    private final IFrameWriter writer;
-    private List<ByteBuffer> inFrames;
-    private ByteBuffer outFrame;
-    private FrameTupleAppender outFrameAppender;
-
-    private final IFrameSorter frameSorter; // Used in External sort, no replacement
-    // selection
 
     private final int[] groupFields;
-    private final INormalizedKeyComputer firstKeyNkc;
-    private final IBinaryComparator[] comparators;
     private final IAggregatorDescriptorFactory mergeAggregatorFactory;
     private final IAggregatorDescriptorFactory partialAggregatorFactory;
     private final boolean localSide;
@@ -68,25 +50,19 @@ public class ExternalSortGroupByRunMerger {
     private final int[] mergeGroupFields;
     private final IBinaryComparator[] groupByComparators;
 
-    // Constructor for external sort, no replacement selection
-    public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
+    public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, ISorter frameSorter, List<RunAndMaxFrameSizePair> runs,
             int[] sortFields, RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc,
             RecordDescriptor outRecordDesc, int framesLimit, IFrameWriter writer, int[] groupFields,
             INormalizedKeyComputer nmk, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory partialAggregatorFactory, IAggregatorDescriptorFactory aggregatorFactory,
             boolean localStage) {
-        this.ctx = ctx;
-        this.frameSorter = frameSorter;
-        this.runs = new LinkedList<IFrameReader>(runs);
+        super(ctx, frameSorter, runs, sortFields, comparators, nmk, partialAggRecordDesc, framesLimit,
+                writer);
         this.inputRecordDesc = inRecordDesc;
         this.partialAggRecordDesc = partialAggRecordDesc;
         this.outRecordDesc = outRecordDesc;
-        this.framesLimit = framesLimit;
-        this.writer = writer;
 
         this.groupFields = groupFields;
-        this.firstKeyNkc = nmk;
-        this.comparators = comparators;
         this.mergeAggregatorFactory = aggregatorFactory;
         this.partialAggregatorFactory = partialAggregatorFactory;
         this.localSide = localStage;
@@ -112,82 +88,38 @@ public class ExternalSortGroupByRunMerger {
         }
     }
 
-    public void process() throws HyracksDataException {
+    @Override
+    protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
         IAggregatorDescriptorFactory aggregatorFactory = localSide ? partialAggregatorFactory : mergeAggregatorFactory;
-        PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, groupByComparators,
-                aggregatorFactory, inputRecordDesc, outRecordDesc, writer, false);
-        try {
-            if (runs.size() <= 0) {
-                pgw.open();
-                if (frameSorter != null && frameSorter.getFrameCount() > 0) {
-                    frameSorter.flushFrames(pgw);
-                }
-                /** recycle sort buffer */
-                frameSorter.close();
-            } else {
-                /** recycle sort buffer */
-                frameSorter.close();
-
-                inFrames = new ArrayList<ByteBuffer>();
-                outFrame = ctx.allocateFrame();
-                outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-                outFrameAppender.reset(outFrame, true);
-                for (int i = 0; i < framesLimit - 1; ++i) {
-                    inFrames.add(ctx.allocateFrame());
-                }
-                int maxMergeWidth = framesLimit - 1;
-                while (runs.size() > maxMergeWidth) {
-                    int generationSeparator = 0;
-                    while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
-                        int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth),
-                                runs.size() - maxMergeWidth + 1);
-                        FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortGroupByRunMerger.class
-                                .getSimpleName());
-                        IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
-
-                        aggregatorFactory = localSide ? mergeAggregatorFactory : partialAggregatorFactory;
-                        pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregatorFactory,
-                                partialAggRecordDesc, partialAggRecordDesc, mergeResultWriter, true);
-                        pgw.open();
-
-                        IFrameReader[] runCursors = new RunFileReader[mergeWidth];
-                        for (int i = 0; i < mergeWidth; i++) {
-                            runCursors[i] = runs.get(generationSeparator + i);
-                        }
-                        merge(pgw, runCursors);
-                        pgw.close();
-                        runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
-                        runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
-                    }
-                }
-                if (!runs.isEmpty()) {
-                    pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators,
-                            mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, writer, false);
-                    pgw.open();
-                    IFrameReader[] runCursors = new RunFileReader[runs.size()];
-                    for (int i = 0; i < runCursors.length; i++) {
-                        runCursors[i] = runs.get(i);
-                    }
-                    merge(pgw, runCursors);
-                }
-            }
-        } catch (Exception e) {
-            pgw.fail();
-        } finally {
-            pgw.close();
-        }
+        boolean outputPartial = false;
+        return new PreclusteredGroupWriter(ctx, groupFields, groupByComparators,
+                aggregatorFactory, inputRecordDesc, outRecordDesc, nextWriter, outputPartial);
     }
 
-    private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
-        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, mergeSortFields,
-                comparators, firstKeyNkc, partialAggRecordDesc);
-        merger.open();
-        try {
-            while (merger.nextFrame(outFrame)) {
-                FrameUtils.flushFrame(outFrame, mergeResultWriter);
-            }
-        } finally {
-            merger.close();
-        }
+    @Override
+    protected RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException {
+        FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortGroupByRunMerger.class.getSimpleName());
+        return new RunFileWriter(newRun, ctx.getIOManager());
+    }
+
+    @Override
+    protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
+            throws HyracksDataException {
+        IAggregatorDescriptorFactory aggregatorFactory = localSide ? mergeAggregatorFactory : partialAggregatorFactory;
+        boolean outputPartial = true;
+        return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregatorFactory,
+                partialAggRecordDesc, partialAggRecordDesc, mergeFileWriter, outputPartial);
+    }
+
+    @Override
+    protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+        boolean outputPartial = false;
+        return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators,
+                mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, nextWriter, outputPartial);
+    }
+
+    @Override
+    protected int[] getSortFields() {
+        return mergeSortFields;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index cee105b..95cfb09 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -14,18 +14,11 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group.sort;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
@@ -34,34 +27,24 @@ import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
-import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import edu.uci.ics.hyracks.dataflow.std.sort.ISorter;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
 
 /**
  * This Operator pushes group-by aggregation into the external sort.
  * After the in-memory sort, it aggregates the sorted data before writing it to a run file.
  * During the merge phase, it does an aggregation over sorted results.
- * 
+ *
  * @author yingyib
  */
-public class SortGroupByOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-
-    private static final int SORT_ACTIVITY_ID = 0;
-    private static final int MERGE_ACTIVITY_ID = 1;
+public class SortGroupByOperatorDescriptor extends AbstractSorterOperatorDescriptor {
 
-    private final int framesLimit;
-    private final int[] sortFields;
     private final int[] groupFields;
-    private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
-    private final IBinaryComparatorFactory[] comparatorFactories;
     private final IAggregatorDescriptorFactory mergeAggregatorFactory;
     private final IAggregatorDescriptorFactory partialAggregatorFactory;
     private final RecordDescriptor partialAggRecordDesc;
@@ -69,46 +52,31 @@ public class SortGroupByOperatorDescriptor extends AbstractOperatorDescriptor {
     private final boolean finalStage;
     private Algorithm alg = Algorithm.MERGE_SORT;
 
-    /***
-     * @param spec
-     *            , the Hyracks job specification
-     * @param framesLimit
-     *            , the frame limit for this operator
-     * @param sortFields
-     *            , the fields to sort
-     * @param groupFields
-     *            , the fields to group, which can be a prefix subset of sortFields
-     * @param firstKeyNormalizerFactory
-     *            , the normalized key computer factory of the first key
-     * @param comparatorFactories
-     *            , the comparator factories of sort keys
-     * @param partialAggregatorFactory
-     *            , for aggregating the input of this operator
-     * @param mergeAggregatorFactory
-     *            , for aggregating the intermediate data of this operator
-     * @param partialAggRecordDesc
-     *            , the record descriptor of intermediate data
-     * @param outRecordDesc
-     *            , the record descriptor of output data
-     * @param finalStage
-     *            , whether the operator is used for final stage aggregation
+    /**
+     * @param spec                      , the Hyracks job specification
+     * @param framesLimit               , the frame limit for this operator
+     * @param sortFields                , the fields to sort
+     * @param groupFields               , the fields to group, which can be a prefix subset of sortFields
+     * @param firstKeyNormalizerFactory , the normalized key computer factory of the first key
+     * @param comparatorFactories       , the comparator factories of sort keys
+     * @param partialAggregatorFactory  , for aggregating the input of this operator
+     * @param mergeAggregatorFactory    , for aggregating the intermediate data of this operator
+     * @param partialAggRecordDesc      , the record descriptor of intermediate data
+     * @param outRecordDesc             , the record descriptor of output data
+     * @param finalStage                , whether the operator is used for final stage aggregation
      */
     public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
             int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
             IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
             RecordDescriptor outRecordDesc, boolean finalStage) {
-        super(spec, 1, 1);
-        this.framesLimit = framesLimit;
-        this.sortFields = sortFields;
+
+        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, outRecordDesc);
         if (framesLimit <= 1) {
             throw new IllegalStateException();// minimum of 2 frames (1 in, 1 out)
         }
-        this.recordDescriptors[0] = outRecordDesc;
 
         this.groupFields = groupFields;
-        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
-        this.comparatorFactories = comparatorFactories;
         this.mergeAggregatorFactory = mergeAggregatorFactory;
         this.partialAggregatorFactory = partialAggregatorFactory;
         this.partialAggRecordDesc = partialAggRecordDesc;
@@ -116,31 +84,19 @@ public class SortGroupByOperatorDescriptor extends AbstractOperatorDescriptor {
         this.finalStage = finalStage;
     }
 
-    /***
-     * @param spec
-     *            , the Hyracks job specification
-     * @param framesLimit
-     *            , the frame limit for this operator
-     * @param sortFields
-     *            , the fields to sort
-     * @param groupFields
-     *            , the fields to group, which can be a prefix subset of sortFields
-     * @param firstKeyNormalizerFactory
-     *            , the normalized key computer factory of the first key
-     * @param comparatorFactories
-     *            , the comparator factories of sort keys
-     * @param partialAggregatorFactory
-     *            , for aggregating the input of this operator
-     * @param mergeAggregatorFactory
-     *            , for aggregating the intermediate data of this operator
-     * @param partialAggRecordDesc
-     *            , the record descriptor of intermediate data
-     * @param outRecordDesc
-     *            , the record descriptor of output data
-     * @param finalStage
-     *            , whether the operator is used for final stage aggregation
-     * @param alg
-     *            , the in-memory sort algorithm
+    /**
+     * @param spec                      , the Hyracks job specification
+     * @param framesLimit               , the frame limit for this operator
+     * @param sortFields                , the fields to sort
+     * @param groupFields               , the fields to group, which can be a prefix subset of sortFields
+     * @param firstKeyNormalizerFactory , the normalized key computer factory of the first key
+     * @param comparatorFactories       , the comparator factories of sort keys
+     * @param partialAggregatorFactory  , for aggregating the input of this operator
+     * @param mergeAggregatorFactory    , for aggregating the intermediate data of this operator
+     * @param partialAggRecordDesc      , the record descriptor of intermediate data
+     * @param outRecordDesc             , the record descriptor of output data
+     * @param finalStage                , whether the operator is used for final stage aggregation
+     * @param alg                       , the in-memory sort algorithm
      */
     public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
             int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
@@ -153,122 +109,33 @@ public class SortGroupByOperatorDescriptor extends AbstractOperatorDescriptor {
     }
 
     @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
-        MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
-
-        builder.addActivity(this, sa);
-        builder.addSourceEdge(0, sa, 0);
-
-        builder.addActivity(this, ma);
-        builder.addTargetEdge(0, ma, 0);
-
-        builder.addBlockingEdge(sa, ma);
-    }
-
-    public static class SortTaskState extends AbstractStateObject {
-        private List<IFrameReader> runs;
-        private IFrameSorter frameSorter;
-
-        public SortTaskState() {
-        }
-
-        private SortTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
+    public AbstractSorterOperatorDescriptor.SortActivity getSortActivity(ActivityId id) {
+        return new AbstractSorterOperatorDescriptor.SortActivity(id) {
+            @Override
+            protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException {
+                return new ExternalSortGroupByRunGenerator(ctx, sortFields,
+                        recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit,
+                        groupFields, firstKeyNormalizerFactory, comparatorFactories, partialAggregatorFactory,
+                        partialAggRecordDesc, alg);
+            }
+        };
     }
 
-    private class SortActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public SortActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private ExternalSortGroupByRunGenerator runGen;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields,
-                            recordDescProvider.getInputRecordDescriptor(SortActivity.this.getActivityId(), 0),
-                            framesLimit, groupFields, firstKeyNormalizerFactory, comparatorFactories,
-                            partialAggregatorFactory, partialAggRecordDesc, alg);
-                    runGen.open();
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    runGen.nextFrame(buffer);
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(
-                            getActivityId(), partition));
-                    runGen.close();
-                    state.runs = runGen.getRuns();
-                    state.frameSorter = runGen.getFrameSorter();
-                    ctx.setStateObject(state);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    runGen.fail();
-                }
-            };
-            return op;
-        }
-    }
-
-    private class MergeActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public MergeActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-                @Override
-                public void initialize() throws HyracksDataException {
-                    SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            SORT_ACTIVITY_ID), partition));
-                    List<IFrameReader> runs = state.runs;
-                    IFrameSorter frameSorter = state.frameSorter;
-                    int necessaryFrames = Math.min(runs.size() + 2, framesLimit);
-
-                    IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-                    for (int i = 0; i < comparators.length; i++) {
-                        comparators[i] = comparatorFactories[i].createBinaryComparator();
-                    }
-                    INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
-                            .createNormalizedKeyComputer();
-
-                    ExternalSortGroupByRunMerger merger = new ExternalSortGroupByRunMerger(ctx, frameSorter, runs,
-                            sortFields, recordDescProvider.getInputRecordDescriptor(new ActivityId(odId,
-                                    SORT_ACTIVITY_ID), 0), partialAggRecordDesc, outputRecordDesc, necessaryFrames,
-                            writer, groupFields, nkc, comparators, partialAggregatorFactory, mergeAggregatorFactory,
-                            !finalStage);
-                    merger.process();
-                }
-            };
-            return op;
-        }
+    @Override
+    public AbstractSorterOperatorDescriptor.MergeActivity getMergeActivity(ActivityId id) {
+        return new AbstractSorterOperatorDescriptor.MergeActivity(id) {
+
+            @Override
+            protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
+                    List<RunAndMaxFrameSizePair> runs, IBinaryComparator[] comparators,
+                    INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+                return new ExternalSortGroupByRunMerger(ctx, sorter, runs, sortFields,
+                        recordDescProvider.getInputRecordDescriptor(new ActivityId(odId, SORT_ACTIVITY_ID), 0),
+                        partialAggRecordDesc, outputRecordDesc, necessaryFrames, writer, groupFields, nmkComputer,
+                        comparators, partialAggregatorFactory, mergeAggregatorFactory, !finalStage);
+            }
+        };
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index 8375521..9c7fcf4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -16,6 +16,8 @@ package edu.uci.ics.hyracks.dataflow.std.join;
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -103,7 +105,7 @@ class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperato
 
         try {
 
-            ByteBuffer buffer = ctx.allocateFrame();// input
+            IFrame buffer = new VSizeFrame(ctx);
             // buffer
             int tableSize = (int) (numPartitions * recordsPerFrame * factor);
             ISerializableTable table = new SerializableHashTable(tableSize, ctx);
@@ -115,19 +117,19 @@ class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperato
                     continue;
                 }
                 table.reset();
-                InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
-                        ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table, predEvaluator);
+                InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpcRep0,
+                        new FrameTupleAccessor(rd1), hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators),
+                        isLeftOuter, nullWriters1, table, predEvaluator);
 
                 // build
                 if (buildWriter != null) {
                     RunFileReader buildReader = buildWriter.createReader();
                     buildReader.open();
                     while (buildReader.nextFrame(buffer)) {
-                        ByteBuffer copyBuffer = ctx.allocateFrame();
-                        FrameUtils.copy(buffer, copyBuffer);
+                        ByteBuffer copyBuffer = ctx.allocateFrame(buffer.getFrameSize());
+                        FrameUtils.copyAndFlip(buffer.getBuffer(), copyBuffer);
                         joiner.build(copyBuffer);
-                        buffer.clear();
+                        buffer.reset();
                     }
                     buildReader.close();
                 }
@@ -136,8 +138,8 @@ class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperato
                 RunFileReader probeReader = probeWriter.createReader();
                 probeReader.open();
                 while (probeReader.nextFrame(buffer)) {
-                    joiner.join(buffer, writer);
-                    buffer.clear();
+                    joiner.join(buffer.getBuffer(), writer);
+                    buffer.reset();
                 }
                 probeReader.close();
                 joiner.closeJoin(writer);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
index c6dbe61..70f28da 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
@@ -16,6 +16,8 @@ package edu.uci.ics.hyracks.dataflow.std.join;
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -39,7 +41,7 @@ class GraceHashJoinPartitionBuildOperatorNodePushable extends
     private final FrameTupleAccessor accessor0;
     private final ITuplePartitionComputer hpc;
     private final FrameTupleAppender appender;
-    private ByteBuffer[] outbufs;
+    private IFrame[] outbufs;
     private GraceHashJoinPartitionState state;
 
     GraceHashJoinPartitionBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keys,
@@ -48,8 +50,8 @@ class GraceHashJoinPartitionBuildOperatorNodePushable extends
         this.ctx = ctx;
         this.stateId = stateId;
         this.numPartitions = numPartitions;
-        accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
-        appender = new FrameTupleAppender(ctx.getFrameSize());
+        accessor0 = new FrameTupleAccessor(inRecordDescriptor);
+        appender = new FrameTupleAppender();
         hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories).createPartitioner();
         comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
@@ -60,7 +62,7 @@ class GraceHashJoinPartitionBuildOperatorNodePushable extends
     @Override
     public void close() throws HyracksDataException {
         for (int i = 0; i < numPartitions; i++) {
-            ByteBuffer head = outbufs[i];
+            ByteBuffer head = outbufs[i].getBuffer();
             accessor0.reset(head);
             if (accessor0.getTupleCount() > 0) {
                 write(i, head);
@@ -97,13 +99,13 @@ class GraceHashJoinPartitionBuildOperatorNodePushable extends
         for (int i = 0; i < tCount; ++i) {
 
             int entry = hpc.partition(accessor0, i, numPartitions);
-            ByteBuffer outbuf = outbufs[entry];
+            IFrame outbuf = outbufs[entry];
             appender.reset(outbuf, false);
             if (!appender.append(accessor0, i)) {
                 // buffer is full, ie. we cannot fit the tuple
                 // into the buffer -- write it to disk
-                write(entry, outbuf);
-                outbuf.clear();
+                write(entry, outbuf.getBuffer());
+                outbuf.reset();
                 appender.reset(outbuf, true);
                 if (!appender.append(accessor0, i)) {
                     throw new HyracksDataException("Item too big to fit in frame");
@@ -115,10 +117,10 @@ class GraceHashJoinPartitionBuildOperatorNodePushable extends
     @Override
     public void open() throws HyracksDataException {
         state = new GraceHashJoinPartitionState(ctx.getJobletContext().getJobId(), stateId);
-        outbufs = new ByteBuffer[numPartitions];
+        outbufs = new IFrame[numPartitions];
         state.setRunWriters(new RunFileWriter[numPartitions]);
         for (int i = 0; i < numPartitions; i++) {
-            outbufs[i] = ctx.allocateFrame();
+            outbufs[i] = new VSizeFrame(ctx);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 910edc7..0915ff9 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -19,6 +19,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -74,10 +76,8 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
     /**
      * @param spec
-     * @param memsize
-     *            in frames
-     * @param inputsize0
-     *            in frames
+     * @param memsize               in frames
+     * @param inputsize0            in frames
      * @param recordsPerFrame
      * @param factor
      * @param keys0
@@ -201,21 +201,21 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
                         .getJobId(), new TaskId(getActivityId(), partition));
-                private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+                private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
                 private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
                         hashFunctionFactories).createPartitioner();
-                private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-                private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
-                private ByteBuffer[] bufferForPartitions;
-                private final ByteBuffer inBuffer = ctx.allocateFrame();
+                private final FrameTupleAppender appender = new FrameTupleAppender();
+                private final FrameTupleAppender ftappender = new FrameTupleAppender();
+                private IFrame[] bufferForPartitions;
+                private final IFrame inBuffer = new VSizeFrame(ctx);
 
                 @Override
                 public void close() throws HyracksDataException {
                     if (state.memoryForHashtable != 0)
-                        build(inBuffer);
+                        build(inBuffer.getBuffer());
 
                     for (int i = 0; i < state.nPartitions; i++) {
-                        ByteBuffer buf = bufferForPartitions[i];
+                        ByteBuffer buf = bufferForPartitions[i].getBuffer();
                         accessorBuild.reset(buf);
                         if (accessorBuild.getTupleCount() > 0) {
                             write(i, buf);
@@ -233,18 +233,18 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                         accessorBuild.reset(buffer);
                         int tCount = accessorBuild.getTupleCount();
                         for (int i = 0; i < tCount; ++i) {
-                            int entry = -1;
+                            int entry;
                             if (state.memoryForHashtable == 0) {
                                 entry = hpcBuild.partition(accessorBuild, i, state.nPartitions);
                                 boolean newBuffer = false;
-                                ByteBuffer bufBi = bufferForPartitions[entry];
+                                IFrame bufBi = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(bufBi, newBuffer);
                                     if (appender.append(accessorBuild, i)) {
                                         break;
                                     } else {
-                                        write(entry, bufBi);
-                                        bufBi.clear();
+                                        write(entry, bufBi.getBuffer());
+                                        bufBi.reset();
                                         newBuffer = true;
                                     }
                                 }
@@ -253,7 +253,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                                 if (entry < state.memoryForHashtable) {
                                     while (true) {
                                         if (!ftappender.append(accessorBuild, i)) {
-                                            build(inBuffer);
+                                            build(inBuffer.getBuffer());
 
                                             ftappender.reset(inBuffer, true);
                                         } else {
@@ -263,14 +263,14 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                                 } else {
                                     entry %= state.nPartitions;
                                     boolean newBuffer = false;
-                                    ByteBuffer bufBi = bufferForPartitions[entry];
+                                    IFrame bufBi = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(bufBi, newBuffer);
                                         if (appender.append(accessorBuild, i)) {
                                             break;
                                         } else {
-                                            write(entry, bufBi);
-                                            bufBi.clear();
+                                            write(entry, bufBi.getBuffer());
+                                            bufBi.reset();
                                             newBuffer = true;
                                         }
                                     }
@@ -285,8 +285,8 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                 }
 
                 private void build(ByteBuffer inBuffer) throws HyracksDataException {
-                    ByteBuffer copyBuffer = ctx.allocateFrame();
-                    FrameUtils.copy(inBuffer, copyBuffer);
+                    ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.capacity());
+                    FrameUtils.copyAndFlip(inBuffer, copyBuffer);
                     state.joiner.build(copyBuffer);
                 }
 
@@ -321,13 +321,13 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                     int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
                     ISerializableTable table = new SerializableHashTable(tableSize, ctx);
                     state.joiner = new InMemoryHashJoin(ctx, tableSize,
-                            new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
-                                    ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
-                                    comparators), isLeftOuter, nullWriters1, table, predEvaluator);
-                    bufferForPartitions = new ByteBuffer[state.nPartitions];
+                            new FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1), hpc1,
+                            new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table,
+                            predEvaluator);
+                    bufferForPartitions = new IFrame[state.nPartitions];
                     state.fWriters = new RunFileWriter[state.nPartitions];
                     for (int i = 0; i < state.nPartitions; i++) {
-                        bufferForPartitions[i] = ctx.allocateFrame();
+                        bufferForPartitions[i] = new VSizeFrame(ctx);
                     }
 
                     ftappender.reset(inBuffer, true);
@@ -391,20 +391,20 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private BuildAndPartitionTaskState state;
-                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(rd0);
                 private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
                         hashFunctionFactories);
                 private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
                         hashFunctionFactories);
                 private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
 
-                private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-                private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
-                private final ByteBuffer inBuffer = ctx.allocateFrame();
-                private final ByteBuffer outBuffer = ctx.allocateFrame();
+                private final FrameTupleAppender appender = new FrameTupleAppender();
+                private final FrameTupleAppender ftap = new FrameTupleAppender();
+                private final IFrame inBuffer = new VSizeFrame(ctx);
+                private final IFrame outBuffer = new VSizeFrame(ctx);
                 private RunFileWriter[] buildWriters;
                 private RunFileWriter[] probeWriters;
-                private ByteBuffer[] bufferForPartitions;
+                private IFrame[] bufferForPartitions;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -413,9 +413,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                     writer.open();
                     buildWriters = state.fWriters;
                     probeWriters = new RunFileWriter[state.nPartitions];
-                    bufferForPartitions = new ByteBuffer[state.nPartitions];
+                    bufferForPartitions = new IFrame[state.nPartitions];
                     for (int i = 0; i < state.nPartitions; i++) {
-                        bufferForPartitions[i] = ctx.allocateFrame();
+                        bufferForPartitions[i] = new VSizeFrame(ctx);
                     }
                     appender.reset(outBuffer, true);
                     ftap.reset(inBuffer, true);
@@ -428,18 +428,18 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                         int tupleCount0 = accessorProbe.getTupleCount();
                         for (int i = 0; i < tupleCount0; ++i) {
 
-                            int entry = -1;
+                            int entry ;
                             if (state.memoryForHashtable == 0) {
                                 entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
                                 boolean newBuffer = false;
-                                ByteBuffer outbuf = bufferForPartitions[entry];
+                                IFrame outbuf = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(outbuf, newBuffer);
                                     if (appender.append(accessorProbe, i)) {
                                         break;
                                     } else {
-                                        write(entry, outbuf);
-                                        outbuf.clear();
+                                        write(entry, outbuf.getBuffer());
+                                        outbuf.reset();
                                         newBuffer = true;
                                     }
                                 }
@@ -448,7 +448,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                                 if (entry < state.memoryForHashtable) {
                                     while (true) {
                                         if (!ftap.append(accessorProbe, i)) {
-                                            state.joiner.join(inBuffer, writer);
+                                            state.joiner.join(inBuffer.getBuffer(), writer);
                                             ftap.reset(inBuffer, true);
                                         } else
                                             break;
@@ -457,14 +457,14 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                                 } else {
                                     entry %= state.nPartitions;
                                     boolean newBuffer = false;
-                                    ByteBuffer outbuf = bufferForPartitions[entry];
+                                    IFrame outbuf = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(outbuf, newBuffer);
                                         if (appender.append(accessorProbe, i)) {
                                             break;
                                         } else {
-                                            write(entry, outbuf);
-                                            outbuf.clear();
+                                            write(entry, outbuf.getBuffer());
+                                            outbuf.reset();
                                             newBuffer = true;
                                         }
                                     }
@@ -478,7 +478,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.joiner.join(inBuffer, writer);
+                    state.joiner.join(inBuffer.getBuffer(), writer);
                     state.joiner.closeJoin(writer);
                     ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
                             .createPartitioner();
@@ -486,7 +486,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                             .createPartitioner();
                     if (state.memoryForHashtable != memsize - 2) {
                         for (int i = 0; i < state.nPartitions; i++) {
-                            ByteBuffer buf = bufferForPartitions[i];
+                            ByteBuffer buf = bufferForPartitions[i].getBuffer();
                             accessorProbe.reset(buf);
                             if (accessorProbe.getTupleCount() > 0) {
                                 write(i, buf);
@@ -494,7 +494,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                             closeWriter(i);
                         }
 
-                        inBuffer.clear();
+                        inBuffer.reset();
                         int tableSize = -1;
                         if (state.memoryForHashtable == 0) {
                             tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
@@ -510,18 +510,18 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                             }
                             table.reset();
                             InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
-                                    ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
-                                    hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                    nullWriters1, table, predEvaluator);
+                                    rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
+                                    new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1,
+                                    table, predEvaluator);
 
                             if (buildWriter != null) {
                                 RunFileReader buildReader = buildWriter.createReader();
                                 buildReader.open();
                                 while (buildReader.nextFrame(inBuffer)) {
-                                    ByteBuffer copyBuffer = ctx.allocateFrame();
-                                    FrameUtils.copy(inBuffer, copyBuffer);
+                                    ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
+                                    FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
                                     joiner.build(copyBuffer);
-                                    inBuffer.clear();
+                                    inBuffer.reset();
                                 }
                                 buildReader.close();
                             }
@@ -530,8 +530,8 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                             RunFileReader probeReader = probeWriter.createReader();
                             probeReader.open();
                             while (probeReader.nextFrame(inBuffer)) {
-                                joiner.join(inBuffer, writer);
-                                inBuffer.clear();
+                                joiner.join(inBuffer.getBuffer(), writer);
+                                inBuffer.reset();
                             }
                             probeReader.close();
                             joiner.closeJoin(writer);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 860cdd4..d6a83d2 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
@@ -30,6 +31,7 @@ import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
 import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
 
@@ -43,7 +45,6 @@ public class InMemoryHashJoin {
     private final ITuplePartitionComputer tpcProbe;
     private final FrameTupleAppender appender;
     private final FrameTuplePairComparator tpComparator;
-    private final ByteBuffer outBuffer;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder nullTupleBuild;
     private final ISerializableTable table;
@@ -75,10 +76,8 @@ public class InMemoryHashJoin {
         this.tpcBuild = tpc1;
         this.accessorProbe = accessor0;
         this.tpcProbe = tpc0;
-        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender = new FrameTupleAppender(new VSizeFrame(ctx));
         tpComparator = comparator;
-        outBuffer = ctx.allocateFrame();
-        appender.reset(outBuffer, true);
         predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
@@ -136,24 +135,15 @@ public class InMemoryHashJoin {
                 } while (true);
             }
             if (!matchFound && isLeftOuter) {
-                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
-                        nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
-                    flushFrame(outBuffer, writer);
-                    appender.reset(outBuffer, true);
-                    if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
-                            nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
-                        throw new HyracksDataException("Record size larger than frame size ("
-                                + appender.getBuffer().capacity() + ")");
-                    }
-                }
+                FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, i,
+                        nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
+                        nullTupleBuild.getSize());
             }
         }
     }
 
     public void closeJoin(IFrameWriter writer) throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            flushFrame(outBuffer, writer);
-        }
+        appender.flush(writer, true);
         int nFrames = buffers.size();
         buffers.clear();
         ctx.deallocateFrames(nFrames);
@@ -179,31 +169,11 @@ public class InMemoryHashJoin {
 
     private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
         if (!reverseOutputOrder) {
-            if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
-                flushFrame(outBuffer, writer);
-                appender.reset(outBuffer, true);
-                if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
-                    int tSize = accessorProbe.getTupleEndOffset(probeSidetIx)
-                            - accessorProbe.getTupleStartOffset(probeSidetIx)
-                            + accessorBuild.getTupleEndOffset(buildSidetIx)
-                            - accessorBuild.getTupleStartOffset(buildSidetIx);
-                    throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
-            }
+            FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, probeSidetIx, accessorBuild,
+                    buildSidetIx);
         } else {
-            if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
-                flushFrame(outBuffer, writer);
-                appender.reset(outBuffer, true);
-                if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
-                    int tSize = accessorProbe.getTupleEndOffset(probeSidetIx)
-                            - accessorProbe.getTupleStartOffset(probeSidetIx)
-                            + accessorBuild.getTupleEndOffset(buildSidetIx)
-                            - accessorBuild.getTupleStartOffset(buildSidetIx);
-                    throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
-            }
+            FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe,
+                    probeSidetIx);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index be369d2..35e22ad 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -77,7 +77,8 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+            IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1,
             int tableSize) {
         super(spec, 2, 1);
         this.keys0 = keys0;
@@ -90,21 +91,20 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
         this.nullWriterFactories1 = nullWriterFactories1;
         this.tableSize = tableSize;
     }
-    
+
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, int tableSize) {
-    	this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, recordDescriptor, tableSize, null);
+        this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, recordDescriptor, tableSize, null);
     }
-    
+
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
             int tableSize) {
-    	this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories,null,recordDescriptor,isLeftOuter,nullWriterFactories1,tableSize);
+        this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, null, recordDescriptor, isLeftOuter,
+                nullWriterFactories1, tableSize);
     }
-    
-    
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
@@ -170,7 +170,9 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ?
+                    null :
+                    predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private HashBuildTaskState state;
@@ -185,15 +187,15 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
                             partition));
                     ISerializableTable table = new SerializableHashTable(tableSize, ctx);
                     state.joiner = new InMemoryHashJoin(ctx, tableSize,
-                            new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
-                                    ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
+                            new FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1), hpc1,
+                            new FrameTuplePairComparator(keys0, keys1,
                                     comparators), isLeftOuter, nullWriters1, table, predEvaluator);
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    ByteBuffer copyBuffer = ctx.allocateFrame();
-                    FrameUtils.copy(buffer, copyBuffer);
+                    ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
+                    FrameUtils.copyAndFlip(buffer, copyBuffer);
                     state.joiner.build(copyBuffer);
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index eab60bc..3bd0540 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -19,7 +19,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
@@ -38,8 +40,8 @@ public class NestedLoopJoin {
     private final FrameTupleAccessor accessorOuter;
     private final FrameTupleAppender appender;
     private final ITuplePairComparator tpComparator;
-    private final ByteBuffer outBuffer;
-    private final ByteBuffer innerBuffer;
+    private final IFrame outBuffer;
+    private final IFrame innerBuffer;
     private final List<ByteBuffer> outBuffers;
     private final int memSize;
     private final IHyracksTaskContext ctx;
@@ -49,18 +51,18 @@ public class NestedLoopJoin {
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder nullTupleBuilder;
     private final IPredicateEvaluator predEvaluator;
-    private boolean isReversed;		//Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
+    private boolean isReversed;        //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
 
-    
     public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
-            ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, INullWriter[] nullWriters1)
+            ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
+            INullWriter[] nullWriters1)
             throws HyracksDataException {
         this.accessorInner = accessor1;
         this.accessorOuter = accessor0;
-        this.appender = new FrameTupleAppender(ctx.getFrameSize());
+        this.appender = new FrameTupleAppender();
         this.tpComparator = comparators;
-        this.outBuffer = ctx.allocateFrame();
-        this.innerBuffer = ctx.allocateFrame();
+        this.outBuffer = new VSizeFrame(ctx);
+        this.innerBuffer = new VSizeFrame(ctx);
         this.appender.reset(outBuffer, true);
         this.outBuffers = new ArrayList<ByteBuffer>();
         this.memSize = memSize;
@@ -107,7 +109,7 @@ public class NestedLoopJoin {
         runFileReader.open();
         while (runFileReader.nextFrame(innerBuffer)) {
             for (ByteBuffer outBuffer : outBuffers) {
-                blockJoin(outBuffer, innerBuffer, writer);
+                blockJoin(outBuffer, innerBuffer.getBuffer(), writer);
             }
         }
         runFileReader.close();
@@ -116,15 +118,18 @@ public class NestedLoopJoin {
     }
 
     private void createAndCopyFrame(ByteBuffer outerBuffer) throws HyracksDataException {
-        ByteBuffer outerBufferCopy = ctx.allocateFrame();
-        FrameUtils.copy(outerBuffer, outerBufferCopy);
+        ByteBuffer outerBufferCopy = ctx.allocateFrame(outerBuffer.capacity());
+        FrameUtils.copyAndFlip(outerBuffer, outerBufferCopy);
         outBuffers.add(outerBufferCopy);
         currentMemSize++;
     }
 
-    private void reloadFrame(ByteBuffer outerBuffer) {
+    private void reloadFrame(ByteBuffer outerBuffer) throws HyracksDataException {
         outBuffers.get(currentMemSize).clear();
-        FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+        if (outBuffers.get(currentMemSize).capacity() != outerBuffer.capacity()) {
+            outBuffers.set(currentMemSize, ctx.allocateFrame(outerBuffer.capacity()));
+        }
+        FrameUtils.copyAndFlip(outerBuffer, outBuffers.get(currentMemSize));
         currentMemSize++;
     }
 
@@ -141,8 +146,8 @@ public class NestedLoopJoin {
                 int c = compare(accessorOuter, i, accessorInner, j);
                 boolean prdEval = evaluatePredicate(i, j);
                 if (c == 0 && prdEval) {
-                	matchFound = true;
-                	appendToResults(i, j, writer);
+                    matchFound = true;
+                    appendToResults(i, j, writer);
                 }
             }
 
@@ -150,28 +155,20 @@ public class NestedLoopJoin {
                 final int[] ntFieldEndOffsets = nullTupleBuilder.getFieldEndOffsets();
                 final byte[] ntByteArray = nullTupleBuilder.getByteArray();
                 final int ntSize = nullTupleBuilder.getSize();
-                if (!appender.appendConcat(accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0, ntSize)) {
-                    flushFrame(outBuffer, writer);
-                    appender.reset(outBuffer, true);
-                    if (!appender.appendConcat(accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0, ntSize)) {
-                        int tSize = accessorOuter.getTupleEndOffset(i) - accessorOuter.getTupleStartOffset(i) + ntSize;
-                        throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
-                                + appender.getBuffer().capacity() + ")");
-                    }
-                }
+                FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0,
+                        ntSize);
             }
         }
     }
-    
-    private boolean evaluatePredicate(int tIx1, int tIx2){
-    	if(isReversed){		//Role Reversal Optimization is triggered
-    		return ( (predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1) );
-    	}
-    	else {
-    		return ( (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2) );
-    	}
+
+    private boolean evaluatePredicate(int tIx1, int tIx2) {
+        if (isReversed) {        //Role Reversal Optimization is triggered
+            return ((predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1));
+        } else {
+            return ((predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2));
+        }
     }
-    
+
     private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException {
         if (!isReversed) {
             appendResultToFrame(accessorOuter, outerTupleId, accessorInner, innerTupleId, writer);
@@ -183,18 +180,9 @@ public class NestedLoopJoin {
 
     private void appendResultToFrame(FrameTupleAccessor accessor1, int tupleId1, FrameTupleAccessor accessor2,
             int tupleId2, IFrameWriter writer) throws HyracksDataException {
-        if (!appender.appendConcat(accessor1, tupleId1, accessor2, tupleId2)) {
-            flushFrame(outBuffer, writer);
-            appender.reset(outBuffer, true);
-            if (!appender.appendConcat(accessor1, tupleId1, accessor2, tupleId2)) {
-                int tSize = accessor1.getTupleEndOffset(tupleId1) - accessor1.getTupleStartOffset(tupleId1)
-                        + accessor2.getTupleEndOffset(tupleId2) - accessor2.getTupleStartOffset(tupleId2);
-                throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
-                        + appender.getBuffer().capacity() + ")");
-            }
-        }
+        FrameUtils.appendConcatToWriter(writer, appender, accessor1, tupleId1, accessor2, tupleId2);
     }
-    
+
     public void closeCache() throws HyracksDataException {
         if (runFileWriter != null) {
             runFileWriter.close();
@@ -206,24 +194,14 @@ public class NestedLoopJoin {
         runFileReader.open();
         while (runFileReader.nextFrame(innerBuffer)) {
             for (int i = 0; i < currentMemSize; i++) {
-                blockJoin(outBuffers.get(i), innerBuffer, writer);
+                blockJoin(outBuffers.get(i), innerBuffer.getBuffer(), writer);
             }
         }
         runFileReader.close();
         outBuffers.clear();
         currentMemSize = 0;
 
-        if (appender.getTupleCount() > 0) {
-            flushFrame(outBuffer, writer);
-        }
-    }
-
-    private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-        writer.nextFrame(buffer);
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
+        appender.flush(writer, true);
     }
 
     private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
@@ -234,8 +212,8 @@ public class NestedLoopJoin {
         }
         return 0;
     }
-    
-    public void setIsReversed(boolean b){
-    	this.isReversed = b;
+
+    public void setIsReversed(boolean b) {
+        this.isReversed = b;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 41f0f4f..3e06bf3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -54,16 +54,17 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
     private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
-    
+
     public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
             ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
             boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
         this(spec, comparatorFactory, recordDescriptor, memSize, null, isLeftOuter, nullWriterFactories1);
     }
-    
+
     public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
             ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
-            IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+            IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) {
         super(spec, 2, 1);
         this.comparatorFactory = comparatorFactory;
         this.recordDescriptors[0] = recordDescriptor;
@@ -127,8 +128,10 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = ( (predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null);
-            
+            final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null) ?
+                    predEvaluatorFactory.createPredicateEvaluator() :
+                    null);
+
             final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
             if (isLeftOuter) {
                 for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -144,17 +147,16 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
                     state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
                             partition));
 
-                    state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
-                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, predEvaluator, isLeftOuter,
+                    state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0),
+                            new FrameTupleAccessor(rd1), comparator, memSize, predEvaluator, isLeftOuter,
                             nullWriters1);
 
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    ByteBuffer copyBuffer = ctx.allocateFrame();
-                    FrameUtils.copy(buffer, copyBuffer);
-                    FrameUtils.makeReadable(copyBuffer);
+                    ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
+                    FrameUtils.copyAndFlip(buffer, copyBuffer);
                     state.joiner.cache(copyBuffer);
                 }
 


Mime
View raw message