asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [07/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.
Date Thu, 18 Jun 2015 04:22:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 9178094..6b36480 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -1,318 +1,271 @@
 /*
  * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-
-/**
- * @author pouria This class defines the logic for merging the run, generated
- *         during the first phase of external sort (for both sorting without
- *         replacement selection and with it). For the case with replacement
- *         selection, this code also takes the limit on the output into account
- *         (if specified). If number of input runs is less than the available
- *         memory frames, then merging can be done in one pass, by allocating
- *         one buffer per run, and one buffer as the output buffer. A
- *         priorityQueue is used to find the top tuple at each iteration, among
- *         all the runs' heads in memory (check RunMergingFrameReader for more
- *         details). Otherwise, assuming that we have R runs and M memory
- *         buffers, where (R > M), we first merge first (M-1) runs and create a
- *         new sorted run, out of them. Discarding the first (M-1) runs, now
- *         merging procedure gets applied recursively on the (R-M+2) remaining
- *         runs using the M memory buffers. For the case of replacement
- *         selection, if outputLimit is specified, once the final pass is done
- *         on the runs (which is the pass that generates the final sorted
- *         output), as soon as the output size hits the output limit, the
- *         process stops, closes, and returns.
- */
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
 
 public class ExternalSortRunMerger {
 
-    private final IHyracksTaskContext ctx;
-    private final List<IFrameReader> runs;
+    protected final IHyracksTaskContext ctx;
+    protected final IFrameWriter writer;
+
+    private final List<RunAndMaxFrameSizePair> runs;
+    private final BitSet currentGenerationRunAvailable;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
     private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDesc;
     private final int framesLimit;
-    private final IFrameWriter writer;
-    private List<ByteBuffer> inFrames;
-    private ByteBuffer outFrame;
-    private FrameTupleAppender outFrameAppender;
-
-    private IFrameSorter frameSorter; // Used in External sort, no replacement
-                                      // selection
-    private FrameTupleAccessor outFrameAccessor; // Used in External sort, with
-                                                 // replacement selection
-    private final int outputLimit; // Used in External sort, with replacement
-                                   // selection and limit on output size
-    private int currentSize; // Used in External sort, with replacement
-                             // selection and limit on output size
-
-    // Constructor for external sort, no replacement selection
-    public ExternalSortRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
+    private final int MAX_FRAME_SIZE;
+    private final ArrayList<IFrameReader> tempRuns;
+    private final int topK;
+    private List<GroupVSizeFrame> inFrames;
+    private VSizeFrame outputFrame;
+    private ISorter sorter;
+
+    private static final Logger LOGGER = Logger.getLogger(ExternalSortRunMerger.class.getName());
+
+    public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<RunAndMaxFrameSizePair> runs,
             int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
             RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
-        this.ctx = ctx;
-        this.frameSorter = frameSorter;
-        this.runs = new LinkedList<IFrameReader>(runs);
-        this.sortFields = sortFields;
-        this.comparators = comparators;
-        this.nmkComputer = nmkComputer;
-        this.recordDesc = recordDesc;
-        this.framesLimit = framesLimit;
-        this.writer = writer;
-        this.outputLimit = -1;
+        this(ctx, sorter, runs, sortFields, comparators, nmkComputer, recordDesc, framesLimit,
+                Integer.MAX_VALUE, writer);
     }
 
-    // Constructor for external sort with replacement selection
-    public ExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs, int[] sortFields,
-            IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
-            int framesLimit, IFrameWriter writer) {
+    public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<RunAndMaxFrameSizePair> runs,
+            int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+            RecordDescriptor recordDesc, int framesLimit, int topK, IFrameWriter writer) {
         this.ctx = ctx;
-        this.runs = new LinkedList<IFrameReader>(runs);
+        this.sorter = sorter;
+        this.runs = new LinkedList<>(runs);
+        this.currentGenerationRunAvailable = new BitSet(runs.size());
         this.sortFields = sortFields;
         this.comparators = comparators;
         this.nmkComputer = nmkComputer;
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
         this.writer = writer;
-        this.outputLimit = outputLimit;
-        this.currentSize = 0;
-        this.frameSorter = null;
+        this.MAX_FRAME_SIZE = FrameConstants.MAX_NUM_MINFRAME * ctx.getInitialFrameSize();
+        this.topK = topK;
+        this.tempRuns = new ArrayList<>(runs.size());
     }
 
     public void process() throws HyracksDataException {
-        writer.open();
+        IFrameWriter finalWriter = null;
         try {
             if (runs.size() <= 0) {
-                if (frameSorter != null && frameSorter.getFrameCount() > 0) {
-                    frameSorter.flushFrames(writer);
+                finalWriter = prepareSkipMergingFinalResultWriter(writer);
+                finalWriter.open();
+                if (sorter != null) {
+                    if (sorter.hasRemaining()) {
+                        sorter.flush(finalWriter);
+                    }
+                    sorter.close();
                 }
-                /** recycle sort buffer */
-                frameSorter.close();
             } else {
                 /** recycle sort buffer */
-                frameSorter.close();
-
-                inFrames = new ArrayList<ByteBuffer>();
-                outFrame = ctx.allocateFrame();
-                outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-                outFrameAppender.reset(outFrame, true);
-                for (int i = 0; i < framesLimit - 1; ++i) {
-                    inFrames.add(ctx.allocateFrame());
+                if (sorter != null) {
+                    sorter.close();
                 }
+
+                finalWriter = prepareFinalMergeResultWriter(writer);
+                finalWriter.open();
+
                 int maxMergeWidth = framesLimit - 1;
-                while (runs.size() > maxMergeWidth) {
-                    int generationSeparator = 0;
-                    while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
-                        int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth),
-                                runs.size() - maxMergeWidth + 1);
-                        FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
-                                .getSimpleName());
-                        IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
-                        mergeResultWriter.open();
-                        IFrameReader[] runCursors = new RunFileReader[mergeWidth];
-                        for (int i = 0; i < mergeWidth; i++) {
-                            runCursors[i] = runs.get(generationSeparator + i);
+
+                inFrames = new ArrayList<>(maxMergeWidth);
+                outputFrame = new VSizeFrame(ctx);
+                List<RunAndMaxFrameSizePair> partialRuns = new ArrayList<>(maxMergeWidth);
+
+                int stop = runs.size();
+                currentGenerationRunAvailable.set(0, stop);
+
+                while (true) {
+
+                    int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
+                            currentGenerationRunAvailable,
+                            stop);
+                    prepareFrames(unUsed, inFrames, partialRuns);
+
+                    if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
+                        IFrameReader reader;
+                        int mergedMaxFrameSize;
+                        if (partialRuns.size() == 1) {
+                            if (!currentGenerationRunAvailable.isEmpty()) {
+                                throw new HyracksDataException(
+                                        "The record is too big to put into the merging frame, please"
+                                                + " allocate more sorting memory");
+                            } else {
+                                reader = partialRuns.get(0).run;
+                                mergedMaxFrameSize = partialRuns.get(0).maxFrameSize;
+                            }
+
+                        } else {
+                            RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
+                            IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
+
+                            mergeResultWriter.open();
+                            mergedMaxFrameSize = merge(mergeResultWriter, partialRuns);
+                            mergeResultWriter.close();
+
+                            reader = mergeFileWriter.createReader();
                         }
-                        merge(mergeResultWriter, runCursors);
-                        mergeResultWriter.close();
-                        runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
-                        runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
-                    }
-                }
-                if (!runs.isEmpty()) {
-                    IFrameReader[] runCursors = new RunFileReader[runs.size()];
-                    for (int i = 0; i < runCursors.length; i++) {
-                        runCursors[i] = runs.get(i);
+
+                        appendNewRuns(reader, mergedMaxFrameSize);
+                        if (currentGenerationRunAvailable.isEmpty()) {
+
+                            if (LOGGER.isLoggable(Level.FINE)) {
+                                LOGGER.fine("generated runs:" + stop);
+                            }
+                            runs.subList(0, stop).clear();
+                            currentGenerationRunAvailable.clear();
+                            currentGenerationRunAvailable.set(0, runs.size());
+                            stop = runs.size();
+                        }
+                    } else {
+                        if (LOGGER.isLoggable(Level.FINE)) {
+                            LOGGER.fine("final runs:" + stop);
+                        }
+                        merge(finalWriter, partialRuns);
+                        break;
                     }
-                    merge(writer, runCursors);
                 }
             }
         } catch (Exception e) {
-            writer.fail();
+            finalWriter.fail();
             throw new HyracksDataException(e);
         } finally {
-            writer.close();
+            finalWriter.close();
         }
     }
 
