asterixdb-commits mailing list archives

From jianf...@apache.org
Subject [09/11] incubator-asterixdb-hyracks git commit: Implemented the memory-bounded HashGroupby and HashJoin for BigObject
Date Fri, 26 Feb 2016 05:54:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 7fbde54..44ddf44 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -16,94 +16,70 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.dataflow.std.group;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
-import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
+import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
+import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
+import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
 public class HashSpillableTableFactory implements ISpillableTableFactory {
 
+    private static final Logger LOGGER = Logger.getLogger(HashSpillableTableFactory.class.getName());
+    private static final double FUDGE_FACTOR = 1.1;
     private static final long serialVersionUID = 1L;
-    private final ITuplePartitionComputerFactory tpcf;
-    private final int tableSize;
+    private final IBinaryHashFunctionFamily[] hashFunctionFamilies;
 
-    public HashSpillableTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
-        this.tpcf = tpcf;
-        this.tableSize = tableSize;
+    public HashSpillableTableFactory(IBinaryHashFunctionFamily[] hashFunctionFamilies) {
+        this.hashFunctionFamilies = hashFunctionFamilies;
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see
-     * org.apache.hyracks.dataflow.std.aggregations.ISpillableTableFactory#
-     * buildSpillableTable(org.apache.hyracks.api.context.IHyracksTaskContext,
-     * int[], org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory[],
-     * org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory,
-     * edu.
-     * uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory
-     * [], org.apache.hyracks.api.dataflow.value.RecordDescriptor,
-     * org.apache.hyracks.api.dataflow.value.RecordDescriptor, int)
-     */
     @Override
-    public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, final int[] keyFields,
-            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
-            IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, final int framesLimit) throws HyracksDataException {
-        final int[] storedKeys = new int[keyFields.length];
-        @SuppressWarnings("rawtypes")
-        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
-        for (int i = 0; i < keyFields.length; i++) {
-            storedKeys[i] = i;
-            storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
+    public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestedTableSize, long dataBytesSize,
+            final int[] keyFields, final IBinaryComparator[] comparators,
+            final INormalizedKeyComputer firstKeyNormalizer, IAggregatorDescriptorFactory aggregateFactory,
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, final int framesLimit,
+            final int seed) throws HyracksDataException {
+        if (framesLimit < 2) {
+            throw new HyracksDataException("The frame limit is too small to partition the data");
         }
+        final int tableSize = suggestedTableSize;
 
-        RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
-        final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(internalRecordDescriptor);
-        final FrameTupleAccessor storedKeysAccessor2 = new FrameTupleAccessor(internalRecordDescriptor);
-
-        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        final int[] intermediateResultKeys = new int[keyFields.length];
+        for (int i = 0; i < keyFields.length; i++) {
+            intermediateResultKeys[i] = i;
         }
 
-        final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(keyFields, storedKeys, comparators);
-
-        final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(storedKeys, storedKeys, comparators);
-
-        final ITuplePartitionComputer tpc = tpcf.createPartitioner();
+        final FrameTuplePairComparator ftpcInputCompareToAggregate = new FrameTuplePairComparator(keyFields,
+                intermediateResultKeys, comparators);
 
-        final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
-                .createNormalizedKeyComputer();
-
-        int[] keyFieldsInPartialResults = new int[keyFields.length];
-        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-            keyFieldsInPartialResults[i] = i;
-        }
+        final ITuplePartitionComputer tpc = new FieldHashPartitionComputerFamily(keyFields, hashFunctionFamilies)
+                .createPartitioner(seed);
 
         final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, keyFields, keyFieldsInPartialResults, null);
+                outRecordDescriptor, keyFields, intermediateResultKeys, null);
 
         final AggregateState aggregateState = aggregator.createAggregateStates();
 
@@ -114,365 +90,173 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
             stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
         }
 
+        // TODO(jf): investigate the optimal partition size
+        final int numPartitions = getNumOfPartitions((int) (dataBytesSize / ctx.getInitialFrameSize()),
+                framesLimit - 1);
+        final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize / numPartitions);
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("create hashtable, table size:" + tableSize + " file size:" + dataBytesSize + "  partitions:"
+                    + numPartitions);
+        }
+
         final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
-        return new ISpillableTable() {
+        final ISerializableTable hashTableForTuplePointer = new SerializableHashTable(tableSize, ctx);
 
-            private int lastBufIndex;
+        return new ISpillableTable() {
 
-            private IFrame outputFrame;
-            private FrameTupleAppender outputAppender;
+            private final TuplePointer pointer = new TuplePointer();
+            private final BitSet spilledSet = new BitSet(numPartitions);
+            final IPartitionedTupleBufferManager bufferManager = new VPartitionTupleBufferManager(ctx,
+                    PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledSet),
+                    numPartitions, framesLimit * ctx.getInitialFrameSize());
 
-            private FrameTupleAppender stateAppender = new FrameTupleAppender();
+            final ITuplePointerAccessor bufferAccessor = bufferManager.getTupleAccessor(outRecordDescriptor);
 
-            private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-            private final TuplePointer storedTuplePointer = new TuplePointer();
-            private final List<IFrame> frames = new ArrayList<>();
+            private final PreferToSpillFullyOccupiedFramePolicy spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(
+                    bufferManager, spilledSet, ctx.getInitialFrameSize());
 
-            /**
-             * A tuple is "pointed" to by 3 entries in the tPointers array. [0]
-             * = Frame index in the "Frames" list, [1] = Tuple index in the
-             * frame, [2] = Poor man's normalized key for the tuple.
-             */
-            private int[] tPointers;
+            private final FrameTupleAppender outputAppender = new FrameTupleAppender(new VSizeFrame(ctx));
 
             @Override
-            public void sortFrames() throws HyracksDataException {
-                int sfIdx = storedKeys[0];
-                int totalTCount = table.getTupleCount();
-                tPointers = new int[totalTCount * 3];
-                int ptr = 0;
-
-                for (int i = 0; i < tableSize; i++) {
-                    int entry = i;
-                    int offset = 0;
-                    do {
-                        table.getTuplePointer(entry, offset, storedTuplePointer);
-                        if (storedTuplePointer.frameIndex < 0)
-                            break;
-                        tPointers[ptr * 3] = entry;
-                        tPointers[ptr * 3 + 1] = offset;
-                        table.getTuplePointer(entry, offset, storedTuplePointer);
-                        int fIndex = storedTuplePointer.frameIndex;
-                        int tIndex = storedTuplePointer.tupleIndex;
-                        storedKeysAccessor1.reset(frames.get(fIndex).getBuffer());
-                        int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
-                        int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
-                        int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
-                        int f0Start = f0StartRel + tStart + storedKeysAccessor1.getFieldSlotsLength();
-                        tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc.normalize(storedKeysAccessor1.getBuffer()
-                                .array(), f0Start, f0EndRel - f0StartRel);
-                        ptr++;
-                        offset++;
-                    } while (true);
-                }
-                /**
-                 * Sort using quick sort
-                 */
-                if (tPointers.length > 0) {
-                    sort(tPointers, 0, totalTCount);
-                }
+            public void close() throws HyracksDataException {
+                hashTableForTuplePointer.close();
+                aggregator.close();
             }
 
             @Override
-            public void reset() {
-                lastBufIndex = -1;
-                tPointers = null;
-                table.reset();
-                aggregator.reset();
+            public void clear(int partition) throws HyracksDataException {
+                for (int p = getFirstEntryInHashTable(partition); p < getLastEntryInHashTable(partition); p++) {
+                    hashTableForTuplePointer.delete(p);
+                }
+                bufferManager.clearPartition(partition);
             }
 
-            @Override
-            public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                if (lastBufIndex < 0)
-                    nextAvailableFrame();
-                int entry = tpc.partition(accessor, tIndex, tableSize);
-                boolean foundGroup = false;
-                int offset = 0;
-                do {
-                    table.getTuplePointer(entry, offset++, storedTuplePointer);
-                    if (storedTuplePointer.frameIndex < 0)
-                        break;
-                    storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex).getBuffer());
-                    int c = ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex);
-                    if (c == 0) {
-                        foundGroup = true;
-                        break;
-                    }
-                } while (true);
-
-                if (!foundGroup) {
+            private int getPartition(int entryInHashTable) {
+                return entryInHashTable / entriesPerPartition;
+            }
 
-                    stateTupleBuilder.reset();
+            private int getFirstEntryInHashTable(int partition) {
+                return partition * entriesPerPartition;
+            }
 
-                    for (int k = 0; k < keyFields.length; k++) {
-                        stateTupleBuilder.addField(accessor, tIndex, keyFields[k]);
-                    }
+            private int getLastEntryInHashTable(int partition) {
+                return Math.min(tableSize, (partition + 1) * entriesPerPartition);
+            }
 
-                    aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);
-                    if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
-                            stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
-                        if (!nextAvailableFrame()) {
-                            return false;
-                        }
-                        if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
-                                stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
-                            throw new HyracksDataException("Cannot init external aggregate state in a frame.");
-                        }
+            @Override
+            public boolean insert(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int entryInHashTable = tpc.partition(accessor, tIndex, tableSize);
+                for (int i = 0; i < hashTableForTuplePointer.getTupleCount(entryInHashTable); i++) {
+                    hashTableForTuplePointer.getTuplePointer(entryInHashTable, i, pointer);
+                    bufferAccessor.reset(pointer);
+                    int c = ftpcInputCompareToAggregate.compare(accessor, tIndex, bufferAccessor);
+                    if (c == 0) {
+                        aggregateExistingTuple(accessor, tIndex, bufferAccessor, pointer.tupleIndex);
+                        return true;
                     }
+                }
 
-                    storedTuplePointer.frameIndex = lastBufIndex;
-                    storedTuplePointer.tupleIndex = stateAppender.getTupleCount() - 1;
-                    table.insert(entry, storedTuplePointer);
-                } else {
+                return insertNewAggregateEntry(entryInHashTable, accessor, tIndex);
+            }
 
-                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex,
-                            aggregateState);
+            private boolean insertNewAggregateEntry(int entryInHashTable, IFrameTupleAccessor accessor, int tIndex)
+                    throws HyracksDataException {
+                initStateTupleBuilder(accessor, tIndex);
+                int pid = getPartition(entryInHashTable);
 
+                if (!bufferManager.insertTuple(pid, stateTupleBuilder.getByteArray(),
+                        stateTupleBuilder.getFieldEndOffsets(), 0, stateTupleBuilder.getSize(), pointer)) {
+                    return false;
                 }
+                hashTableForTuplePointer.insert(entryInHashTable, pointer);
                 return true;
             }
 
-            @Override
-            public List<IFrame> getFrames() {
-                return frames;
+            private void initStateTupleBuilder(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                stateTupleBuilder.reset();
+                for (int k = 0; k < keyFields.length; k++) {
+                    stateTupleBuilder.addField(accessor, tIndex, keyFields[k]);
+                }
+                aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);
             }
 
-            @Override
-            public int getFrameCount() {
-                return lastBufIndex;
+            private void aggregateExistingTuple(IFrameTupleAccessor accessor, int tIndex,
+                    ITuplePointerAccessor bufferAccessor, int tupleIndex) throws HyracksDataException {
+                aggregator.aggregate(accessor, tIndex, bufferAccessor, tupleIndex, aggregateState);
             }
 
             @Override
-            public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
-                if (outputFrame == null) {
-                    outputFrame = new VSizeFrame(ctx);
-                }
-
-                if (outputAppender == null) {
-                    outputAppender = new FrameTupleAppender();
-                }
-
-                outputAppender.reset(outputFrame, true);
+            public int flushFrames(int partition, IFrameWriter writer, AggregateType type) throws HyracksDataException {
+                int count = 0;
+                for (int hashEntryPid = getFirstEntryInHashTable(partition); hashEntryPid < getLastEntryInHashTable(
+                        partition); hashEntryPid++) {
+                    count += hashTableForTuplePointer.getTupleCount(hashEntryPid);
+                    for (int tid = 0; tid < hashTableForTuplePointer.getTupleCount(hashEntryPid); tid++) {
+                        hashTableForTuplePointer.getTuplePointer(hashEntryPid, tid, pointer);
+                        bufferAccessor.reset(pointer);
+                        outputTupleBuilder.reset();
+                        for (int k = 0; k < intermediateResultKeys.length; k++) {
+                            outputTupleBuilder.addField(bufferAccessor.getBuffer().array(),
+                                    bufferAccessor.getAbsFieldStartOffset(intermediateResultKeys[k]),
+                                    bufferAccessor.getFieldLength(intermediateResultKeys[k]));
+                        }
 
-                if (tPointers == null) {
-                    // Not sorted
-                    for (int i = 0; i < tableSize; ++i) {
-                        int entry = i;
-                        int offset = 0;
-                        do {
-                            table.getTuplePointer(entry, offset++, storedTuplePointer);
-                            if (storedTuplePointer.frameIndex < 0)
+                        boolean hasOutput = false;
+                        switch (type) {
+                            case PARTIAL:
+                                hasOutput = aggregator.outputPartialResult(outputTupleBuilder, bufferAccessor,
+                                        pointer.tupleIndex, aggregateState);
                                 break;
-                            int bIndex = storedTuplePointer.frameIndex;
-                            int tIndex = storedTuplePointer.tupleIndex;
-
-                            storedKeysAccessor1.reset(frames.get(bIndex).getBuffer());
-
-                            outputTupleBuilder.reset();
-                            for (int k = 0; k < storedKeys.length; k++) {
-                                outputTupleBuilder.addField(storedKeysAccessor1, tIndex, storedKeys[k]);
-                            }
-
-                            if (isPartial) {
-
-                                aggregator.outputPartialResult(outputTupleBuilder, storedKeysAccessor1, tIndex,
-                                        aggregateState);
-
-                            } else {
-
-                                aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor1, tIndex,
-                                        aggregateState);
-                            }
+                            case FINAL:
+                                hasOutput = aggregator.outputFinalResult(outputTupleBuilder, bufferAccessor,
+                                        pointer.tupleIndex, aggregateState);
+                                break;
+                        }
 
+                        if (hasOutput && !outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+                                outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+                            outputAppender.write(writer, true);
                             if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                     outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                                outputAppender.write(writer, true);
-                                if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
-                                        outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                                    throw new HyracksDataException(
-                                            "The output item is too large to be fit into a frame.");
-                                }
+                                throw new HyracksDataException("The output item is too large to be fit into a frame.");
                             }
-
-                        } while (true);
-                    }
-                    outputAppender.write(writer, true);
-                    aggregator.close();
-                    return;
-                }
-                int n = tPointers.length / 3;
-                for (int ptr = 0; ptr < n; ptr++) {
-                    int tableIndex = tPointers[ptr * 3];
-                    int rowIndex = tPointers[ptr * 3 + 1];
-                    table.getTuplePointer(tableIndex, rowIndex, storedTuplePointer);
-                    int frameIndex = storedTuplePointer.frameIndex;
-                    int tupleIndex = storedTuplePointer.tupleIndex;
-                    // Get the frame containing the value
-                    IFrame buffer = frames.get(frameIndex);
-                    storedKeysAccessor1.reset(buffer.getBuffer());
-
-                    outputTupleBuilder.reset();
-                    for (int k = 0; k < storedKeys.length; k++) {
-                        outputTupleBuilder.addField(storedKeysAccessor1, tupleIndex, storedKeys[k]);
-                    }
-
-                    if (isPartial) {
-
-                        aggregator.outputPartialResult(outputTupleBuilder, storedKeysAccessor1, tupleIndex,
-                                aggregateState);
-
-                    } else {
-
-                        aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor1, tupleIndex,
-                                aggregateState);
-                    }
-
-                    if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
-                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                        outputAppender.write(writer, true);
-                        if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
-                                outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                            throw new HyracksDataException("The output item is too large to be fit into a frame.");
                         }
                     }
                 }
                 outputAppender.write(writer, true);
-                aggregator.close();
+                spilledSet.set(partition);
+                return count;
             }
 
             @Override
-            public void close() {
-                lastBufIndex = -1;
-                tPointers = null;
-                table.close();
-                frames.clear();
-                aggregateState.close();
-            }
-
-            /**
-             * Set the working frame to the next available frame in the frame
-             * list. There are two cases:<br>
-             * 1) If the next frame is not initialized, allocate a new frame. 2)
-             * When frames are already created, they are recycled.
-             *
-             * @return Whether a new frame is added successfully.
-             * @throws HyracksDataException
-             */
-            private boolean nextAvailableFrame() throws HyracksDataException {
-                // Return false if the number of frames is equal to the limit.
-                if (lastBufIndex + 1 >= framesLimit)
-                    return false;
-
-                if (frames.size() < framesLimit) {
-                    // Insert a new frame
-                    IFrame frame = new VSizeFrame(ctx);
-                    frames.add(frame);
-                    stateAppender.reset(frame, true);
-                    lastBufIndex = frames.size() - 1;
-                } else {
-                    // Reuse an old frame
-                    lastBufIndex++;
-                    stateAppender.reset(frames.get(lastBufIndex), true);
-                }
-                return true;
+            public int getNumPartitions() {
+                return bufferManager.getNumPartitions();
             }
 
-            private void sort(int[] tPointers, int offset, int length) throws HyracksDataException {
-                int m = offset + (length >> 1);
-                int mTable = tPointers[m * 3];
-                int mRow = tPointers[m * 3 + 1];
-                int mNormKey = tPointers[m * 3 + 2];
-
-                table.getTuplePointer(mTable, mRow, storedTuplePointer);
-                int mFrame = storedTuplePointer.frameIndex;
-                int mTuple = storedTuplePointer.tupleIndex;
-                storedKeysAccessor1.reset(frames.get(mFrame).getBuffer());
-
-                int a = offset;
-                int b = a;
-                int c = offset + length - 1;
-                int d = c;
-                while (true) {
-                    while (b <= c) {
-                        int bTable = tPointers[b * 3];
-                        int bRow = tPointers[b * 3 + 1];
-                        int bNormKey = tPointers[b * 3 + 2];
-                        int cmp = 0;
-                        if (bNormKey != mNormKey) {
-                            cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
-                        } else {
-                            table.getTuplePointer(bTable, bRow, storedTuplePointer);
-                            int bFrame = storedTuplePointer.frameIndex;
-                            int bTuple = storedTuplePointer.tupleIndex;
-                            storedKeysAccessor2.reset(frames.get(bFrame).getBuffer());
-                            cmp = ftpcTuple.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
-                        }
-                        if (cmp > 0) {
-                            break;
-                        }
-                        if (cmp == 0) {
-                            swap(tPointers, a++, b);
-                        }
-                        ++b;
-                    }
-                    while (c >= b) {
-                        int cTable = tPointers[c * 3];
-                        int cRow = tPointers[c * 3 + 1];
-                        int cNormKey = tPointers[c * 3 + 2];
-                        int cmp = 0;
-                        if (cNormKey != mNormKey) {
-                            cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
-                        } else {
-                            table.getTuplePointer(cTable, cRow, storedTuplePointer);
-                            int cFrame = storedTuplePointer.frameIndex;
-                            int cTuple = storedTuplePointer.tupleIndex;
-                            storedKeysAccessor2.reset(frames.get(cFrame).getBuffer());
-                            cmp = ftpcTuple.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
-                        }
-                        if (cmp < 0) {
-                            break;
-                        }
-                        if (cmp == 0) {
-                            swap(tPointers, c, d--);
-                        }
-                        --c;
-                    }
-                    if (b > c)
-                        break;
-                    swap(tPointers, b++, c--);
-                }
-
-                int s;
-                int n = offset + length;
-                s = Math.min(a - offset, b - a);
-                vecswap(tPointers, offset, b - s, s);
-                s = Math.min(d - c, n - d - 1);
-                vecswap(tPointers, b, n - s, s);
-
-                if ((s = b - a) > 1) {
-                    sort(tPointers, offset, s);
-                }
-                if ((s = d - c) > 1) {
-                    sort(tPointers, n - s, s);
-                }
-            }
-
-            private void swap(int x[], int a, int b) {
-                for (int i = 0; i < 3; ++i) {
-                    int t = x[a * 3 + i];
-                    x[a * 3 + i] = x[b * 3 + i];
-                    x[b * 3 + i] = t;
-                }
-            }
-
-            private void vecswap(int x[], int a, int b, int n) {
-                for (int i = 0; i < n; i++, a++, b++) {
-                    swap(x, a, b);
-                }
+            @Override
+            public int findVictimPartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int entryInHashTable = tpc.partition(accessor, tIndex, tableSize);
+                int partition = getPartition(entryInHashTable);
+                return spillPolicy.selectVictimPartition(partition);
             }
-
         };
     }
 
+    private int getNumOfPartitions(int numberOfFramesForData, int frameLimit) {
+        if (frameLimit > numberOfFramesForData) {
+            return 1; // all in memory: create one big partition
+        }
+        int numberOfPartitions = (int) (Math
+                .ceil((numberOfFramesForData * FUDGE_FACTOR - frameLimit) / (frameLimit - 1)));
+        if (numberOfPartitions <= 0) {
+            numberOfPartitions = 1; // becomes an in-memory hash
+        }
+        if (numberOfPartitions > frameLimit) {
+            numberOfPartitions = (int) Math.ceil(Math.sqrt(numberOfFramesForData * FUDGE_FACTOR));
+            return Math.max(2, Math.min(numberOfPartitions, frameLimit));
+        }
+        return numberOfPartitions;
+    }
+
 }

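A side note on the new getNumOfPartitions() above: it is the classic hybrid-hash sizing rule (ceil((|data| * fudge - M) / (M - 1)) partitions for a memory budget of M frames, falling back to roughly sqrt(|data|), capped by the budget, when that count grows too large). The following standalone sketch mirrors the formula with hypothetical inputs; it is not part of the commit, just an illustration of how the partition count behaves:

    public class PartitionCountSketch {
        private static final double FUDGE_FACTOR = 1.1;

        // Mirrors HashSpillableTableFactory.getNumOfPartitions (parameter names clarified).
        static int numPartitions(int framesForData, int frameLimit) {
            if (frameLimit > framesForData) {
                return 1; // everything fits in memory: one big partition
            }
            int p = (int) Math.ceil((framesForData * FUDGE_FACTOR - frameLimit) / (frameLimit - 1));
            if (p <= 0) {
                p = 1; // degenerates to an in-memory hash
            }
            if (p > frameLimit) {
                // Too many partitions to give each one a frame: fall back to ~sqrt(|data|).
                p = (int) Math.ceil(Math.sqrt(framesForData * FUDGE_FACTOR));
                return Math.max(2, Math.min(p, frameLimit));
            }
            return p;
        }

        public static void main(String[] args) {
            for (int framesForData : new int[] { 50, 500, 5_000, 500_000 }) {
                System.out.println(framesForData + " data frames, budget 100 -> "
                        + numPartitions(framesForData, 100) + " partitions");
            }
        }
    }

With a budget of 100 frames this prints 1, 5, 55, and 100 partitions respectively: small inputs stay in memory, and very large inputs are capped at the frame limit.
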
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index 4607032..9552294 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -29,21 +29,18 @@ public interface IAggregatorDescriptor {
      *
      * @return
      */
-    public AggregateState createAggregateStates();
+    AggregateState createAggregateStates();
 
     /**
      * Initialize the state based on the input tuple.
-     *
+     * 
+     * @param tupleBuilder
      * @param accessor
      * @param tIndex
-     * @param fieldOutput
-     *            The data output for the frame containing the state. This may
-     *            be null, if the state is maintained as a java object
      * @param state
-     *            The state to be initialized.
      * @throws HyracksDataException
      */
-    public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+    void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
             throws HyracksDataException;
 
     /**
@@ -51,60 +48,58 @@ public interface IAggregatorDescriptor {
      * too. Note that here the frame is not an input argument, since it can be
      * reset outside of the aggregator (simply reset the starting index of the
      * buffer).
-     *
-     * @param state
      */
-    public void reset();
+    void reset();
 
     /**
      * Aggregate the value. Aggregate state should be updated correspondingly.
      *
      * @param accessor
      * @param tIndex
-     * @param data
+     * @param stateAccessor
      *            The buffer containing the state, if frame-based-state is used.
      *            This means that it can be null if java-object-based-state is
      *            used.
-     * @param offset
+     * @param stateTupleIndex
      * @param state
      *            The aggregate state.
      * @throws HyracksDataException
      */
-    public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
-            int stateTupleIndex, AggregateState state) throws HyracksDataException;
+    void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+            AggregateState state) throws HyracksDataException;
 
     /**
      * Output the partial aggregation result.
      *
-     * @param fieldOutput
-     *            The data output for the output frame
-     * @param data
-     *            The buffer containing the aggregation state
-     * @param offset
+     * @param tupleBuilder
+     *            The data output for the output aggregation result
+     * @param stateAccessor
+     *            The stateAccessor buffer containing the aggregation state
+     * @param tIndex
      * @param state
      *            The aggregation state.
-     * @return TODO
+     * @return true if any output was written to {@code tupleBuilder}
      * @throws HyracksDataException
      */
-    public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+    boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
             AggregateState state) throws HyracksDataException;
 
     /**
      * Output the final aggregation result.
      *
-     * @param fieldOutput
+     * @param tupleBuilder
      *            The data output for the output frame
-     * @param data
+     * @param stateAccessor
      *            The buffer containing the aggregation state
-     * @param offset
+     * @param tIndex
      * @param state
      *            The aggregation state.
-     * @return TODO
+     * @return true if any output was written to {@code tupleBuilder}
      * @throws HyracksDataException
      */
-    public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+    boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
             AggregateState state) throws HyracksDataException;
 
-    public void close();
+    void close();
 
 }

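The slimmed-down interface above is easier to see with a concrete implementation. Below is a minimal COUNT-style descriptor; this is a sketch only, not part of the commit. It assumes the running count is stored as a binary int field placed right after the group keys in the state tuple, and it reuses the offset arithmetic (getTupleStartOffset + getFieldSlotsLength + getFieldStartOffset) that MultiFieldsAggregatorFactory uses later in this patch.

    import java.io.IOException;

    import org.apache.hyracks.api.comm.IFrameTupleAccessor;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
    import org.apache.hyracks.dataflow.std.group.AggregateState;
    import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;

    public class CountAggregatorSketch implements IAggregatorDescriptor {
        private final int countField; // the count field sits right after the group keys

        public CountAggregatorSketch(int numKeyFields) {
            this.countField = numKeyFields;
        }

        @Override
        public AggregateState createAggregateStates() {
            return new AggregateState(); // no java-object state: the count lives in the state tuple
        }

        @Override
        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                AggregateState state) throws HyracksDataException {
            try {
                tupleBuilder.getDataOutput().writeInt(1); // first tuple seen for this group
                tupleBuilder.addFieldEndOffset();
            } catch (IOException e) {
                throw new HyracksDataException(e);
            }
        }

        @Override
        public void reset() {
            // nothing to do: no java-object state to discard
        }

        @Override
        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                int stateTupleIndex, AggregateState state) throws HyracksDataException {
            // Increment the binary count inside the state tuple, in place.
            int off = fieldOffset(stateAccessor, stateTupleIndex);
            stateAccessor.getBuffer().putInt(off, stateAccessor.getBuffer().getInt(off) + 1);
        }

        @Override
        public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
                int tIndex, AggregateState state) throws HyracksDataException {
            return copyCount(tupleBuilder, stateAccessor, tIndex);
        }

        @Override
        public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
                int tIndex, AggregateState state) throws HyracksDataException {
            return copyCount(tupleBuilder, stateAccessor, tIndex); // for COUNT, partial == final
        }

        @Override
        public void close() {
            // no resources held
        }

        private int fieldOffset(IFrameTupleAccessor stateAccessor, int tIndex) {
            return stateAccessor.getTupleStartOffset(tIndex) + stateAccessor.getFieldSlotsLength()
                    + stateAccessor.getFieldStartOffset(tIndex, countField);
        }

        private boolean copyCount(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex)
                throws HyracksDataException {
            try {
                int off = fieldOffset(stateAccessor, tIndex);
                tupleBuilder.getDataOutput().writeInt(stateAccessor.getBuffer().getInt(off));
                tupleBuilder.addFieldEndOffset();
                return true; // a count always produces output
            } catch (IOException e) {
                throw new HyracksDataException(e);
            }
        }
    }
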
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
index 1f99183..2b9ad54 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
@@ -18,26 +18,56 @@
  */
 package org.apache.hyracks.dataflow.std.group;
 
-import java.util.List;
-
-import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public interface ISpillableTable {
 
-    public void close();
-
-    public void reset();
+    /**
+     * Release all the storage resources.
+     * @throws HyracksDataException
+     */
+    void close() throws HyracksDataException;
 
-    public int getFrameCount();
+    /**
+     * Reset the given partition to its initial state and release the resources it occupies.
+     * @param partition
+     * @throws HyracksDataException
+     */
+    void clear(int partition) throws HyracksDataException;
 
-    public List<IFrame> getFrames();
+    /**
+     * Insert the given tuple into the table.
+     * @param accessor
+     * @param tIndex
+     * @return whether the insertion succeeded; {@code false} means the table is full and a partition must be flushed
+     * @throws HyracksDataException
+     */
+    boolean insert(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
 
-    public void sortFrames() throws HyracksDataException;
+    /**
+     * Flush the given partition to the writer and return the number of tuples that have been flushed.
+     * @param partition
+     * @param writer
+     * @param type
+     * @return the number of flushed tuples
+     * @throws HyracksDataException
+     */
+    int flushFrames(int partition, IFrameWriter writer, AggregateType type) throws HyracksDataException;
 
-    public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+    /**
+     * Get the number of partitions.
+     */
+    int getNumPartitions();
 
-    public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException;
+    /**
+     * When the table is full, return a suitable partition to serve as the flush() candidate.
+     * The {@code accessor} and {@code tIndex} give the reference to the tuple to be inserted.
+     * @param accessor
+     * @param tIndex
+     * @return the partition id of the victim, or -1 if no victim could be found
+     */
+    int findVictimPartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
 }

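The intended call pattern for the reworked table is an insert-or-spill loop: when insert() returns false, pick a victim with findVictimPartition(), flush it as a partial result, clear it, and retry. A minimal sketch of that loop follows; the class and method names are hypothetical (the real driver is the ExternalHashGroupBy class referenced further below), but the ISpillableTable calls match the interface above, and AggregateType is unqualified in the interface so it is assumed to live in the same package.

    import org.apache.hyracks.api.comm.IFrameTupleAccessor;
    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.std.group.AggregateType;
    import org.apache.hyracks.dataflow.std.group.ISpillableTable;

    public class SpillLoopSketch {
        /** Insert every tuple of a frame, spilling a victim partition whenever the table is full. */
        static void insertFrame(ISpillableTable table, IFrameTupleAccessor accessor, IFrameWriter[] runWriters)
                throws HyracksDataException {
            for (int i = 0; i < accessor.getTupleCount(); i++) {
                while (!table.insert(accessor, i)) {
                    int victim = table.findVictimPartition(accessor, i);
                    if (victim < 0) {
                        throw new HyracksDataException("Failed to insert a tuple: no victim partition to spill");
                    }
                    // Spill the victim's partial aggregates, then release its memory and retry.
                    table.flushFrames(victim, runWriters[victim], AggregateType.PARTIAL);
                    table.clear(victim);
                }
            }
        }
    }
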
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
index 6b90b37..dbe6858 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -21,15 +21,15 @@ package org.apache.hyracks.dataflow.std.group;
 import java.io.Serializable;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ISpillableTableFactory extends Serializable {
-    ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int[] keyFields,
-            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
+    ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int inputSizeInTuple, long dataBytesSize, int[] keyFields,
+            IBinaryComparator[] comparators, INormalizedKeyComputer firstKeyNormalizer,
             IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
+            RecordDescriptor outRecordDescriptor, int framesLimit, int seed) throws HyracksDataException;
 
 }

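Worth noting on the factory change: buildSpillableTable() now takes a seed, which HashSpillableTableFactory feeds to FieldHashPartitionComputerFamily.createPartitioner(seed). The build phase below passes 0; a deeper, recursive pass can presumably pass a different seed so that re-partitioning spilled data does not reproduce the same hash split that overflowed in the first place. A hedged call sketch (the wrapper and variable names are hypothetical; only the buildSpillableTable signature comes from the interface above):

    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
    import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
    import org.apache.hyracks.dataflow.std.group.ISpillableTable;
    import org.apache.hyracks.dataflow.std.group.ISpillableTableFactory;

    public class SeededRebuildSketch {
        // Rebuild the table for recursion level `level`, using the level as the hash seed
        // so each level draws a different hash function from the function family.
        static ISpillableTable buildForLevel(ISpillableTableFactory factory, IHyracksTaskContext ctx,
                int tableSize, long dataBytesSize, int[] keyFields, IBinaryComparator[] comparators,
                INormalizedKeyComputer firstKeyNormalizer, IAggregatorDescriptorFactory aggregateFactory,
                RecordDescriptor inDesc, RecordDescriptor outDesc, int framesLimit, int level)
                throws HyracksDataException {
            return factory.buildSpillableTable(ctx, tableSize, dataBytesSize, keyFields, comparators,
                    firstKeyNormalizer, aggregateFactory, inDesc, outDesc, framesLimit, level);
        }
    }
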
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index b25d16c..e326f39 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -80,15 +80,15 @@ public class MultiFieldsAggregatorFactory extends AbstractAccumulatingAggregator
             }
 
             @Override
-            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
                     int tIndex, AggregateState state) throws HyracksDataException {
                 DataOutput dos = tupleBuilder.getDataOutput();
 
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int tupleOffset = stateAccessor.getTupleStartOffset(tIndex);
                 for (int i = 0; i < aggregators.length; i++) {
-                    int fieldOffset = accessor.getFieldStartOffset(tIndex, keys.length + i);
-                    aggregators[i].outputPartialResult(dos, accessor.getBuffer().array(),
-                            fieldOffset + accessor.getFieldSlotsLength() + tupleOffset,
+                    int fieldOffset = stateAccessor.getFieldStartOffset(tIndex, keys.length + i);
+                    aggregators[i].outputPartialResult(dos, stateAccessor.getBuffer().array(),
+                            fieldOffset + stateAccessor.getFieldSlotsLength() + tupleOffset,
                             ((AggregateState[]) state.state)[i]);
                     tupleBuilder.addFieldEndOffset();
                 }
@@ -96,16 +96,16 @@ public class MultiFieldsAggregatorFactory extends AbstractAccumulatingAggregator
             }
 
             @Override
-            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 DataOutput dos = tupleBuilder.getDataOutput();
 
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int tupleOffset = stateAccessor.getTupleStartOffset(tIndex);
                 for (int i = 0; i < aggregators.length; i++) {
                     if (aggregators[i].needsBinaryState()) {
-                        int fieldOffset = accessor.getFieldStartOffset(tIndex, keys.length + i);
-                        aggregators[i].outputFinalResult(dos, accessor.getBuffer().array(),
-                                tupleOffset + accessor.getFieldSlotsLength() + fieldOffset,
+                        int fieldOffset = stateAccessor.getFieldStartOffset(tIndex, keys.length + i);
+                        aggregators[i].outputFinalResult(dos, stateAccessor.getBuffer().array(),
+                                tupleOffset + stateAccessor.getFieldSlotsLength() + fieldOffset,
                                 ((AggregateState[]) state.state)[i]);
                     } else {
                         aggregators[i].outputFinalResult(dos, null, 0, ((AggregateState[]) state.state)[i]);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index c499749..ba3853c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -18,125 +18,123 @@
  */
 package org.apache.hyracks.dataflow.std.group.external;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.ISpillableTable;
 import org.apache.hyracks.dataflow.std.group.ISpillableTableFactory;
 
-class ExternalGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+public class ExternalGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable
+        implements IRunFileWriterGenerator {
+
+    private static Logger LOGGER = Logger.getLogger("ExternalGroupBuildPhase");
     private final IHyracksTaskContext ctx;
     private final Object stateId;
     private final int[] keyFields;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final INormalizedKeyComputerFactory firstNormalizerFactory;
+    private final IBinaryComparator[] comparators;
+    private final INormalizedKeyComputer firstNormalizerComputer;
     private final IAggregatorDescriptorFactory aggregatorFactory;
     private final int framesLimit;
     private final ISpillableTableFactory spillableTableFactory;
     private final RecordDescriptor inRecordDescriptor;
     private final RecordDescriptor outRecordDescriptor;
-    private final FrameTupleAccessor accessor;
+    private final int tableSize;
+    private final long fileSize;
 
+    private ExternalHashGroupBy externalGroupBy;
     private ExternalGroupState state;
+    private boolean isFailed = false;
 
-    ExternalGroupBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keyFields, int framesLimit,
-            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, ISpillableTableFactory spillableTableFactory) {
+    public ExternalGroupBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int tableSize, long fileSize,
+            int[] keyFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory firstNormalizerFactory, IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor,
+            ISpillableTableFactory spillableTableFactory) {
         this.ctx = ctx;
         this.stateId = stateId;
         this.framesLimit = framesLimit;
         this.aggregatorFactory = aggregatorFactory;
         this.keyFields = keyFields;
-        this.comparatorFactories = comparatorFactories;
-        this.firstNormalizerFactory = firstNormalizerFactory;
+        this.comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.firstNormalizerComputer = firstNormalizerFactory.createNormalizedKeyComputer();
         this.spillableTableFactory = spillableTableFactory;
         this.inRecordDescriptor = inRecordDescriptor;
         this.outRecordDescriptor = outRecordDescriptor;
-        this.accessor = new FrameTupleAccessor(inRecordDescriptor);
+        this.tableSize = tableSize;
+        this.fileSize = fileSize;
     }
 
     @Override
     public void open() throws HyracksDataException {
         state = new ExternalGroupState(ctx.getJobletContext().getJobId(), stateId);
-        state.setRuns(new LinkedList<RunFileReader>());
-        ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
-                firstNormalizerFactory, aggregatorFactory, inRecordDescriptor, outRecordDescriptor, framesLimit);
-        table.reset();
+        ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx, tableSize, fileSize, keyFields,
+                comparators, firstNormalizerComputer, aggregatorFactory, inRecordDescriptor, outRecordDescriptor,
+                framesLimit, 0);
+        RunFileWriter[] runFileWriters = new RunFileWriter[table.getNumPartitions()];
+        this.externalGroupBy = new ExternalHashGroupBy(this, table, runFileWriters, inRecordDescriptor);
+
         state.setSpillableTable(table);
+        state.setRuns(runFileWriters);
+        state.setSpilledNumTuples(externalGroupBy.getSpilledNumTuples());
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        accessor.reset(buffer);
-        int tupleCount = accessor.getTupleCount();
-        ISpillableTable gTable = state.getSpillableTable();
-        for (int i = 0; i < tupleCount; i++) {
-            /**
-             * If the group table is too large, flush the table into
-             * a run file.
-             */
-            if (!gTable.insert(accessor, i)) {
-                flushFramesToRun();
-                if (!gTable.insert(accessor, i))
-                    throw new HyracksDataException("Failed to insert a new buffer into the aggregate operator!");
-            }
-        }
+        externalGroupBy.insert(buffer);
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        //do nothing for failures
+        isFailed = true;
     }
 
     @Override
     public void close() throws HyracksDataException {
-        ISpillableTable gTable = state.getSpillableTable();
-        if (gTable.getFrameCount() >= 0) {
-            if (state.getRuns().size() > 0) {
-                /**
-                 * flush the memory into the run file.
-                 */
-                flushFramesToRun();
-                gTable.close();
-                gTable = null;
+        if (isFailed) {
+            for (int i = 0; i < state.getRuns().length; i++) {
+                RunFileWriter run = state.getRuns()[i];
+                if (run != null) {
+                    run.getFileReference().delete();
+                }
+            }
+        } else {
+            externalGroupBy.flushSpilledPartitions();
+            ctx.setStateObject(state);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                int numOfPartition = state.getSpillableTable().getNumPartitions();
+                int numOfSpilledPart = 0;
+                for (int i = 0; i < numOfPartition; i++) {
+                    if (state.getSpilledNumTuples()[i] > 0) {
+                        numOfSpilledPart++;
+                    }
+                }
+                LOGGER.fine("level 0:" + "build with " + numOfPartition + " partitions" + ", spilled "
+                        + numOfSpilledPart + " partitions");
             }
         }
-        ctx.setStateObject(state);
+        state = null;
+        externalGroupBy = null;
     }
 
-    private void flushFramesToRun() throws HyracksDataException {
-        FileReference runFile;
-        try {
-            runFile = ctx.getJobletContext().createManagedWorkspaceFile(
-                    ExternalGroupOperatorDescriptor.class.getSimpleName());
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-        RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
-        writer.open();
-        ISpillableTable gTable = state.getSpillableTable();
-        try {
-            gTable.sortFrames();
-            gTable.flushFrames(writer, true);
-        } catch (Exception ex) {
-            throw new HyracksDataException(ex);
-        } finally {
-            writer.close();
-        }
-        gTable.reset();
-        state.getRuns().add(writer.createDeleteOnCloseReader());
+    @Override
+    public RunFileWriter getRunFileWriter() throws HyracksDataException {
+        FileReference file = ctx.getJobletContext()
+                .createManagedWorkspaceFile(ExternalGroupOperatorDescriptor.class.getSimpleName());
+        return new RunFileWriter(file, ctx.getIOManager());
     }
 }

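ExternalHashGroupBy itself is added by another patch in this series and does not appear in this message. From how the build operator wires it up (the runFileWriters array, getSpilledNumTuples(), and flushSpilledPartitions() at close), its close-time flush plausibly looks like the sketch below; every detail here is an assumption reconstructed from the ISpillableTable contract, not the committed code.

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.io.RunFileWriter;
    import org.apache.hyracks.dataflow.std.group.AggregateType;
    import org.apache.hyracks.dataflow.std.group.ISpillableTable;

    public class FlushSpilledPartitionsSketch {
        // Hypothetical reconstruction of ExternalHashGroupBy.flushSpilledPartitions().
        static void flushSpilledPartitions(ISpillableTable table, RunFileWriter[] runWriters,
                int[] spilledNumTuples) throws HyracksDataException {
            for (int p = 0; p < runWriters.length; p++) {
                if (runWriters[p] != null) { // partition p spilled at least once during build
                    // Append the partition's remaining in-memory partial aggregates to its run,
                    // so the merge phase sees the partition's complete spilled data.
                    spilledNumTuples[p] += table.flushFrames(p, runWriters[p], AggregateType.PARTIAL);
                    table.clear(p);
                    runWriters[p].close();
                }
            }
            // Partitions that never spilled stay resident; the merge phase can emit them
            // directly with AggregateType.FINAL without another pass over the data.
        }
    }
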
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
deleted file mode 100644
index 5c79014..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group.external;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderAccessor;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.group.AggregateState;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import org.apache.hyracks.dataflow.std.group.ISpillableTable;
-import org.apache.hyracks.dataflow.std.util.ReferenceEntry;
-import org.apache.hyracks.dataflow.std.util.ReferencedPriorityQueue;
-
-class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-    private final IHyracksTaskContext ctx;
-    private final Object stateId;
-    private final int[] keyFields;
-    private final IBinaryComparator[] comparators;
-    private final INormalizedKeyComputer nmkComputer;
-    private final AggregateState aggregateState;
-    private final ArrayTupleBuilder tupleBuilder;
-    private final int[] storedKeys;
-    private final IAggregatorDescriptor aggregator;
-    private final boolean isOutputSorted;
-    private final int framesLimit;
-    private final RecordDescriptor outRecordDescriptor;
-    /**
-     * Input frames, one for each run file.
-     */
-    private List<IFrame> inFrames;
-    /**
-     * Output frame.
-     */
-    private IFrame outFrame, writerFrame;
-    private final FrameTupleAppenderAccessor outAppender;
-    private FrameTupleAppender writerAppender;
-    private LinkedList<RunFileReader> runs;
-    private ExternalGroupState aggState;
-    private ArrayTupleBuilder finalTupleBuilder;
-    /**
-     * how many frames to read ahead at once
-     */
-    private int runFrameLimit = 1;
-    private int[] currentFrameIndexInRun;
-    private int[] currentRunFrames;
-
-    ExternalGroupMergeOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
-            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nmkFactory, int[] keyFields,
-            IAggregatorDescriptorFactory mergerFactory, boolean isOutputSorted, int framesLimit,
-            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-        this.stateId = stateId;
-        this.keyFields = keyFields;
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        this.nmkComputer = nmkFactory == null ? null : nmkFactory.createNormalizedKeyComputer();
-        int[] keyFieldsInPartialResults = new int[keyFields.length];
-        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-            keyFieldsInPartialResults[i] = i;
-        }
-
-        aggregator = mergerFactory.createAggregator(ctx, outRecordDescriptor, outRecordDescriptor, keyFields,
-                keyFieldsInPartialResults, writer);
-        aggregateState = aggregator.createAggregateStates();
-
-        storedKeys = new int[keyFields.length];
-        /**
-         * Get the list of the fields in the stored records.
-         */
-        for (int i = 0; i < keyFields.length; ++i) {
-            storedKeys[i] = i;
-        }
-
-        tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-        this.ctx = ctx;
-        outAppender = new FrameTupleAppenderAccessor(outRecordDescriptor);
-        this.isOutputSorted = isOutputSorted;
-        this.framesLimit = framesLimit;
-        this.outRecordDescriptor = outRecordDescriptor;
-    }
-
-    @Override
-    public void initialize() throws HyracksDataException {
-        aggState = (ExternalGroupState) ctx.getStateObject(stateId);
-        runs = aggState.getRuns();
-        try {
-            writer.open();
-            if (runs.size() <= 0) {
-                ISpillableTable gTable = aggState.getSpillableTable();
-                if (gTable != null) {
-                    if (isOutputSorted)
-                        gTable.sortFrames();
-                    gTable.flushFrames(writer, false);
-                }
-                gTable = null;
-                aggState = null;
-            } else {
-                aggState = null;
-                runs = new LinkedList<RunFileReader>(runs);
-                inFrames = new ArrayList<>();
-                outFrame = new VSizeFrame(ctx);
-                outAppender.reset(outFrame, true);
-                while (runs.size() > 0) {
-                    try {
-                        doPass(runs);
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-                inFrames.clear();
-            }
-        } catch (Exception e) {
-            writer.fail();
-            throw new HyracksDataException(e);
-        } finally {
-            aggregateState.close();
-            writer.close();
-        }
-    }
-
-    private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
-        FileReference newRun = null;
-        IFrameWriter writer = this.writer;
-        boolean finalPass = false;
-
-        while (inFrames.size() + 2 < framesLimit) {
-            inFrames.add(new VSizeFrame(ctx));
-        }
-        int runNumber;
-        if (runs.size() + 2 <= framesLimit) {
-            finalPass = true;
-            runFrameLimit = (framesLimit - 2) / runs.size();
-            runNumber = runs.size();
-        } else {
-            runNumber = framesLimit - 2;
-            newRun = ctx.getJobletContext().createManagedWorkspaceFile(
-                    ExternalGroupOperatorDescriptor.class.getSimpleName());
-            writer = new RunFileWriter(newRun, ctx.getIOManager());
-            writer.open();
-        }
-        try {
-            currentFrameIndexInRun = new int[runNumber];
-            currentRunFrames = new int[runNumber];
-            /**
-             * Create file readers for each input run file, but only
-             * for the ones that fit into the inFrames.
-             */
-            RunFileReader[] runFileReaders = new RunFileReader[runNumber];
-            FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
-            Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(runNumber, comparator, keyFields,
-                    nmkComputer);
-            /**
-             * current tuple index in each run
-             */
-            int[] tupleIndices = new int[runNumber];
-
-            for (int i = 0; i < runNumber; i++) {
-                int runIndex = topTuples.peek().getRunid();
-                tupleIndices[runIndex] = 0;
-                // Load the run file
-                runFileReaders[runIndex] = runs.get(runIndex);
-                runFileReaders[runIndex].open();
-
-                currentRunFrames[runIndex] = 0;
-                currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
-                for (int j = 0; j < runFrameLimit; j++) {
-                    int frameIndex = currentFrameIndexInRun[runIndex] + j;
-                    if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
-                        tupleAccessors[frameIndex] = new FrameTupleAccessor(outRecordDescriptor);
-                        tupleAccessors[frameIndex].reset(inFrames.get(frameIndex).getBuffer());
-                        currentRunFrames[runIndex]++;
-                        if (j == 0)
-                            setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
-                    } else {
-                        break;
-                    }
-                }
-            }
-
-            /**
-             * Start merging
-             */
-            while (!topTuples.areRunsExhausted()) {
-                /**
-                 * Get the top record
-                 */
-                ReferenceEntry top = topTuples.peek();
-                int tupleIndex = top.getTupleIndex();
-                int runIndex = topTuples.peek().getRunid();
-                IFrameTupleAccessor fta = top.getAccessor();
-
-                int currentTupleInOutFrame = outAppender.getTupleCount() - 1;
-                if (currentTupleInOutFrame < 0
-                        || compareFrameTuples(fta, tupleIndex, outAppender, currentTupleInOutFrame) != 0) {
-                    /**
-                     * Initialize the first output record and reset
-                     * the tuple builder.
-                     */
-
-                    tupleBuilder.reset();
-
-                    for (int k = 0; k < storedKeys.length; k++) {
-                        tupleBuilder.addField(fta, tupleIndex, storedKeys[k]);
-                    }
-
-                    aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
-
-                    if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
-                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
-                        flushOutFrame(writer, finalPass);
-                        if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
-                                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
-                            throw new HyracksDataException(
-                                    "The partial result is too large to be initialized in a frame.");
-                        }
-                    }
-
-                } else {
-                    /**
-                     * If the new tuple is in the same group as the
-                     * current one, merge it into the current
-                     * aggregate and output to the outFrame.
-                     */
-
-                    aggregator.aggregate(fta, tupleIndex, outAppender, currentTupleInOutFrame, aggregateState);
-
-                }
-                tupleIndices[runIndex]++;
-                setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
-            }
-
-            if (outAppender.getTupleCount() > 0) {
-                flushOutFrame(writer, finalPass);
-                outAppender.reset(outFrame, true);
-            }
-
-            aggregator.close();
-
-            runs.subList(0, runNumber).clear();
-            /**
-             * insert the new run file into the beginning of the run
-             * file list
-             */
-            if (!finalPass) {
-                runs.add(0, ((RunFileWriter) writer).createDeleteOnCloseReader());
-            }
-        } finally {
-            if (!finalPass) {
-                writer.close();
-            }
-        }
-    }
-
-    private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
-
-        if (finalTupleBuilder == null) {
-            finalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-        }
-
-        if (writerFrame == null) {
-            writerFrame = new VSizeFrame(ctx);
-        }
-
-        if (writerAppender == null) {
-            writerAppender = new FrameTupleAppender();
-            writerAppender.reset(writerFrame, true);
-        }
-
-        for (int i = 0; i < outAppender.getTupleCount(); i++) {
-
-            finalTupleBuilder.reset();
-
-            for (int k = 0; k < storedKeys.length; k++) {
-                finalTupleBuilder.addField(outAppender, i, storedKeys[k]);
-            }
-
-            if (isFinal) {
-
-                aggregator.outputFinalResult(finalTupleBuilder, outAppender, i, aggregateState);
-
-            } else {
-
-                aggregator.outputPartialResult(finalTupleBuilder, outAppender, i, aggregateState);
-            }
-
-            if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
-                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
-                writerAppender.write(writer, true);
-                if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
-                        finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
-                    throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
-                }
-            }
-        }
-        writerAppender.write(writer, true);
-
-    }
-
-    private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
-        int runStart = runIndex * runFrameLimit;
-        boolean existNext = false;
-        if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
-            /**
-             * run already closed
-             */
-            existNext = false;
-        } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
-            /**
-             * not the last frame for this run
-             */
-            existNext = true;
-            if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
-                tupleIndices[runIndex] = 0;
-                currentFrameIndexInRun[runIndex]++;
-            }
-        } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
-            /**
-             * there are still tuples left in the last loaded frame
-             */
-            existNext = true;
-        } else {
-            /**
-             * All tuples in the current batch of frames have been
-             * consumed; reload the next batch from the run.
-             */
-            tupleIndices[runIndex] = 0;
-            currentFrameIndexInRun[runIndex] = runStart;
-            /**
-             * read frames from the run in a batch
-             */
-            currentRunFrames[runIndex] = 0;
-            for (int j = 0; j < runFrameLimit; j++) {
-                int frameIndex = currentFrameIndexInRun[runIndex] + j;
-                if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
-                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex).getBuffer());
-                    existNext = true;
-                    currentRunFrames[runIndex]++;
-                } else {
-                    break;
-                }
-            }
-        }
-
-        if (existNext) {
-            topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
-        } else {
-            topTuples.pop();
-            closeRun(runIndex, runCursors, tupleAccessors);
-        }
-    }
-
-    /**
-     * Close the run file, and also the corresponding readers and
-     * input frame.
-     *
-     * @param index
-     * @param runCursors
-     * @param tupleAccessor
-     * @throws HyracksDataException
-     */
-    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
-            throws HyracksDataException {
-        if (runCursors[index] != null) {
-            runCursors[index].close();
-            runCursors[index] = null;
-            int frameOffset = index * runFrameLimit;
-            for (int j = 0; j < runFrameLimit; j++) {
-                tupleAccessor[frameOffset + j] = null;
-            }
-        }
-    }
-
-    private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2)
-            throws HyracksDataException {
-        byte[] b1 = fta1.getBuffer().array();
-        byte[] b2 = fta2.getBuffer().array();
-        for (int f = 0; f < keyFields.length; ++f) {
-            int fIdx = f;
-            int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
-            int l1 = fta1.getFieldLength(j1, fIdx);
-            int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
-            int l2 = fta2.getFieldLength(j2, fIdx);
-            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-
-    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators)
-            throws HyracksDataException {
-        return new Comparator<ReferenceEntry>() {
-
-            @Override
-            public int compare(ReferenceEntry o1, ReferenceEntry o2) {
-                FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
-                FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
-                int j1 = o1.getTupleIndex();
-                int j2 = o2.getTupleIndex();
-                byte[] b1 = fta1.getBuffer().array();
-                byte[] b2 = fta2.getBuffer().array();
-                for (int f = 0; f < keyFields.length; ++f) {
-                    int fIdx = f;
-                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                            + fta1.getFieldStartOffset(j1, fIdx);
-                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                            + fta2.getFieldStartOffset(j2, fIdx);
-                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                    int c;
-                    try {
-                        c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                        if (c != 0) {
-                            return c;
-                        }
-                    } catch (HyracksDataException e) {
-                        throw new IllegalArgumentException(e);
-                    }
-
-                }
-                return 0;
-            }
-
-        };
-    }
-}
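
The deleted class above performed a k-way merge over sorted run files with a tuple priority queue. A self-contained sketch of the same idea over in-memory integer runs — java.util.PriorityQueue standing in for ReferencedPriorityQueue, iterators standing in for RunFileReader cursors:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    final class KWayMerge {
        // One heap entry per run: the run's current head plus the iterator
        // that yields the rest of that run.
        private static final class Head {
            final int value;
            final Iterator<Integer> rest;
            Head(int value, Iterator<Integer> rest) {
                this.value = value;
                this.rest = rest;
            }
        }

        static List<Integer> merge(List<List<Integer>> sortedRuns) {
            PriorityQueue<Head> heap = new PriorityQueue<>(Comparator.comparingInt((Head h) -> h.value));
            for (List<Integer> run : sortedRuns) {
                Iterator<Integer> it = run.iterator();
                if (it.hasNext()) {
                    heap.add(new Head(it.next(), it));
                }
            }
            List<Integer> out = new ArrayList<>();
            while (!heap.isEmpty()) {
                Head top = heap.poll();      // smallest head across all runs
                out.add(top.value);
                if (top.rest.hasNext()) {    // advance that run and re-enter the heap
                    heap.add(new Head(top.rest.next(), top.rest));
                }
            }
            return out;
        }
    }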

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index a0fd5f8..433b75d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -48,17 +48,21 @@ public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final INormalizedKeyComputerFactory firstNormalizerFactory;
 
-    private final IAggregatorDescriptorFactory aggregatorFactory;
-    private final IAggregatorDescriptorFactory mergerFactory;
+    private final IAggregatorDescriptorFactory partialAggregatorFactory;
+    private final IAggregatorDescriptorFactory intermediateAggregateFactory;
 
     private final int framesLimit;
     private final ISpillableTableFactory spillableTableFactory;
-    private final boolean isOutputSorted;
-
-    public ExternalGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keyFields, int framesLimit,
-            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
-            RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
+    private final RecordDescriptor partialRecDesc;
+    private final RecordDescriptor outRecDesc;
+    private final int tableSize;
+    private final long fileSize;
+
+    public ExternalGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputSizeInTuple, long inputFileSize,
+            int[] keyFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory firstNormalizerFactory, IAggregatorDescriptorFactory partialAggregatorFactory,
+            IAggregatorDescriptorFactory intermediateAggregateFactory, RecordDescriptor partialAggRecordDesc,
+            RecordDescriptor outRecordDesc, ISpillableTableFactory spillableTableFactory) {
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
         if (framesLimit <= 1) {
@@ -68,19 +72,23 @@ public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor
              */
             throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
         }
-        this.aggregatorFactory = aggregatorFactory;
-        this.mergerFactory = mergerFactory;
+        this.partialAggregatorFactory = partialAggregatorFactory;
+        this.intermediateAggregateFactory = intermediateAggregateFactory;
         this.keyFields = keyFields;
         this.comparatorFactories = comparatorFactories;
         this.firstNormalizerFactory = firstNormalizerFactory;
         this.spillableTableFactory = spillableTableFactory;
-        this.isOutputSorted = isOutputSorted;
+
+        this.partialRecDesc = partialAggRecordDesc;
+        this.outRecDesc = outRecordDesc;
 
         /**
          * Set the record descriptor. Note that since this operator is a unary
          * operator, only the first record descriptor is used here.
          */
-        recordDescriptors[0] = recordDescriptor;
+        recordDescriptors[0] = outRecordDesc;
+        this.tableSize = inputSizeInTuple;
+        this.fileSize = inputFileSize;
     }
 
     /*
@@ -114,11 +122,11 @@ public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-                throws HyracksDataException {
-            return new ExternalGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keyFields,
-                    framesLimit, comparatorFactories, firstNormalizerFactory, aggregatorFactory,
-                    recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0],
-                    spillableTableFactory);
+                        throws HyracksDataException {
+            return new ExternalGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), tableSize,
+                    fileSize, keyFields, framesLimit, comparatorFactories, firstNormalizerFactory,
+                    partialAggregatorFactory, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+                    recordDescriptors[0], spillableTableFactory);
         }
     }
 
@@ -132,10 +140,12 @@ public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-                throws HyracksDataException {
-            return new ExternalGroupMergeOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
-                    AGGREGATE_ACTIVITY_ID), partition), comparatorFactories, firstNormalizerFactory, keyFields,
-                    mergerFactory, isOutputSorted, framesLimit, recordDescriptors[0]);
+                        throws HyracksDataException {
+            return new ExternalGroupWriteOperatorNodePushable(ctx,
+                    new TaskId(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID), partition),
+                    spillableTableFactory, partialRecDesc, outRecDesc, framesLimit, keyFields, firstNormalizerFactory,
+                    comparatorFactories, intermediateAggregateFactory);
+
         }
 
     }
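
The descriptor now takes two sizing hints, inputSizeInTuple and inputFileSize, so the spillable table can size its hash table and pick a partition count up front instead of growing and re-spilling. A hedged sketch of how a partition count could be derived from the file-size hint and the frame budget — the formula below is an illustrative hybrid-hash-style assumption, not the actual HashSpillableTableFactory code:

    final class PartitionChooser {
        static int choose(long inputFileSizeBytes, int frameSizeBytes, int framesLimit,
                double fudgeFactor) {
            // Estimated input size in frames, rounded up.
            long inputFrames = (inputFileSizeBytes + frameSizeBytes - 1) / frameSizeBytes;
            if (inputFrames * fudgeFactor <= framesLimit) {
                return 1; // the whole input fits in memory; no partitioning needed
            }
            // Enough partitions that one spilled partition can later be
            // processed within the same frame budget.
            int parts = (int) Math.ceil(inputFrames * fudgeFactor / framesLimit);
            return Math.max(2, Math.min(parts, framesLimit - 1));
        }
    }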

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupState.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupState.java
index ebc3516..a36e38d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupState.java
@@ -21,30 +21,27 @@ package org.apache.hyracks.dataflow.std.group.external;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.LinkedList;
 
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.dataflow.std.group.ISpillableTable;
 
 public class ExternalGroupState extends AbstractStateObject {
-    private LinkedList<RunFileReader> runs;
 
+    private RunFileWriter[] runs;
     private ISpillableTable gTable;
-
-    public ExternalGroupState() {
-    }
+    private int[] spilledNumTuples;
 
     ExternalGroupState(JobId jobId, Object id) {
         super(jobId, id);
     }
 
-    public LinkedList<RunFileReader> getRuns() {
+    public RunFileWriter[] getRuns() {
         return runs;
     }
 
-    public void setRuns(LinkedList<RunFileReader> runs) {
+    public void setRuns(RunFileWriter[] runs) {
         this.runs = runs;
     }
 
@@ -65,4 +62,12 @@ public class ExternalGroupState extends AbstractStateObject {
     public void fromBytes(DataInput in) throws IOException {
         throw new UnsupportedOperationException();
     }
+
+    public void setSpilledNumTuples(int[] spilledNumTuples) {
+        this.spilledNumTuples = spilledNumTuples;
+    }
+
+    public int[] getSpilledNumTuples() {
+        return spilledNumTuples;
+    }
 }
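
ExternalGroupState now carries one RunFileWriter and one spilled-tuple counter per partition, so the downstream write activity can tell which partitions stayed in memory and which need another pass. A small sketch, assuming only the getSpilledNumTuples() shape shown above, of the counting that the build side's close() logs:

    final class SpillStats {
        // One counter per partition, as returned by getSpilledNumTuples();
        // a positive count means that partition has a run file to merge.
        static int countSpilledPartitions(int[] spilledNumTuples) {
            int spilled = 0;
            for (int tuples : spilledNumTuples) {
                if (tuples > 0) {
                    spilled++;
                }
            }
            return spilled;
        }
    }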

