asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [08/11] incubator-asterixdb-hyracks git commit: Implemented the memory-bounded HashGroupby and HashJoin for BigObject
Date Fri, 26 Feb 2016 05:54:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
new file mode 100644
index 0000000..a2922ae
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.group.external;
+
+import java.util.ArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.AggregateType;
+import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.ISpillableTable;
+import org.apache.hyracks.dataflow.std.group.ISpillableTableFactory;
+
+public class ExternalGroupWriteOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable
+        implements IRunFileWriterGenerator {
+    private static Logger LOGGER = Logger.getLogger("ExternalGroupbyWrite");
+    private final IHyracksTaskContext ctx;
+    private final Object stateId;
+    private final ISpillableTableFactory spillableTableFactory;
+    private final RecordDescriptor partialAggRecordDesc;
+    private final RecordDescriptor outRecordDesc;
+    private final IAggregatorDescriptorFactory mergeAggregatorFactory;
+    private final int[] mergeGroupFields;
+    private final IBinaryComparator[] groupByComparators;
+    private final int frameLimit;
+    private final INormalizedKeyComputer nmkComputer;
+    private final ArrayList<RunFileWriter> generatedRuns = new ArrayList<>();
+
+    public ExternalGroupWriteOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
+            ISpillableTableFactory spillableTableFactory, RecordDescriptor partialAggRecordDesc,
+            RecordDescriptor outRecordDesc, int framesLimit, int[] groupFields,
+            INormalizedKeyComputerFactory nmkFactory, IBinaryComparatorFactory[] comparatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory) {
+        this.ctx = ctx;
+        this.stateId = stateId;
+        this.spillableTableFactory = spillableTableFactory;
+        this.frameLimit = framesLimit;
+        this.nmkComputer = nmkFactory.createNormalizedKeyComputer();
+
+        this.partialAggRecordDesc = partialAggRecordDesc;
+        this.outRecordDesc = outRecordDesc;
+
+        this.mergeAggregatorFactory = aggregatorFactory;
+
+        //create merge group fields
+        int numGroupFields = groupFields.length;
+        mergeGroupFields = new int[numGroupFields];
+        for (int i = 0; i < numGroupFields; i++) {
+            mergeGroupFields[i] = i;
+        }
+
+        //set up comparators for grouping
+        groupByComparators = new IBinaryComparator[Math.min(mergeGroupFields.length, comparatorFactories.length)];
+        for (int i = 0; i < groupByComparators.length; i++) {
+            groupByComparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        ExternalGroupState aggState = (ExternalGroupState) ctx.getStateObject(stateId);
+        ISpillableTable table = aggState.getSpillableTable();
+        RunFileWriter[] partitionRuns = aggState.getRuns();
+        int[] numberOfTuples = aggState.getSpilledNumTuples();
+        try {
+            writer.open();
+            doPass(table, partitionRuns, numberOfTuples, writer, 1); // level 0 is used at the build stage.
+        } catch (Exception e) {
+            generatedRuns.forEach(run -> run.getFileReference().delete());
+            writer.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+        }
+    }
+
+    private void doPass(ISpillableTable table, RunFileWriter[] runs, int[] numOfTuples, IFrameWriter writer, int level)
+            throws HyracksDataException {
+        assert table.getNumPartitions() == runs.length;
+        for (int i = 0; i < runs.length; i++) {
+            if (runs[i] == null) {
+                table.flushFrames(i, writer, AggregateType.FINAL);
+            }
+        }
+        table.close();
+
+        for (int i = 0; i < runs.length; i++) {
+            if (runs[i] != null) {
+                ISpillableTable partitionTable = spillableTableFactory.buildSpillableTable(ctx, numOfTuples[i],
+                        runs[i].getFileSize(), mergeGroupFields, groupByComparators, nmkComputer,
+                        mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, frameLimit, level);
+                RunFileWriter[] runFileWriters = new RunFileWriter[partitionTable.getNumPartitions()];
+                int[] sizeInTuplesNextLevel = buildGroup(runs[i].createDeleteOnCloseReader(), partitionTable,
+                        runFileWriters);
+                for (int idFile = 0; idFile < runFileWriters.length; idFile++) {
+                    if (runFileWriters[idFile] != null) {
+                        generatedRuns.add(runFileWriters[idFile]);
+                    }
+                }
+
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    int numOfSpilledPart = 0;
+                    for (int x = 0; x < numOfTuples.length; x++) {
+                        if (numOfTuples[x] > 0) {
+                            numOfSpilledPart++;
+                        }
+                    }
+                    LOGGER.fine("level " + level + ":" + "build with " + numOfTuples.length + " partitions"
+                            + ", spilled " + numOfSpilledPart + " partitions");
+                }
+                doPass(partitionTable, runFileWriters, sizeInTuplesNextLevel, writer, level + 1);
+            }
+        }
+    }
+
+    private int[] buildGroup(RunFileReader reader, ISpillableTable table, RunFileWriter[] runFileWriters)
+            throws HyracksDataException {
+        ExternalHashGroupBy groupBy = new ExternalHashGroupBy(this, table, runFileWriters, partialAggRecordDesc);
+        reader.open();
+        try {
+            VSizeFrame frame = new VSizeFrame(ctx);
+            while (reader.nextFrame(frame)) {
+                groupBy.insert(frame.getBuffer());
+            }
+            groupBy.flushSpilledPartitions();
+        } finally {
+            reader.close();
+        }
+        return groupBy.getSpilledNumTuples();
+    }
+
+    @Override
+    public RunFileWriter getRunFileWriter() throws HyracksDataException {
+        FileReference newRun = ctx.getJobletContext()
+                .createManagedWorkspaceFile(ExternalGroupOperatorDescriptor.class.getSimpleName());
+        return new RunFileWriter(newRun, ctx.getIOManager());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
new file mode 100644
index 0000000..e0ef2b3
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.group.external;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.group.AggregateType;
+import org.apache.hyracks.dataflow.std.group.ISpillableTable;
+
+public class ExternalHashGroupBy {
+
+    private final IRunFileWriterGenerator runFileWriterGenerator;
+
+    private FrameTupleAccessor accessor;
+    private ISpillableTable table;
+    private RunFileWriter[] runWriters;
+    private int[] spilledNumTuples;
+
+    public ExternalHashGroupBy(IRunFileWriterGenerator runFileWriterGenerator, ISpillableTable table,
+            RunFileWriter[] runWriters, RecordDescriptor inRecordDescriptor) {
+        this.runFileWriterGenerator = runFileWriterGenerator;
+        this.table = table;
+        this.runWriters = runWriters;
+        this.accessor = new FrameTupleAccessor(inRecordDescriptor);
+        this.spilledNumTuples = new int[runWriters.length];
+    }
+
+    public void insert(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            if (!table.insert(accessor, i)) {
+                do {
+                    int partition = table.findVictimPartition(accessor, i);
+                    if (partition < 0) {
+                        throw new HyracksDataException("Failed to insert a new buffer into the aggregate operator!");
+                    }
+                    RunFileWriter writer = getPartitionWriterOrCreateOneIfNotExist(partition);
+                    flushPartitionToRun(partition, writer);
+                } while (!table.insert(accessor, i));
+            }
+        }
+    }
+
+    private void flushPartitionToRun(int partition, RunFileWriter writer)
+            throws HyracksDataException {
+        try {
+            spilledNumTuples[partition] += table.flushFrames(partition, writer, AggregateType.PARTIAL);
+            table.clear(partition);
+        } catch (Exception ex) {
+            writer.fail();
+            throw new HyracksDataException(ex);
+        }
+    }
+
+    public void flushSpilledPartitions() throws HyracksDataException {
+        for (int i = 0; i < runWriters.length; ++i) {
+            if (runWriters[i] != null) {
+                flushPartitionToRun(i, runWriters[i]);
+                runWriters[i].close();
+            }
+        }
+    }
+
+    private RunFileWriter getPartitionWriterOrCreateOneIfNotExist(int partition) throws HyracksDataException {
+        if (runWriters[partition] == null) {
+            runWriters[partition] = runFileWriterGenerator.getRunFileWriter();
+            runWriters[partition].open();
+        }
+        return runWriters[partition];
+    }
+
+    public int[] getSpilledNumTuples() {
+        return spilledNumTuples;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/IRunFileWriterGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/IRunFileWriterGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/IRunFileWriterGenerator.java
new file mode 100644
index 0000000..58c6ae1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/IRunFileWriterGenerator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.group.external;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+
+public interface IRunFileWriterGenerator {
+    RunFileWriter getRunFileWriter() throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
deleted file mode 100644
index 2f4a0b2..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group.hash;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.std.group.AggregateState;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-class GroupingHashTable {
-    /**
-     * The pointers in the link store 3 int values for each entry in the
-     * hashtable: (bufferIdx, tIndex, accumulatorIdx).
-     *
-     * @author vinayakb
-     */
-    private static class Link {
-        private static final int INIT_POINTERS_SIZE = 9;
-
-        int[] pointers;
-        int size;
-
-        Link() {
-            pointers = new int[INIT_POINTERS_SIZE];
-            size = 0;
-        }
-
-        void add(int bufferIdx, int tIndex, int accumulatorIdx) {
-            while (size + 3 > pointers.length) {
-                pointers = Arrays.copyOf(pointers, pointers.length * 2);
-            }
-            pointers[size++] = bufferIdx;
-            pointers[size++] = tIndex;
-            pointers[size++] = accumulatorIdx;
-        }
-    }
-
-    private static final int INIT_AGG_STATE_SIZE = 8;
-    private final IHyracksTaskContext ctx;
-
-    private final List<IFrame> buffers;
-    private final Link[] table;
-    /**
-     * Aggregate states: a list of states for all groups maintained in the main
-     * memory.
-     */
-    private AggregateState[] aggregateStates;
-    private int accumulatorSize;
-
-    private int lastBIndex;
-    private final int[] storedKeys;
-    private final int[] keys;
-    private final IBinaryComparator[] comparators;
-    private final FrameTuplePairComparator ftpc;
-    private final ITuplePartitionComputer tpc;
-    private final IAggregatorDescriptor aggregator;
-
-    private final IFrame outputFrame;
-    private final FrameTupleAppender appender;
-
-    private final FrameTupleAccessor storedKeysAccessor;
-
-    private final ArrayTupleBuilder stateTupleBuilder, outputTupleBuilder;
-
-    GroupingHashTable(IHyracksTaskContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
-            ITuplePartitionComputerFactory tpcf, IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize)
-            throws HyracksDataException {
-        this.ctx = ctx;
-
-        buffers = new ArrayList<>();
-        table = new Link[tableSize];
-
-        keys = fields;
-        storedKeys = new int[fields.length];
-        @SuppressWarnings("rawtypes")
-        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
-        for (int i = 0; i < fields.length; ++i) {
-            storedKeys[i] = i;
-            storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
-        }
-
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
-        tpc = tpcf.createPartitioner();
-
-        int[] keyFieldsInPartialResults = new int[fields.length];
-        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-            keyFieldsInPartialResults[i] = i;
-        }
-
-        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, fields,
-                keyFieldsInPartialResults, null);
-
-        this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
-        accumulatorSize = 0;
-
-        RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
-        storedKeysAccessor = new FrameTupleAccessor(storedKeysRecordDescriptor);
-        lastBIndex = -1;
-
-        appender = new FrameTupleAppender();
-
-        addNewBuffer();
-
-        if (fields.length < outRecordDescriptor.getFields().length) {
-            stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-        } else {
-            stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
-        }
-        outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-        outputFrame = new VSizeFrame(ctx);
-    }
-
-    private void addNewBuffer() throws HyracksDataException {
-        VSizeFrame frame = new VSizeFrame(ctx);
-        buffers.add(frame);
-        appender.reset(frame, true);
-        ++lastBIndex;
-    }
-
-    void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
-        int entry = tpc.partition(accessor, tIndex, table.length);
-        Link link = table[entry];
-        if (link == null) {
-            link = table[entry] = new Link();
-        }
-        int saIndex = -1;
-        for (int i = 0; i < link.size; i += 3) {
-            int sbIndex = link.pointers[i];
-            int stIndex = link.pointers[i + 1];
-            storedKeysAccessor.reset(buffers.get(sbIndex).getBuffer());
-            int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
-            if (c == 0) {
-                saIndex = link.pointers[i + 2];
-                break;
-            }
-        }
-        if (saIndex < 0) {
-            // Did not find the key. Insert a new entry.
-            saIndex = accumulatorSize++;
-            // Add keys
-
-            // Add aggregation fields
-            AggregateState newState = aggregator.createAggregateStates();
-
-            stateTupleBuilder.reset();
-            for (int k = 0; k < keys.length; k++) {
-                stateTupleBuilder.addField(accessor, tIndex, keys[k]);
-            }
-
-            aggregator.init(stateTupleBuilder, accessor, tIndex, newState);
-
-            if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
-                    stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
-                addNewBuffer();
-                if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
-                        stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
-                    throw new HyracksDataException("Cannot init the aggregate state in a single frame.");
-                }
-            }
-
-            if (accumulatorSize >= aggregateStates.length) {
-                aggregateStates = Arrays.copyOf(aggregateStates, aggregateStates.length * 2);
-            }
-
-            aggregateStates[saIndex] = newState;
-
-            link.add(lastBIndex, appender.getTupleCount() - 1, saIndex);
-
-        } else {
-            aggregator.aggregate(accessor, tIndex, null, 0, aggregateStates[saIndex]);
-        }
-    }
-
-    void write(IFrameWriter writer) throws HyracksDataException {
-        appender.reset(outputFrame, true);
-
-        for (int i = 0; i < table.length; ++i) {
-            Link link = table[i];
-            if (link != null) {
-                for (int j = 0; j < link.size; j += 3) {
-                    int bIndex = link.pointers[j];
-                    int tIndex = link.pointers[j + 1];
-                    int aIndex = link.pointers[j + 2];
-                    ByteBuffer keyBuffer = buffers.get(bIndex).getBuffer();
-                    storedKeysAccessor.reset(keyBuffer);
-
-                    // copy keys
-                    outputTupleBuilder.reset();
-                    for (int k = 0; k < storedKeys.length; k++) {
-                        outputTupleBuilder.addField(storedKeysAccessor, tIndex, storedKeys[k]);
-                    }
-
-                    aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor, tIndex,
-                            aggregateStates[aIndex]);
-
-                    FrameUtils.appendSkipEmptyFieldToWriter(writer, appender, outputTupleBuilder.getFieldEndOffsets(),
-                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize());
-
-                }
-            }
-        }
-        appender.write(writer, true);
-    }
-
-    void close() throws HyracksDataException {
-        for (AggregateState aState : aggregateStates) {
-            aState.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
deleted file mode 100644
index 902021f..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group.hash;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-class HashGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
-    private final IHyracksTaskContext ctx;
-    private final FrameTupleAccessor accessor;
-    private final Object stateId;
-    private final int[] keys;
-    private final ITuplePartitionComputerFactory tpcf;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IAggregatorDescriptorFactory aggregatorFactory;
-    private final int tableSize;
-    private final RecordDescriptor inRecordDescriptor;
-    private final RecordDescriptor outRecordDescriptor;
-
-    private HashGroupState state;
-
-    HashGroupBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keys,
-            ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
-            IAggregatorDescriptorFactory aggregatorFactory, int tableSize, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor) {
-        this.ctx = ctx;
-        this.accessor = new FrameTupleAccessor(inRecordDescriptor);
-        this.stateId = stateId;
-        this.keys = keys;
-        this.tpcf = tpcf;
-        this.comparatorFactories = comparatorFactories;
-        this.aggregatorFactory = aggregatorFactory;
-        this.tableSize = tableSize;
-        this.inRecordDescriptor = inRecordDescriptor;
-        this.outRecordDescriptor = outRecordDescriptor;
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        state = new HashGroupState(ctx.getJobletContext().getJobId(), stateId);
-        state.setHashTable(new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
-                inRecordDescriptor, outRecordDescriptor, tableSize));
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        GroupingHashTable table = state.getHashTable();
-        accessor.reset(buffer);
-        int tupleCount = accessor.getTupleCount();
-        for (int i = 0; i < tupleCount; ++i) {
-            try {
-                table.insert(accessor, i);
-            } catch (Exception e) {
-                System.out.println(e.toString());
-                throw new HyracksDataException(e);
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        ctx.setStateObject(state);
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        throw new HyracksDataException("HashGroupOperator is failed.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
deleted file mode 100644
index a0bbb2d..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group.hash;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-/**
- *
- */
-public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
-    private static final int HASH_BUILD_ACTIVITY_ID = 0;
-
-    private static final int OUTPUT_ACTIVITY_ID = 1;
-
-    private static final long serialVersionUID = 1L;
-
-    private final int[] keys;
-    private final ITuplePartitionComputerFactory tpcf;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-
-    private final IAggregatorDescriptorFactory aggregatorFactory;
-
-    private final int tableSize;
-
-    public HashGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys,
-            ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
-            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor outRecordDescriptor, int tableSize) {
-        super(spec, 1, 1);
-        this.keys = keys;
-        this.tpcf = tpcf;
-        this.comparatorFactories = comparatorFactories;
-        this.aggregatorFactory = aggregatorFactory;
-        recordDescriptors[0] = outRecordDescriptor;
-        this.tableSize = tableSize;
-    }
-
-    /*
-     * (non-Javadoc)
-     *
-     * @see
-     * org.apache.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
-     * (org.apache.hyracks.api.dataflow.IActivityGraphBuilder)
-     */
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, HASH_BUILD_ACTIVITY_ID));
-        builder.addActivity(this, ha);
-
-        OutputActivity oa = new OutputActivity(new ActivityId(odId, OUTPUT_ACTIVITY_ID));
-        builder.addActivity(this, oa);
-
-        builder.addSourceEdge(0, ha, 0);
-        builder.addTargetEdge(0, oa, 0);
-        builder.addBlockingEdge(ha, oa);
-    }
-
-    private class HashBuildActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public HashBuildActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            return new HashGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keys, tpcf,
-                    comparatorFactories, aggregatorFactory, tableSize, recordDescProvider.getInputRecordDescriptor(
-                            getActivityId(), 0), recordDescriptors[0]);
-        }
-    }
-
-    private class OutputActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public OutputActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            return new HashGroupOutputOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
-                    HASH_BUILD_ACTIVITY_ID), partition));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOutputOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOutputOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOutputOperatorNodePushable.java
deleted file mode 100644
index ce0ce34..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOutputOperatorNodePushable.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group.hash;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-class HashGroupOutputOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-    private final IHyracksTaskContext ctx;
-    private final Object stateId;
-
-    HashGroupOutputOperatorNodePushable(IHyracksTaskContext ctx, Object stateId) {
-        this.ctx = ctx;
-        this.stateId = stateId;
-    }
-
-    @Override
-    public void initialize() throws HyracksDataException {
-        HashGroupState buildState = (HashGroupState) ctx.getStateObject(stateId);
-        GroupingHashTable table = buildState.getHashTable();
-        writer.open();
-        try {
-            table.write(writer);
-        } catch (Exception e) {
-            writer.fail();
-            throw new HyracksDataException(e);
-        } finally {
-            writer.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupState.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupState.java
deleted file mode 100644
index b5cf274..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupState.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group.hash;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-
-public class HashGroupState extends AbstractStateObject {
-    private GroupingHashTable table;
-
-    public HashGroupState() {
-    }
-
-    HashGroupState(JobId jobId, Object id) {
-        super(jobId, id);
-    }
-
-    public GroupingHashTable getHashTable() {
-        return table;
-    }
-
-    public void setHashTable(GroupingHashTable table) {
-        this.table = table;
-    }
-
-    @Override
-    public void toBytes(DataOutput out) throws IOException {
-
-    }
-
-    @Override
-    public void fromBytes(DataInput in) throws IOException {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
index a17839a..091d323 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
@@ -27,18 +27,18 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
 
 /**
  * Group-by aggregation is pushed before run file generation.
  *
  * @author yingyib
  */
-public class ExternalSortGroupByRunGenerator extends ExternalSortRunGenerator {
+public class ExternalSortGroupByRunGenerator extends AbstractExternalSortRunGenerator {
 
     private final int[] groupFields;
     private final IBinaryComparatorFactory[] comparatorFactories;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
index 7f23bd9..b13c647 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
@@ -27,19 +27,19 @@ import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
 import org.apache.hyracks.dataflow.std.sort.ISorter;
-import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
 
 /**
  * Group-by aggregation is pushed into multi-pass merge of external sort.
  *
  * @author yingyib
  */
-public class ExternalSortGroupByRunMerger extends ExternalSortRunMerger {
+public class ExternalSortGroupByRunMerger extends AbstractExternalSortRunMerger {
 
     private final RecordDescriptor inputRecordDesc;
     private final RecordDescriptor partialAggRecordDesc;
@@ -54,14 +54,14 @@ public class ExternalSortGroupByRunMerger extends ExternalSortRunMerger {
     private final int[] mergeGroupFields;
     private final IBinaryComparator[] groupByComparators;
 
-    public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, ISorter frameSorter, List<RunAndMaxFrameSizePair> runs,
+    public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, ISorter frameSorter, List<GeneratedRunFileReader> runs,
             int[] sortFields, RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc,
             RecordDescriptor outRecordDesc, int framesLimit, IFrameWriter writer, int[] groupFields,
             INormalizedKeyComputer nmk, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory partialAggregatorFactory, IAggregatorDescriptorFactory aggregatorFactory,
             boolean localStage) {
-        super(ctx, frameSorter, runs, sortFields, comparators, nmk, partialAggRecordDesc, framesLimit,
-                writer);
+        super(ctx, frameSorter, runs, comparators, nmk, partialAggRecordDesc, framesLimit, writer);
+
         this.inputRecordDesc = inRecordDesc;
         this.partialAggRecordDesc = partialAggRecordDesc;
         this.outRecordDesc = outRecordDesc;
@@ -96,8 +96,8 @@ public class ExternalSortGroupByRunMerger extends ExternalSortRunMerger {
     protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
         IAggregatorDescriptorFactory aggregatorFactory = localSide ? partialAggregatorFactory : mergeAggregatorFactory;
         boolean outputPartial = false;
-        return new PreclusteredGroupWriter(ctx, groupFields, groupByComparators,
-                aggregatorFactory, inputRecordDesc, outRecordDesc, nextWriter, outputPartial);
+        return new PreclusteredGroupWriter(ctx, groupFields, groupByComparators, aggregatorFactory, inputRecordDesc,
+                outRecordDesc, nextWriter, outputPartial);
     }
 
     @Override
@@ -118,8 +118,8 @@ public class ExternalSortGroupByRunMerger extends ExternalSortRunMerger {
     @Override
     protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
         boolean outputPartial = false;
-        return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators,
-                mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, nextWriter, outputPartial);
+        return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, mergeAggregatorFactory,
+                partialAggRecordDesc, outRecordDesc, nextWriter, outputPartial);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 0af47b9..16d2158 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -31,13 +31,13 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
 import org.apache.hyracks.dataflow.std.sort.ISorter;
-import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
 
 /**
  * This Operator pushes group-by aggregation into the external sort.
@@ -57,17 +57,28 @@ public class SortGroupByOperatorDescriptor extends AbstractSorterOperatorDescrip
     private Algorithm alg = Algorithm.MERGE_SORT;
 
     /**
-     * @param spec                      , the Hyracks job specification
-     * @param framesLimit               , the frame limit for this operator
-     * @param sortFields                , the fields to sort
-     * @param groupFields               , the fields to group, which can be a prefix subset of sortFields
-     * @param firstKeyNormalizerFactory , the normalized key computer factory of the first key
-     * @param comparatorFactories       , the comparator factories of sort keys
-     * @param partialAggregatorFactory  , for aggregating the input of this operator
-     * @param mergeAggregatorFactory    , for aggregating the intermediate data of this operator
-     * @param partialAggRecordDesc      , the record descriptor of intermediate data
-     * @param outRecordDesc             , the record descriptor of output data
-     * @param finalStage                , whether the operator is used for final stage aggregation
+     * @param spec
+     *            , the Hyracks job specification
+     * @param framesLimit
+     *            , the frame limit for this operator
+     * @param sortFields
+     *            , the fields to sort
+     * @param groupFields
+     *            , the fields to group, which can be a prefix subset of sortFields
+     * @param firstKeyNormalizerFactory
+     *            , the normalized key computer factory of the first key
+     * @param comparatorFactories
+     *            , the comparator factories of sort keys
+     * @param partialAggregatorFactory
+     *            , for aggregating the input of this operator
+     * @param mergeAggregatorFactory
+     *            , for aggregating the intermediate data of this operator
+     * @param partialAggRecordDesc
+     *            , the record descriptor of intermediate data
+     * @param outRecordDesc
+     *            , the record descriptor of output data
+     * @param finalStage
+     *            , whether the operator is used for final stage aggregation
      */
     public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
             int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
@@ -88,30 +99,6 @@ public class SortGroupByOperatorDescriptor extends AbstractSorterOperatorDescrip
         this.finalStage = finalStage;
     }
 
-    /**
-     * @param spec                      , the Hyracks job specification
-     * @param framesLimit               , the frame limit for this operator
-     * @param sortFields                , the fields to sort
-     * @param groupFields               , the fields to group, which can be a prefix subset of sortFields
-     * @param firstKeyNormalizerFactory , the normalized key computer factory of the first key
-     * @param comparatorFactories       , the comparator factories of sort keys
-     * @param partialAggregatorFactory  , for aggregating the input of this operator
-     * @param mergeAggregatorFactory    , for aggregating the intermediate data of this operator
-     * @param partialAggRecordDesc      , the record descriptor of intermediate data
-     * @param outRecordDesc             , the record descriptor of output data
-     * @param finalStage                , whether the operator is used for final stage aggregation
-     * @param alg                       , the in-memory sort algorithm
-     */
-    public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
-            int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
-            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
-            IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
-            RecordDescriptor outRecordDesc, boolean finalStage, Algorithm alg) {
-        this(spec, framesLimit, sortFields, groupFields, firstKeyNormalizerFactory, comparatorFactories,
-                partialAggregatorFactory, mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, finalStage);
-        this.alg = alg;
-    }
-
     @Override
     public AbstractSorterOperatorDescriptor.SortActivity getSortActivity(ActivityId id) {
         return new AbstractSorterOperatorDescriptor.SortActivity(id) {
@@ -131,9 +118,9 @@ public class SortGroupByOperatorDescriptor extends AbstractSorterOperatorDescrip
         return new AbstractSorterOperatorDescriptor.MergeActivity(id) {
 
             @Override
-            protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+            protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
-                    List<RunAndMaxFrameSizePair> runs, IBinaryComparator[] comparators,
+                    List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators,
                     INormalizedKeyComputer nmkComputer, int necessaryFrames) {
                 return new ExternalSortGroupByRunMerger(ctx, sorter, runs, sortFields,
                         recordDescProvider.getInputRecordDescriptor(new ActivityId(odId, SORT_ACTIVITY_ID), 0),

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index e45f952..69e9e6a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -33,7 +33,7 @@ import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index d215983..d0a81ee 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -47,7 +47,7 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
@@ -96,25 +96,6 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
     public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) throws HyracksDataException {
-        super(spec, 2, 1);
-        this.memsize = memsize;
-        this.inputsize0 = inputsize0;
-        this.factor = factor;
-        this.recordsPerFrame = recordsPerFrame;
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.hashFunctionFactories = hashFunctionFactories;
-        this.comparatorFactories = comparatorFactories;
-        this.predEvaluatorFactory = predEvalFactory;
-        this.isLeftOuter = false;
-        this.nullWriterFactories1 = null;
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
-            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
-            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
             INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
         super(spec, 2, 1);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index a139341..fee7dd8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -34,10 +35,10 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
 public class InMemoryHashJoin {
 
@@ -45,7 +46,7 @@ public class InMemoryHashJoin {
     private final List<ByteBuffer> buffers;
     private final FrameTupleAccessor accessorBuild;
     private final ITuplePartitionComputer tpcBuild;
-    private final FrameTupleAccessor accessorProbe;
+    private IFrameTupleAccessor accessorProbe;
     private final ITuplePartitionComputer tpcProbe;
     private final FrameTupleAppender appender;
     private final FrameTuplePairComparator tpComparator;
@@ -59,37 +60,38 @@ public class InMemoryHashJoin {
 
     private static final Logger LOGGER = Logger.getLogger(InMemoryHashJoin.class.getName());
 
-    public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
-            ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
-            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
+    public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessorProbe,
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, ITuplePartitionComputer tpcBuild,
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWritersBuild,
             ISerializableTable table, IPredicateEvaluator predEval) throws HyracksDataException {
-        this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, predEval,
+        this(ctx, tableSize, accessorProbe, tpcProbe, accessorBuild, tpcBuild, comparator, isLeftOuter,
+                nullWritersBuild, table, predEval,
                 false);
     }
 
-    public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
-            ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
-            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
+    public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessorProbe,
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, ITuplePartitionComputer tpcBuild,
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWritersBuild,
             ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
         this.ctx = ctx;
         this.tableSize = tableSize;
         this.table = table;
         storedTuplePointer = new TuplePointer();
         buffers = new ArrayList<ByteBuffer>();
-        this.accessorBuild = accessor1;
-        this.tpcBuild = tpc1;
-        this.accessorProbe = accessor0;
-        this.tpcProbe = tpc0;
+        this.accessorBuild = accessorBuild;
+        this.tpcBuild = tpcBuild;
+        this.accessorProbe = accessorProbe;
+        this.tpcProbe = tpcProbe;
         appender = new FrameTupleAppender(new VSizeFrame(ctx));
         tpComparator = comparator;
         predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
-            int fieldCountOuter = accessor1.getFieldCount();
+            int fieldCountOuter = accessorBuild.getFieldCount();
             nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
             DataOutput out = nullTupleBuild.getDataOutput();
             for (int i = 0; i < fieldCountOuter; i++) {
-                nullWriters1[i].writeNull(out);
+                nullWritersBuild[i].writeNull(out);
                 nullTupleBuild.addFieldEndOffset();
             }
         } else {
@@ -113,36 +115,41 @@ public class InMemoryHashJoin {
         }
     }
 
+    void join(IFrameTupleAccessor accessorProbe, int tid, IFrameWriter writer) throws HyracksDataException {
+        this.accessorProbe = accessorProbe;
+        boolean matchFound = false;
+        if (tableSize != 0) {
+            int entry = tpcProbe.partition(accessorProbe, tid, tableSize);
+            int offset = 0;
+            do {
+                table.getTuplePointer(entry, offset++, storedTuplePointer);
+                if (storedTuplePointer.frameIndex < 0)
+                    break;
+                int bIndex = storedTuplePointer.frameIndex;
+                int tIndex = storedTuplePointer.tupleIndex;
+                accessorBuild.reset(buffers.get(bIndex));
+                int c = tpComparator.compare(accessorProbe, tid, accessorBuild, tIndex);
+                if (c == 0) {
+                    boolean predEval = evaluatePredicate(tid, tIndex);
+                    if (predEval) {
+                        matchFound = true;
+                        appendToResult(tid, tIndex, writer);
+                    }
+                }
+            } while (true);
+        }
+        if (!matchFound && isLeftOuter) {
+            FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, tid,
+                    nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
+                    nullTupleBuild.getSize());
+        }
+    }
+
     public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
         accessorProbe.reset(buffer);
         int tupleCount0 = accessorProbe.getTupleCount();
         for (int i = 0; i < tupleCount0; ++i) {
-            boolean matchFound = false;
-            if (tableSize != 0) {
-                int entry = tpcProbe.partition(accessorProbe, i, tableSize);
-                int offset = 0;
-                do {
-                    table.getTuplePointer(entry, offset++, storedTuplePointer);
-                    if (storedTuplePointer.frameIndex < 0)
-                        break;
-                    int bIndex = storedTuplePointer.frameIndex;
-                    int tIndex = storedTuplePointer.tupleIndex;
-                    accessorBuild.reset(buffers.get(bIndex));
-                    int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
-                    if (c == 0) {
-                        boolean predEval = evaluatePredicate(i, tIndex);
-                        if (predEval) {
-                            matchFound = true;
-                            appendToResult(i, tIndex, writer);
-                        }
-                    }
-                } while (true);
-            }
-            if (!matchFound && isLeftOuter) {
-                FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, i,
-                        nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
-                        nullTupleBuild.getSize());
-            }
+            join(accessorProbe, i, writer);
         }
     }
 
@@ -155,14 +162,6 @@ public class InMemoryHashJoin {
                 + Thread.currentThread().getId() + ".");
     }
 
-    private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-        writer.nextFrame(buffer);
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-    }
-
     private boolean evaluatePredicate(int tIx1, int tIx2) {
         if (reverseOutputOrder) { //Role Reversal Optimization is triggered
             return ((predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1));
@@ -172,12 +171,10 @@ public class InMemoryHashJoin {
     }
 
     private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
-        if (!reverseOutputOrder) {
-            FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, probeSidetIx, accessorBuild,
-                    buildSidetIx);
+        if (reverseOutputOrder) {
+            FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe, probeSidetIx);
         } else {
-            FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe,
-                    probeSidetIx);
+            FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, probeSidetIx, accessorBuild, buildSidetIx);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 80b0abe..be8d319 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -42,7 +42,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 6746b50..2ad89cf 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -20,8 +20,6 @@ package org.apache.hyracks.dataflow.std.join;
 
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -38,6 +36,11 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
+import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
 
 public class NestedLoopJoin {
     private final FrameTupleAccessor accessorInner;
@@ -46,39 +49,38 @@ public class NestedLoopJoin {
     private final ITuplePairComparator tpComparator;
     private final IFrame outBuffer;
     private final IFrame innerBuffer;
-    private final List<ByteBuffer> outBuffers;
-    private final int memSize;
-    private final IHyracksTaskContext ctx;
+    private final VariableFrameMemoryManager outerBufferMngr;
     private RunFileReader runFileReader;
-    private int currentMemSize = 0;
     private final RunFileWriter runFileWriter;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder nullTupleBuilder;
     private final IPredicateEvaluator predEvaluator;
     private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
+    private BufferInfo tempInfo = new BufferInfo(null, -1, -1);
 
-    public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
-            ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
+    public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor accessorInner,
+            ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
             INullWriter[] nullWriters1) throws HyracksDataException {
-        this.accessorInner = accessor1;
-        this.accessorOuter = accessor0;
+        this.accessorInner = accessorInner;
+        this.accessorOuter = accessorOuter;
         this.appender = new FrameTupleAppender();
-        this.tpComparator = comparators;
+        this.tpComparator = comparatorsOuter2Inner;
         this.outBuffer = new VSizeFrame(ctx);
         this.innerBuffer = new VSizeFrame(ctx);
         this.appender.reset(outBuffer, true);
-        this.outBuffers = new ArrayList<ByteBuffer>();
-        this.memSize = memSize;
         if (memSize < 3) {
             throw new HyracksDataException("Not enough memory is available for Nested Loop Join");
         }
+        this.outerBufferMngr = new VariableFrameMemoryManager(
+                new VariableFramePool(ctx, ctx.getInitialFrameSize() * (memSize - 2)),
+                FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
+
         this.predEvaluator = predEval;
         this.isReversed = false;
-        this.ctx = ctx;
 
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
-            int innerFieldCount = accessorInner.getFieldCount();
+            int innerFieldCount = this.accessorInner.getFieldCount();
             nullTupleBuilder = new ArrayTupleBuilder(innerFieldCount);
             DataOutput out = nullTupleBuilder.getDataOutput();
             for (int i = 0; i < innerFieldCount; i++) {
@@ -89,8 +91,8 @@ public class NestedLoopJoin {
             nullTupleBuilder = null;
         }
 
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                this.getClass().getSimpleName() + this.toString());
+        FileReference file = ctx.getJobletContext()
+                .createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
         runFileWriter = new RunFileWriter(file, ctx.getIOManager());
         runFileWriter.open();
     }
@@ -100,45 +102,26 @@ public class NestedLoopJoin {
     }
 
     public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
-        if (outBuffers.size() < memSize - 3) {
-            createAndCopyFrame(outerBuffer);
-            return;
-        }
-        if (currentMemSize < memSize - 3) {
-            reloadFrame(outerBuffer);
-            return;
-        }
-        runFileReader = runFileWriter.createReader();
-        runFileReader.open();
-        while (runFileReader.nextFrame(innerBuffer)) {
-            for (ByteBuffer outBuffer : outBuffers) {
-                blockJoin(outBuffer, innerBuffer.getBuffer(), writer);
+        if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
+            runFileReader = runFileWriter.createReader();
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+                    blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+                }
+            }
+            runFileReader.close();
+            outerBufferMngr.reset();
+            if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
+                throw new HyracksDataException("The given outer frame of size:" + outerBuffer.capacity()
+                        + " is too big to cache in the buffer. Please choose a larger buffer memory size");
             }
         }
-        runFileReader.close();
-        currentMemSize = 0;
-        reloadFrame(outerBuffer);
-    }
-
-    private void createAndCopyFrame(ByteBuffer outerBuffer) throws HyracksDataException {
-        ByteBuffer outerBufferCopy = ctx.allocateFrame(outerBuffer.capacity());
-        FrameUtils.copyAndFlip(outerBuffer, outerBufferCopy);
-        outBuffers.add(outerBufferCopy);
-        currentMemSize++;
-    }
-
-    private void reloadFrame(ByteBuffer outerBuffer) throws HyracksDataException {
-        outBuffers.get(currentMemSize).clear();
-        if (outBuffers.get(currentMemSize).capacity() != outerBuffer.capacity()) {
-            outBuffers.set(currentMemSize, ctx.allocateFrame(outerBuffer.capacity()));
-        }
-        FrameUtils.copyAndFlip(outerBuffer, outBuffers.get(currentMemSize));
-        currentMemSize++;
     }
 
-    private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
+    private void blockJoin(BufferInfo outerBufferInfo, ByteBuffer innerBuffer, IFrameWriter writer)
             throws HyracksDataException {
-        accessorOuter.reset(outerBuffer);
+        accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(), outerBufferInfo.getLength());
         accessorInner.reset(innerBuffer);
         int tupleCount0 = accessorOuter.getTupleCount();
         int tupleCount1 = accessorInner.getTupleCount();
@@ -173,11 +156,10 @@ public class NestedLoopJoin {
     }
 
     private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException {
-        if (!isReversed) {
-            appendResultToFrame(accessorOuter, outerTupleId, accessorInner, innerTupleId, writer);
-        } else {
-            //Role Reversal Optimization is triggered
+        if (isReversed) {
             appendResultToFrame(accessorInner, innerTupleId, accessorOuter, outerTupleId, writer);
+        } else {
+            appendResultToFrame(accessorOuter, outerTupleId, accessorInner, innerTupleId, writer);
         }
     }
 
@@ -196,13 +178,12 @@ public class NestedLoopJoin {
         runFileReader = runFileWriter.createDeleteOnCloseReader();
         runFileReader.open();
         while (runFileReader.nextFrame(innerBuffer)) {
-            for (int i = 0; i < currentMemSize; i++) {
-                blockJoin(outBuffers.get(i), innerBuffer.getBuffer(), writer);
+            for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+                blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
             }
         }
         runFileReader.close();
-        outBuffers.clear();
-        currentMemSize = 0;
+        outerBufferMngr.reset();
 
         appender.write(writer, true);
     }


Mime
View raw message