-    private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
-        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields, comparators,
-                nmkComputer, recordDesc);
-        merger.open();
-        try {
-            while (merger.nextFrame(outFrame)) {
-                FrameUtils.flushFrame(outFrame, mergeResultWriter);
+    private void appendNewRuns(IFrameReader reader, int mergedPartialMaxSize) {
+        runs.add(new RunAndMaxFrameSizePair(reader, mergedPartialMaxSize));
+    }
+
+    private static int selectPartialRuns(int budget, List<RunAndMaxFrameSizePair> runs,
+            List<RunAndMaxFrameSizePair> partialRuns, BitSet runAvailable, int stop) {
+        partialRuns.clear();
+        int maxFrameSizeOfGenRun = 0;
+        int nextRunId = runAvailable.nextSetBit(0);
+        while (budget > 0 && nextRunId >= 0 && nextRunId < stop) {
+            int runFrameSize = runs.get(nextRunId).maxFrameSize;
+            if (budget - runFrameSize >= 0) {
+                partialRuns.add(runs.get(nextRunId));
+                budget -= runFrameSize;
+                runAvailable.clear(nextRunId);
+                maxFrameSizeOfGenRun = runFrameSize > maxFrameSizeOfGenRun ? runFrameSize : maxFrameSizeOfGenRun;
             }
-        } finally {
-            merger.close();
+            nextRunId = runAvailable.nextSetBit(nextRunId + 1);
         }
+        return budget;
     }
 
-    public void processWithReplacementSelection() throws HyracksDataException {
-        writer.open();
-        try {
-            outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
-            outFrame = ctx.allocateFrame();
-            outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-            outFrameAppender.reset(outFrame, true);
-            if (runs.size() == 1) {
-                if (outputLimit < 1) {
-                    runs.get(0).open();
-                    ByteBuffer nextFrame = ctx.allocateFrame();
-                    while (runs.get(0).nextFrame(nextFrame)) {
-                        FrameUtils.flushFrame(nextFrame, writer);
-                        outFrameAppender.reset(nextFrame, true);
-                    }
-                    return;
-                }
-                // Limit on the output size
-                int totalCount = 0;
-                runs.get(0).open();
-                FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
-                ByteBuffer nextFrame = ctx.allocateFrame();
-                while (totalCount <= outputLimit && runs.get(0).nextFrame(nextFrame)) {
-                    fta.reset(nextFrame);
-                    int tupCount = fta.getTupleCount();
-                    if ((totalCount + tupCount) < outputLimit) {
-                        FrameUtils.flushFrame(nextFrame, writer);
-                        totalCount += tupCount;
-                        continue;
-                    }
-                    // The very last buffer, which exceeds the limit
-                    int copyCount = outputLimit - totalCount;
-                    outFrameAppender.reset(outFrame, true);
-                    for (int i = 0; i < copyCount; i++) {
-                        if (!outFrameAppender.append(fta, i)) {
-                            throw new HyracksDataException("Record size ("
-                                    + (fta.getTupleEndOffset(i) - fta.getTupleStartOffset(i))
-                                    + ") larger than frame size (" + outFrameAppender.getBuffer().capacity() + ")");
-                        }
-                        totalCount++;
-                    }
-                }
-                if (outFrameAppender.getTupleCount() > 0) {
-                    FrameUtils.flushFrame(outFrame, writer);
-                    outFrameAppender.reset(outFrame, true);
-                }
-                return;
+    private void prepareFrames(int extraFreeMem, List<GroupVSizeFrame> inFrames,
+            List<RunAndMaxFrameSizePair> patialRuns)
+            throws HyracksDataException {
+        if (extraFreeMem > 0 && patialRuns.size() > 1) {
+            int extraFrames = extraFreeMem / ctx.getInitialFrameSize();
+            int avg = (extraFrames / patialRuns.size()) * ctx.getInitialFrameSize();
+            int residue = (extraFrames % patialRuns.size());
+            for (int i = 0; i < residue; i++) {
+                patialRuns.get(i).updateSize(
+                        Math.min(MAX_FRAME_SIZE, patialRuns.get(i).maxFrameSize + avg + ctx.getInitialFrameSize()));
             }
-            // More than one run, actual merging is needed
-            inFrames = new ArrayList<ByteBuffer>();
-            for (int i = 0; i < framesLimit - 1; ++i) {
-                inFrames.add(ctx.allocateFrame());
-            }
-            while (runs.size() > 0) {
-                try {
-                    doPassWithReplacementSelection(runs);
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
-                }
+            for (int i = residue; i < patialRuns.size() && avg > 0; i++) {
+                patialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE, patialRuns.get(i).maxFrameSize + avg));
             }
+        }
 
-        } catch (Exception e) {
-            writer.fail();
-            throw new HyracksDataException(e);
-        } finally {
-            writer.close();
+        if (inFrames.size() > patialRuns.size()) {
+            inFrames.subList(patialRuns.size(), inFrames.size()).clear();
+        }
+        int i;
+        for (i = 0; i < inFrames.size(); i++) {
+            inFrames.get(i).resize(patialRuns.get(i).maxFrameSize);
+        }
+        for (; i < patialRuns.size(); i++) {
+            inFrames.add(new GroupVSizeFrame(ctx, patialRuns.get(i).maxFrameSize));
         }
     }
 
-    // creates a new run from runs that can fit in memory.
-    private void doPassWithReplacementSelection(List<IFrameReader> runs) throws HyracksDataException {
-        FileReference newRun = null;
-        IFrameWriter writer = this.writer;
-        boolean finalPass = false;
-        if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
-            finalPass = true;
-            for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
-                inFrames.remove(i);
-            }
-        } else {
-            newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
-            writer = new RunFileWriter(newRun, ctx.getIOManager());
-            writer.open();
-        }
-        try {
-            IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
-            for (int i = 0; i < inFrames.size(); i++) {
-                runCursors[i] = runs.get(i);
-            }
-            RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
-                    comparators, nmkComputer, recordDesc);
-            merger.open();
-            try {
-                while (merger.nextFrame(outFrame)) {
-                    if (outputLimit > 0 && finalPass) {
-                        outFrameAccessor.reset(outFrame);
-                        int count = outFrameAccessor.getTupleCount();
-                        if ((currentSize + count) > outputLimit) {
-                            ByteBuffer b = ctx.allocateFrame();
-                            FrameTupleAppender partialAppender = new FrameTupleAppender(ctx.getFrameSize());
-                            partialAppender.reset(b, true);
-                            int copyCount = outputLimit - currentSize;
-                            for (int i = 0; i < copyCount; i++) {
-                                partialAppender.append(outFrameAccessor, i);
-                                currentSize++;
-                            }
-                            FrameUtils.makeReadable(b);
-                            FrameUtils.flushFrame(b, writer);
-                            break;
-                        } else {
-                            FrameUtils.flushFrame(outFrame, writer);
-                            currentSize += count;
-                        }
-                    } else {
-                        FrameUtils.flushFrame(outFrame, writer);
-                    }
-                }
-            } finally {
-                merger.close();
-            }
+    protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+        return nextWriter;
+    }
 
-            if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
-                runs.clear();
-                return;
-            }
+    protected RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException {
+        FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
+        return new RunFileWriter(newRun, ctx.getIOManager());
+    }
+
+    protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
+            throws HyracksDataException {
+        return mergeFileWriter;
+    }
+
+    protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+        return nextWriter;
+    }
+
+    protected int[] getSortFields() {
+        return sortFields;
+    }
 
-            runs.subList(0, inFrames.size()).clear();
-            if (!finalPass) {
-                runs.add(0, ((RunFileWriter) writer).createReader());
+    private int merge(IFrameWriter writer, List<RunAndMaxFrameSizePair> partialRuns)
+            throws HyracksDataException {
+        tempRuns.clear();
+        for (int i = 0; i < partialRuns.size(); i++) {
+            tempRuns.add(partialRuns.get(i).run);
+        }
+        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, tempRuns, inFrames, getSortFields(),
+                comparators, nmkComputer, recordDesc, topK);
+        int maxFrameSize = 0;
+        int io = 0;
+        merger.open();
+        try {
+            while (merger.nextFrame(outputFrame)) {
+                FrameUtils.flushFrame(outputFrame.getBuffer(), writer);
+                maxFrameSize = maxFrameSize < outputFrame.getFrameSize() ? outputFrame.getFrameSize() : maxFrameSize;
+                io++;
             }
         } finally {
-            if (!finalPass) {
-                writer.close();
+            merger.close();
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Output " + io + " frames");
             }
         }
+        return maxFrameSize;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index 8dbdbd4..82a8453 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -1,161 +1,69 @@
 /*
  * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
 
-public class FrameSorterMergeSort implements IFrameSorter {
-    private final IHyracksTaskContext ctx;
-    private final int[] sortFields;
-    private final INormalizedKeyComputer nkc;
-    private final IBinaryComparator[] comparators;
-    private final List<ByteBuffer> buffers;
+public class FrameSorterMergeSort extends AbstractFrameSorter {
 
-    private final FrameTupleAccessor fta1;
-    private final FrameTupleAccessor fta2;
-
-    private final FrameTupleAppender appender;
-
-    private final ByteBuffer outFrame;
-
-    private int dataFrameCount;
-    private int[] tPointers;
     private int[] tPointersTemp;
-    private int tupleCount;
+    private FrameTupleAccessor fta2;
 
-    public FrameSorterMergeSort(IHyracksTaskContext ctx, int[] sortFields,
+    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) throws HyracksDataException {
-        this.ctx = ctx;
-        this.sortFields = sortFields;
-        nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        buffers = new ArrayList<ByteBuffer>();
-        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        appender = new FrameTupleAppender(ctx.getFrameSize());
-        outFrame = ctx.allocateFrame();
-
-        dataFrameCount = 0;
+        this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+                Integer.MAX_VALUE);
     }
 
-    @Override
-    public void reset() {
-        dataFrameCount = 0;
-        tupleCount = 0;
-    }
-
-    @Override
-    public int getFrameCount() {
-        return dataFrameCount;
-    }
-
-    @Override
-    public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
-        ByteBuffer copyFrame;
-        if (dataFrameCount == buffers.size()) {
-            copyFrame = ctx.allocateFrame();
-            buffers.add(copyFrame);
-        } else {
-            copyFrame = buffers.get(dataFrameCount);
-        }
-        FrameUtils.copy(buffer, copyFrame);
-        ++dataFrameCount;
+    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException {
+        super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+                outputLimit);
+        fta2 = new FrameTupleAccessor(recordDescriptor);
     }
 
     @Override
-    public void sortFrames() throws HyracksDataException {
-        int nBuffers = dataFrameCount;
-        tupleCount = 0;
-        for (int i = 0; i < nBuffers; ++i) {
-            fta1.reset(buffers.get(i));
-            tupleCount += fta1.getTupleCount();
-        }
-        int sfIdx = sortFields[0];
-        tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
-        int ptr = 0;
-        for (int i = 0; i < nBuffers; ++i) {
-            fta1.reset(buffers.get(i));
-            int tCount = fta1.getTupleCount();
-            byte[] array = fta1.getBuffer().array();
-            for (int j = 0; j < tCount; ++j) {
-                int tStart = fta1.getTupleStartOffset(j);
-                int tEnd = fta1.getTupleEndOffset(j);
-                tPointers[ptr * 4] = i;
-                tPointers[ptr * 4 + 1] = tStart;
-                tPointers[ptr * 4 + 2] = tEnd;
-                int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
-                int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
-                int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
-                tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
-                ++ptr;
-            }
-        }
-        if (tupleCount > 0) {
+    void sortTupleReferences() throws HyracksDataException {
+        if (tPointersTemp == null || tPointersTemp.length < tPointers.length) {
             tPointersTemp = new int[tPointers.length];
-            sort(0, tupleCount);
         }
+        sort(0, tupleCount);
     }
 
     @Override
-    public void flushFrames(IFrameWriter writer) throws HyracksDataException {
-        appender.reset(outFrame, true);
-        for (int ptr = 0; ptr < tupleCount; ++ptr) {
-            int i = tPointers[ptr * 4];
-            int tStart = tPointers[ptr * 4 + 1];
-            int tEnd = tPointers[ptr * 4 + 2];
-            ByteBuffer buffer = buffers.get(i);
-            fta1.reset(buffer);
-            if (!appender.append(fta1, tStart, tEnd)) {
-                FrameUtils.flushFrame(outFrame, writer);
-                appender.reset(outFrame, true);
-                if (!appender.append(fta1, tStart, tEnd)) {
-                    throw new HyracksDataException("Record size (" + (tEnd - tStart) + ") larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
-            }
-        }
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(outFrame, writer);
-        }
+    public void close() {
+        super.close();
+        tPointersTemp = null;
     }
 
-    private void sort(int offset, int length) throws HyracksDataException {
+    void sort(int offset, int length) throws HyracksDataException {
         int step = 1;
-        int len = length;
-        int end = offset + len;
+        int end = offset + length;
         /** bottom-up merge */
-        while (step < len) {
+        while (step < length) {
             /** merge */
             for (int i = offset; i < end; i += 2 * step) {
                 int next = i + step;
@@ -175,8 +83,6 @@ public class FrameSorterMergeSort implements IFrameSorter {
 
     /**
      * Merge two subarrays into one
-     *
-     * @throws HyracksDataException
      */
     private void merge(int start1, int start2, int len1, int len2) throws HyracksDataException {
         int targetPos = start1;
@@ -226,20 +132,20 @@ public class FrameSorterMergeSort implements IFrameSorter {
         }
         int i2 = tp2i;
         int j2 = tp2j;
-        ByteBuffer buf1 = buffers.get(i1);
-        ByteBuffer buf2 = buffers.get(i2);
+        ByteBuffer buf1 = super.bufferManager.getFrame(i1);
+        ByteBuffer buf2 = super.bufferManager.getFrame(i2);
         byte[] b1 = buf1.array();
         byte[] b2 = buf2.array();
-        fta1.reset(buf1);
+        inputTupleAccessor.reset(buf1);
         fta2.reset(buf2);
         for (int f = 0; f < comparators.length; ++f) {
             int fIdx = sortFields[f];
-            int f1Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(buf1.array(), j1 + (fIdx - 1) * 4);
-            int f1End = IntSerDeUtils.getInt(buf1.array(), j1 + fIdx * 4);
-            int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
+            int f1Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(b1, j1 + (fIdx - 1) * 4);
+            int f1End = IntSerDeUtils.getInt(b1, j1 + fIdx * 4);
+            int s1 = j1 + inputTupleAccessor.getFieldSlotsLength() + f1Start;
             int l1 = f1End - f1Start;
-            int f2Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(buf2.array(), j2 + (fIdx - 1) * 4);
-            int f2End = IntSerDeUtils.getInt(buf2.array(), j2 + fIdx * 4);
+            int f2Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(b2, j2 + (fIdx - 1) * 4);
+            int f2End = IntSerDeUtils.getInt(b2, j2 + fIdx * 4);
             int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
             int l2 = f2End - f2Start;
             int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
@@ -250,8 +156,4 @@ public class FrameSorterMergeSort implements IFrameSorter {
         return 0;
     }
 
-    @Override
-    public void close() {
-        this.buffers.clear();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index d607a51..328bb5e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -1,153 +1,54 @@
 /*
  * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
 
-public class FrameSorterQuickSort implements IFrameSorter {
-    private final IHyracksTaskContext ctx;
-    private final int[] sortFields;
-    private final INormalizedKeyComputer nkc;
-    private final IBinaryComparator[] comparators;
-    private final List<ByteBuffer> buffers;
+public class FrameSorterQuickSort extends AbstractFrameSorter {
 
-    private final FrameTupleAccessor fta1;
-    private final FrameTupleAccessor fta2;
+    private FrameTupleAccessor fta2;
 
-    private final FrameTupleAppender appender;
-
-    private final ByteBuffer outFrame;
-
-    private int dataFrameCount;
-    private int[] tPointers;
-    private int tupleCount;
-
-    public FrameSorterQuickSort(IHyracksTaskContext ctx, int[] sortFields,
+    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) throws HyracksDataException {
-        this.ctx = ctx;
-        this.sortFields = sortFields;
-        nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        buffers = new ArrayList<ByteBuffer>();
-        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        appender = new FrameTupleAppender(ctx.getFrameSize());
-        outFrame = ctx.allocateFrame();
-
-        dataFrameCount = 0;
-    }
-
-    @Override
-    public void reset() {
-        dataFrameCount = 0;
-        tupleCount = 0;
-    }
-
-    @Override
-    public int getFrameCount() {
-        return dataFrameCount;
+        this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+                Integer.MAX_VALUE);
     }
 
-    @Override
-    public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
-        ByteBuffer copyFrame;
-        if (dataFrameCount == buffers.size()) {
-            copyFrame = ctx.allocateFrame();
-            buffers.add(copyFrame);
-        } else {
-            copyFrame = buffers.get(dataFrameCount);
-        }
-        FrameUtils.copy(buffer, copyFrame);
-        ++dataFrameCount;
-    }
-
-    @Override
-    public void sortFrames() throws HyracksDataException {
-        int nBuffers = dataFrameCount;
-        tupleCount = 0;
-        for (int i = 0; i < nBuffers; ++i) {
-            fta1.reset(buffers.get(i));
-            tupleCount += fta1.getTupleCount();
-        }
-        int sfIdx = sortFields[0];
-        tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
-        int ptr = 0;
-        for (int i = 0; i < nBuffers; ++i) {
-            fta1.reset(buffers.get(i));
-            int tCount = fta1.getTupleCount();
-            byte[] array = fta1.getBuffer().array();
-            for (int j = 0; j < tCount; ++j) {
-                int tStart = fta1.getTupleStartOffset(j);
-                int tEnd = fta1.getTupleEndOffset(j);
-                tPointers[ptr * 4] = i;
-                tPointers[ptr * 4 + 1] = tStart;
-                tPointers[ptr * 4 + 2] = tEnd;
-                int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
-                int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
-                int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
-                tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
-                ++ptr;
-            }
-        }
-        if (tupleCount > 0) {
-            sort(tPointers, 0, tupleCount);
-        }
+    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException {
+        super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+                outputLimit);
+        fta2 = new FrameTupleAccessor(recordDescriptor);
     }
 
     @Override
-    public void flushFrames(IFrameWriter writer) throws HyracksDataException {
-        appender.reset(outFrame, true);
-        for (int ptr = 0; ptr < tupleCount; ++ptr) {
-            int i = tPointers[ptr * 4];
-            int tStart = tPointers[ptr * 4 + 1];
-            int tEnd = tPointers[ptr * 4 + 2];
-            ByteBuffer buffer = buffers.get(i);
-            fta1.reset(buffer);
-            if (!appender.append(fta1, tStart, tEnd)) {
-                FrameUtils.flushFrame(outFrame, writer);
-                appender.reset(outFrame, true);
-                if (!appender.append(fta1, tStart, tEnd)) {
-                    throw new HyracksDataException("Record size (" + (tEnd - tStart) + ") larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
-            }
-        }
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(outFrame, writer);
-        }
+    void sortTupleReferences() throws HyracksDataException {
+        sort(tPointers, 0, tupleCount);
     }
 
-    private void sort(int[] tPointers, int offset, int length) throws HyracksDataException {
+    void sort(int[] tPointers, int offset, int length) throws HyracksDataException {
         int m = offset + (length >> 1);
         int mi = tPointers[m * 4];
         int mj = tPointers[m * 4 + 1];
@@ -221,17 +122,17 @@ public class FrameSorterQuickSort implements IFrameSorter {
         }
         int i2 = tp2i;
         int j2 = tp2j;
-        ByteBuffer buf1 = buffers.get(i1);
-        ByteBuffer buf2 = buffers.get(i2);
+        ByteBuffer buf1 = super.bufferManager.getFrame(i1);
+        ByteBuffer buf2 = super.bufferManager.getFrame(i2);
         byte[] b1 = buf1.array();
         byte[] b2 = buf2.array();
-        fta1.reset(buf1);
+        inputTupleAccessor.reset(buf1);
         fta2.reset(buf2);
         for (int f = 0; f < comparators.length; ++f) {
             int fIdx = sortFields[f];
             int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
             int f1End = buf1.getInt(j1 + fIdx * 4);
-            int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
+            int s1 = j1 + inputTupleAccessor.getFieldSlotsLength() + f1Start;
             int l1 = f1End - f1Start;
             int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
             int f2End = buf2.getInt(j2 + fIdx * 4);
@@ -245,8 +146,4 @@ public class FrameSorterQuickSort implements IFrameSorter {
         return 0;
     }
 
-    @Override
-    public void close() {
-        this.buffers.clear();
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
new file mode 100644
index 0000000..564a462
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFramePool;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.ITupleBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableTupleMemoryManager;
+
+public class HeapSortRunGenerator extends AbstractSortRunGenerator {
+    protected final IHyracksTaskContext ctx;
+    protected final int frameLimit;
+    protected final int topK;
+    protected final int[] sortFields;
+    protected final INormalizedKeyComputerFactory nmkFactory;
+    protected final IBinaryComparatorFactory[] comparatorFactories;
+    protected final RecordDescriptor recordDescriptor;
+    protected ITupleSorter tupleSorter;
+    protected IFrameTupleAccessor inAccessor;
+
+    public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        super();
+        this.ctx = ctx;
+        this.frameLimit = frameLimit;
+        this.topK = topK;
+        this.sortFields = sortFields;
+        this.nmkFactory = firstKeyNormalizerFactory;
+        this.comparatorFactories = comparatorFactories;
+        this.inAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.recordDescriptor = recordDescriptor;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        IFramePool framePool = new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize());
+        ITupleBufferManager bufferManager = new VariableTupleMemoryManager(framePool, recordDescriptor);
+        tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactory,
+                comparatorFactories);
+        super.open();
+    }
+
+    @Override
+    public ISorter getSorter() throws HyracksDataException {
+        return tupleSorter;
+    }
+
+    @Override
+    protected RunFileWriter getRunFileWriter() throws HyracksDataException {
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                HeapSortRunGenerator.class.getSimpleName());
+        return new RunFileWriter(file, ctx.getIOManager());
+    }
+
+    @Override
+    protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException {
+        return writer;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inAccessor.reset(buffer);
+        for (int i = 0; i < inAccessor.getTupleCount(); i++) {
+            if (!tupleSorter.insertTuple(inAccessor, i)) {
+                flushFramesToRun();
+                if (!tupleSorter.insertTuple(inAccessor, i)) {
+                    throw new HyracksDataException("The given tuple is too big to insert into the sorting memory.");
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
new file mode 100644
index 0000000..9976aad
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotBiggestFirst;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
+
+public class HybridTopKSortRunGenerator extends HeapSortRunGenerator {
+    private static final Logger LOG = Logger.getLogger(HybridTopKSortRunGenerator.class.getName());
+
+    private static final int SWITCH_TO_FRAME_SORTER_THRESHOLD = 2;
+    private IFrameSorter frameSorter = null;
+    private int tupleSorterFlushedTimes = 0;
+
+    public HybridTopKSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        super(ctx, frameLimit, topK, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+    }
+
+    @Override
+    public ISorter getSorter() throws HyracksDataException {
+        if (tupleSorter != null) {
+            return tupleSorter;
+        } else if (frameSorter != null) {
+            return frameSorter;
+        }
+        return null;
+    }
+
+    @Override
+    protected RunFileWriter getRunFileWriter() throws HyracksDataException {
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                HybridTopKSortRunGenerator.class.getSimpleName());
+        return new RunFileWriter(file, ctx.getIOManager());
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inAccessor.reset(buffer);
+        if (tupleSorter != null) {
+            boolean isBadK = false;
+            for (int i = 0; i < inAccessor.getTupleCount(); i++) {
+                if (!tupleSorter.insertTuple(inAccessor, i)) {
+                    flushFramesToRun();
+                    isBadK = true;
+                    if (!tupleSorter.insertTuple(inAccessor, i)) {
+                        throw new HyracksDataException("The given tuple is too big to insert into the sorting memory.");
+                    }
+                }
+            }
+            if (isBadK) {
+                tupleSorterFlushedTimes++;
+                if (tupleSorterFlushedTimes > SWITCH_TO_FRAME_SORTER_THRESHOLD) {
+                    if (tupleSorter.hasRemaining()) {
+                        flushFramesToRun();
+                    }
+                    tupleSorter.close();
+                    tupleSorter = null;
+                    if (LOG.isLoggable(Level.FINE)) {
+                        LOG.fine("clear tupleSorter");
+                    }
+                }
+            }
+        } else {
+            if (frameSorter == null) {
+                VariableFrameMemoryManager bufferManager = new VariableFrameMemoryManager(
+                        new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize()),
+                        new FrameFreeSlotBiggestFirst(frameLimit - 1));
+                frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, nmkFactory, comparatorFactories,
+                        recordDescriptor, topK);
+                if (LOG.isLoggable(Level.FINE)) {
+                    LOG.fine("create frameSorter");
+                }
+            }
+            if (!frameSorter.insertFrame(buffer)) {
+                flushFramesToRun();
+                if (!frameSorter.insertFrame(buffer)) {
+                    throw new HyracksDataException("The given frame is too big to insert into the sorting memory.");
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
index d9b8d37..6d0b100 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
@@ -1,37 +1,28 @@
 /*
  * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-public interface IFrameSorter {
+public interface IFrameSorter extends ISorter {
 
-    public void reset();
+    int getFrameCount();
 
-    public int getFrameCount();
-
-    public void insertFrame(ByteBuffer buffer) throws HyracksDataException;
-
-    public void sortFrames() throws HyracksDataException;
-
-    public void flushFrames(IFrameWriter writer) throws HyracksDataException;
-
-    public void close();
+    boolean insertFrame(ByteBuffer inputBuffer) throws HyracksDataException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
deleted file mode 100644
index e669335..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * @author pouria Defines the required operations, needed for any memory
- *         manager, used in sorting with replacement selection, to manage the
- *         free spaces
- */
-
-public interface IMemoryManager {
-
-    /**
-     * Allocates a free slot equal or greater than requested length. Pointer to
-     * the allocated slot is put in result, and gets returned to the caller. If
-     * no proper free slot is available, result would contain a null/invalid
-     * pointer (may vary between different implementations)
-     * 
-     * @param length
-     * @param result
-     * @throws HyracksDataException
-     */
-    void allocate(int length, Slot result) throws HyracksDataException;
-
-    /**
-     * Unallocates the specified slot (and returns it back to the free slots
-     * set)
-     * 
-     * @param s
-     * @return the total length of unallocted slot
-     * @throws HyracksDataException
-     */
-    int unallocate(Slot s) throws HyracksDataException;
-
-    /**
-     * @param frameIndex
-     * @return the specified frame, from the set of memory buffers, being
-     *         managed by this memory manager
-     */
-    ByteBuffer getFrame(int frameIndex);
-
-    /**
-     * Writes the specified tuple into the specified memory slot (denoted by
-     * frameIx and offset)
-     * 
-     * @param frameIx
-     * @param offset
-     * @param src
-     * @param tIndex
-     * @return
-     */
-    boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src, int tIndex);
-
-    /**
-     * Reads the specified tuple (denoted by frameIx and offset) and appends it
-     * to the passed FrameTupleAppender
-     * 
-     * @param frameIx
-     * @param offset
-     * @param dest
-     * @return
-     */
-    boolean readTuple(int frameIx, int offset, FrameTupleAppender dest);
-
-    /**
-     * close and cleanup the memory manager
-     */
-    void close();
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
index 2840d01..d21255e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -16,7 +16,6 @@ package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 
 /**
@@ -28,5 +27,5 @@ public interface IRunGenerator extends IFrameWriter {
     /**
      * @return the list of generated (sorted) runs
      */
-    public List<IFrameReader> getRuns();
+    List<RunAndMaxFrameSizePair> getRuns();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
deleted file mode 100644
index 8cff0df..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * @author pouria
- *         Defines the selection tree, used in sorting with replacement
- *         selection to manage the order of output tuples into the runs, during
- *         the run generation phase. This tree contains tuples, belonging to two
- *         different runs: - Current run (being written to the output) - Next
- *         run
- */
-
-public interface ISelectionTree {
-
-    /**
-     * Inserts a new element into the selectionTree
-     *
-     * @param element
-     *            contains the pointer to the memory slot, containing the tuple,
-     *            along with its run number
-     * @throws HyracksDataException
-     */
-    void insert(int[] element) throws HyracksDataException;
-
-    /**
-     * Removes and returns the smallest element in the tree
-     *
-     * @param result
-     *            is the array that will eventually contain minimum entry
-     *            pointer
-     * @throws HyracksDataException
-     */
-    void getMin(int[] result) throws HyracksDataException;
-
-    /**
-     * Removes and returns the largest element in the tree
-     *
-     * @param result
-     *            is the array that will eventually contain maximum entry
-     *            pointer
-     * @throws HyracksDataException
-     */
-    void getMax(int[] result) throws HyracksDataException;
-
-    /**
-     * @return True if the selection tree does not have any element, false
-     *         otherwise
-     */
-    boolean isEmpty();
-
-    /**
-     * Removes all the elements in the tree
-     */
-    void reset();
-
-    /**
-     * Returns (and does NOT remove) the smallest element in the tree
-     *
-     * @param result
-     *            is the array that will eventually contain minimum entry
-     *            pointer
-     */
-    void peekMin(int[] result);
-
-    /**
-     * Returns (and does NOT remove) the largest element in the tree
-     *
-     * @param result
-     *            is the array that will eventually contain maximum entry
-     *            pointer
-     * @throws HyracksDataException
-     */
-    void peekMax(int[] result) throws HyracksDataException;
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISorter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISorter.java
new file mode 100644
index 0000000..09a8169
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISorter.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ISorter {
+
+    boolean hasRemaining();
+
+    void reset() throws HyracksDataException;
+
+    void sort() throws HyracksDataException;
+
+    void close();
+
+    int flush(IFrameWriter writer) throws HyracksDataException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ITupleSorter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ITupleSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ITupleSorter.java
new file mode 100644
index 0000000..bea8b35
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ITupleSorter.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITupleSorter extends ISorter {
+    int getTupleCount();
+
+    boolean insertTuple(IFrameTupleAccessor frameTupleAccessor, int index) throws HyracksDataException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 6fa21b5..379a783 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -36,6 +36,10 @@ import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
 
 public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -76,7 +80,7 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
         builder.addBlockingEdge(sa, ma);
     }
 
-    public static class SortTaskState extends AbstractStateObject {
+    private static class SortTaskState extends AbstractStateObject {
         private FrameSorterMergeSort frameSorter;
 
         public SortTaskState() {
@@ -110,20 +114,29 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
-                    state.frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory,
-                            comparatorFactories, recordDescriptors[0]);
+                    state = new SortTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
+
+                    IFrameBufferManager frameBufferManager = new VariableFrameMemoryManager(
+                            new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY), new FrameFreeSlotLastFit());
+
+                    state.frameSorter = new FrameSorterMergeSort(ctx, frameBufferManager, sortFields,
+                            firstKeyNormalizerFactory, comparatorFactories, recordDescriptors[0]);
                     state.frameSorter.reset();
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    state.frameSorter.insertFrame(buffer);
+                    if (!state.frameSorter.insertFrame(buffer)) {
+                        throw new HyracksDataException("Failed to insert the given frame into sorting buffer. "
+                                + "Please increase the sorting memory budget to enable the in-memory sorting, "
+                                + "or you could use ExternalSort instead.");
+                    }
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.frameSorter.sortFrames();
+                    state.frameSorter.sort();
                     ctx.setStateObject(state);
                 }
 
@@ -152,7 +165,7 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
                     try {
                         SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
                                 getOperatorId(), SORT_ACTIVITY_ID), partition));
-                        state.frameSorter.flushFrames(writer);
+                        state.frameSorter.flush(writer);
                     } catch (Exception e) {
                         writer.fail();
                         throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
deleted file mode 100644
index ef1ae88..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- * @author pouria
- *         Operator descriptor for sorting with replacement, consisting of two
- *         phases:
- *         - Run Generation: Denoted by OptimizedSortActivity below, in which
- *         sort runs get generated from the input data. This phase uses the
- *         Selection Tree and Memory Manager to benefit from the replacement
- *         selection optimization, to create runs which are longer than the
- *         available memory size.
- *         - Merging: Denoted by MergeActivity below, in which runs (generated
- *         in the previous phase) get merged via a merger. Each run has a single
- *         buffer in memory, and a priority queue is used to select the top
- *         tuple each time. Top tuple is sent to a new run or output
- */
-
-public class OptimizedExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
-
-    private static final int NO_LIMIT = -1;
-    private static final long serialVersionUID = 1L;
-    private static final int SORT_ACTIVITY_ID = 0;
-    private static final int MERGE_ACTIVITY_ID = 1;
-
-    private final int[] sortFields;
-    private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final int memSize;
-    private final int outputLimit;
-
-    public OptimizedExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
-        this(spec, framesLimit, NO_LIMIT, sortFields, null, comparatorFactories, recordDescriptor);
-    }
-
-    public OptimizedExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int outputLimit,
-            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
-        this(spec, framesLimit, outputLimit, sortFields, null, comparatorFactories, recordDescriptor);
-    }
-
-    public OptimizedExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSize, int outputLimit,
-            int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
-        super(spec, 1, 1);
-        this.memSize = memSize;
-        this.outputLimit = outputLimit;
-        this.sortFields = sortFields;
-        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
-        this.comparatorFactories = comparatorFactories;
-        if (memSize <= 1) {
-            throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
-        }
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
-        OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
-
-        builder.addActivity(this, osa);
-        builder.addSourceEdge(0, osa, 0);
-
-        builder.addActivity(this, oma);
-        builder.addTargetEdge(0, oma, 0);
-
-        builder.addBlockingEdge(osa, oma);
-    }
-
-    public static class OptimizedSortTaskState extends AbstractStateObject {
-        private List<IFrameReader> runs;
-
-        public OptimizedSortTaskState() {
-        }
-
-        private OptimizedSortTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
-    }
-
-    private class OptimizedSortActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public OptimizedSortActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            final IRunGenerator runGen;
-            if (outputLimit == NO_LIMIT) {
-                runGen = new OptimizedExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
-                        comparatorFactories, recordDescriptors[0], memSize);
-            } else {
-                runGen = new OptimizedExternalSortRunGeneratorWithLimit(ctx, sortFields, firstKeyNormalizerFactory,
-                        comparatorFactories, recordDescriptors[0], memSize, outputLimit);
-            }
-
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                @Override
-                public void open() throws HyracksDataException {
-
-                    runGen.open();
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    runGen.nextFrame(buffer);
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    OptimizedSortTaskState state = new OptimizedSortTaskState(ctx.getJobletContext().getJobId(),
-                            new TaskId(getActivityId(), partition));
-                    runGen.close();
-                    state.runs = runGen.getRuns();
-                    ctx.setStateObject(state);
-
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    runGen.fail();
-                }
-            };
-            return op;
-        }
-    }
-
-    private class OptimizedMergeActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public OptimizedMergeActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-                @Override
-                public void initialize() throws HyracksDataException {
-                    OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getStateObject(new TaskId(
-                            new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
-
-                    List<IFrameReader> runs = state.runs;
-
-                    IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-                    for (int i = 0; i < comparatorFactories.length; ++i) {
-                        comparators[i] = comparatorFactories[i].createBinaryComparator();
-                    }
-
-                    INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
-                            : firstKeyNormalizerFactory.createNormalizedKeyComputer();
-                    int necessaryFrames = Math.min(runs.size() + 2, memSize);
-                    ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, outputLimit, runs, sortFields,
-                            comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
-
-                    merger.processWithReplacementSelection();
-
-                }
-            };
-            return op;
-        }
-    }
-}
\ No newline at end of file


Mime
View raw message