asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wangs...@apache.org
Subject [1/3] asterixdb git commit: ASTERIXDB-1556, ASTERIXDB-1733: Hash Group By and Hash Join conform to the memory budget
Date Wed, 04 Jan 2017 23:46:29 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 1355c269f -> 8b2aceeb9


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index 9584f26..ca97be3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -18,307 +18,552 @@
  */
 package org.apache.hyracks.dataflow.std.structures;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 
 /**
- * An entry in the table is: #elements, #no-empty elements; fIndex, tIndex;
- * fIndex, tIndex; .... <fIndex, tIndex> forms a tuple pointer
+ * This is an extension of the SimpleSerializableHashTable class.
+ * A buffer manager needs to be assigned to allocate/release frames for this table so that
+ * the maximum memory usage can be bounded under a certain limit.
  */
-public class SerializableHashTable implements ISerializableTable {
-
-    private static final int INT_SIZE = 4;
-    private static final int INIT_ENTRY_SIZE = 4;
-
-    private IntSerDeBuffer[] headers;
-    private List<IntSerDeBuffer> contents = new ArrayList<>();
-    private List<Integer> frameCurrentIndex = new ArrayList<>();
-    private final IHyracksFrameMgrContext ctx;
-    private final int frameCapacity;
-    private int currentLargestFrameIndex = 0;
-    private int tupleCount = 0;
-    private int headerFrameCount = 0;
-    private TuplePointer tempTuplePointer = new TuplePointer();
-
-    public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException {
-        this.ctx = ctx;
-        int frameSize = ctx.getInitialFrameSize();
-
-        int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1;
-        int headerSize = tableSize * INT_SIZE * 2 / frameSize + residual;
-        headers = new IntSerDeBuffer[headerSize];
-
-        IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array());
-        contents.add(frame);
-        frameCurrentIndex.add(0);
-        frameCapacity = frame.capacity();
-    }
+public class SerializableHashTable extends SimpleSerializableHashTable {
 
-    @Override
-    public void insert(int entry, TuplePointer pointer) throws HyracksDataException {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header == null) {
-            header = new IntSerDeBuffer(ctx.allocateFrame().array());
-            headers[hFrameIndex] = header;
-            resetFrame(header);
-            headerFrameCount++;
-        }
-        int frameIndex = header.getInt(headerOffset);
-        int offsetIndex = header.getInt(headerOffset + 1);
-        if (frameIndex < 0) {
-            // insert first tuple into the entry
-            insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer);
-        } else {
-            // insert non-first tuple into the entry
-            insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer);
-        }
-        tupleCount++;
-    }
+    protected double garbageCollectionThreshold;
+    protected int wastedIntSpaceCount = 0;
+    protected ISimpleFrameBufferManager bufferManager;
 
-    @Override
-    public void delete(int entry) {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header != null) {
-            int frameIndex = header.getInt(headerOffset);
-            int offsetIndex = header.getInt(headerOffset + 1);
-            if (frameIndex >= 0) {
-                IntSerDeBuffer frame = contents.get(frameIndex);
-                int entryUsedItems = frame.getInt(offsetIndex + 1);
-                frame.writeInt(offsetIndex + 1, 0);
-                tupleCount -= entryUsedItems;
-            }
-        }
+    public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx,
+            ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
+        this(tableSize, ctx, bufferManager, 0.1);
     }
 
-    @Override
-    public boolean getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header == null) {
-            dataPointer.reset(-1, -1);
-            return false;
-        }
-        int frameIndex = header.getInt(headerOffset);
-        int offsetIndex = header.getInt(headerOffset + 1);
-        if (frameIndex < 0) {
-            dataPointer.reset(-1, -1);
-            return false;
-        }
-        IntSerDeBuffer frame = contents.get(frameIndex);
-        int entryUsedItems = frame.getInt(offsetIndex + 1);
-        if (offset > entryUsedItems - 1) {
-            dataPointer.reset(-1, -1);
-            return false;
-        }
-        int startIndex = offsetIndex + 2 + offset * 2;
-        while (startIndex >= frameCapacity) {
-            ++frameIndex;
-            startIndex -= frameCapacity;
-        }
-        frame = contents.get(frameIndex);
-        dataPointer.reset(frame.getInt(startIndex), frame.getInt(startIndex + 1));
-        return true;
-    }
-
-    @Override
-    public void reset() {
-        for (IntSerDeBuffer frame : headers)
-            if (frame != null)
-                resetFrame(frame);
+    public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx,
+            ISimpleFrameBufferManager bufferManager, double garbageCollectionThreshold)
+            throws HyracksDataException {
+        super(tableSize, ctx, false);
+        this.bufferManager = bufferManager;
 
-        frameCurrentIndex.clear();
-        for (int i = 0; i < contents.size(); i++) {
-            frameCurrentIndex.add(0);
+        ByteBuffer newFrame = getFrame(frameSize);
+        if (newFrame == null) {
+            throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
         }
-
-        currentLargestFrameIndex = 0;
-        tupleCount = 0;
+        IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
+        frameCapacity = frame.capacity();
+        contents.add(frame);
+        currentOffsetInEachFrameList.add(0);
+        this.garbageCollectionThreshold = garbageCollectionThreshold;
     }
 
     @Override
-    public int getFrameCount() {
-        return headerFrameCount + contents.size();
+    ByteBuffer getFrame(int size) throws HyracksDataException {
+        ByteBuffer newFrame = bufferManager.acquireFrame(size);
+        if (newFrame != null) {
+            currentByteSize += size;
+        }
+        return newFrame;
     }
 
     @Override
-    public int getTupleCount() {
-        return tupleCount;
+    void increaseWastedSpaceCount(int size) {
+        wastedIntSpaceCount += size;
     }
 
     @Override
-    public int getTupleCount(int entry) {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header != null) {
-            int frameIndex = header.getInt(headerOffset);
-            int offsetIndex = header.getInt(headerOffset + 1);
-            if (frameIndex >= 0) {
-                IntSerDeBuffer frame = contents.get(frameIndex);
-                int entryUsedItems = frame.getInt(offsetIndex + 1);
-                return entryUsedItems;
-            }
-        }
-        return 0;
+    public void reset() {
+        super.reset();
+        currentByteSize = 0;
     }
 
     @Override
     public void close() {
-        int nFrames = contents.size();
-        int hFrames = 0;
         for (int i = 0; i < headers.length; i++) {
             if (headers[i] != null) {
-                hFrames++;
+                bufferManager.releaseFrame(headers[i].getByteBuffer());
                 headers[i] = null;
             }
         }
+        for (int i = 0; i < contents.size(); i++) {
+            bufferManager.releaseFrame(contents.get(i).getByteBuffer());
+        }
         contents.clear();
-        frameCurrentIndex.clear();
+        currentOffsetInEachFrameList.clear();
         tupleCount = 0;
-        currentLargestFrameIndex = 0;
-        ctx.deallocateFrames((nFrames + hFrames) * frameCapacity * 4);
+        currentByteSize = 0;
+        currentLargestFrameNumber = 0;
+    }
+
+    @Override
+    public boolean isGarbageCollectionNeeded() {
+        return wastedIntSpaceCount > frameCapacity * (currentLargestFrameNumber + 1) * garbageCollectionThreshold;
     }
 
-    private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer)
+    /**
+     * Collects garbage. The steps are as follows.
+     * #1. Initialize the Reader and Writer. The starting frame index is set to zero at this moment.
+     * #2. Read a content frame. Find the next slot and read its data. Check the used count of the slot.
+     * If it's not -1 (meaning that the slot is in use), we move the slot to the
+     * current writing offset of the Writer frame. Update the corresponding h() value pointer for this location
+     * in the header frame. We can find the h() value of the slot using the first tuple pointer in the slot.
+     * If the number is -1 (meaning that the slot was migrated to a new place due to an overflow, or deleted),
+     * just reclaim the space by letting another slot move into it.
+     * #3. Once the Reader reaches the end of a frame, it moves on to the next frame. This applies to the Writer, too,
+     * i.e., if the writing offset reaches the end of a frame, the writing frame is set to the next frame.
+     * #4. Repeat #2 ~ #3 until all frames are read.
+     *
+     * @return the number of frames that are reclaimed. The value -1 is returned when no compaction happened.
+     */
+    @Override
+    public int collectGarbage(ITuplePointerAccessor bufferAccessor, ITuplePartitionComputer tpc)
             throws HyracksDataException {
-        IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex);
-        int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex);
-        int requiredIntCapacity = entryCapacity * 2;
-        int startFrameIndex = currentLargestFrameIndex;
-
-        if (lastIndex + requiredIntCapacity >= frameCapacity) {
-            IntSerDeBuffer newFrame;
-            startFrameIndex++;
-            do {
-                if (currentLargestFrameIndex >= contents.size() - 1) {
-                    newFrame = new IntSerDeBuffer(ctx.allocateFrame().array());
-                    currentLargestFrameIndex++;
-                    contents.add(newFrame);
-                    frameCurrentIndex.add(0);
+        // Keeps the garbage collection related variable
+        GarbageCollectionInfo gcInfo = new GarbageCollectionInfo();
+
+        int slotCapacity;
+        int slotUsedCount;
+        int capacityInIntCount;
+        int nextSlotIntPosInPageForGC;
+        boolean currentPageChanged;
+        IntSerDeBuffer currentReadContentFrameForGC;
+        IntSerDeBuffer currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC);
+        int lastOffsetInLastFrame = currentOffsetInEachFrameList.get(contents.size() - 1);
+
+        // Step #1. Reads a content frame until it reaches the end of content frames.
+        while (gcInfo.currentReadPageForGC <= currentLargestFrameNumber) {
+
+            gcInfo.currentReadIntOffsetInPageForGC = 0;
+            currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+
+            // Step #2. Advances the reader until it hits the end of the given frame.
+            while (gcInfo.currentReadIntOffsetInPageForGC < frameCapacity) {
+                nextSlotIntPosInPageForGC = findNextSlotInPage(currentReadContentFrameForGC,
+                        gcInfo.currentReadIntOffsetInPageForGC);
+
+                if (nextSlotIntPosInPageForGC == INVALID_VALUE) {
+                    // There isn't a valid slot in the page. Exits the loop #2 and reads the next frame.
+                    break;
+                }
+
+                // Valid slot found. Reads the given slot information.
+                slotCapacity = currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC);
+                slotUsedCount = currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC + 1);
+                capacityInIntCount = (slotCapacity + 1) * 2;
+
+                // Used count should not be -1 (migrated or deleted).
+                if (slotUsedCount != INVALID_VALUE) {
+                    // To prepare hash pointer (header -> content) update, read the first tuple pointer in the old slot.
+                    tempTuplePointer.reset(currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC + 2),
+                            currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC + 3));
+
+                    // Check whether there is at least some space to put some part of the slot.
+                    // If not, advance the write pointer to the next page.
+                    if ((gcInfo.currentWriteIntOffsetInPageForGC + 4) > frameCapacity
+                            && gcInfo.currentGCWritePageForGC < currentLargestFrameNumber) {
+                        // Wipes the region that can't be used.
+                        currentWriteContentFrameForGC.writeInvalidVal(gcInfo.currentWriteIntOffsetInPageForGC,
+                                frameCapacity - gcInfo.currentWriteIntOffsetInPageForGC);
+                        gcInfo.currentGCWritePageForGC++;
+                        currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC);
+                        gcInfo.currentWriteIntOffsetInPageForGC = 0;
+                    }
+
+                    // Migrates this slot to the current offset in Writer's Frame if possible.
+                    currentPageChanged = MigrateSlot(gcInfo, bufferAccessor, tpc, capacityInIntCount,
+                            nextSlotIntPosInPageForGC);
+
+                    if (currentPageChanged) {
+                        currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+                        currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC);
+                    }
                 } else {
-                    currentLargestFrameIndex++;
-                    frameCurrentIndex.set(currentLargestFrameIndex, 0);
+                    // A useless slot (either migrated or deleted) is found. Resets the space
+                    // so it will be occupied by the next valid slot.
+                    currentPageChanged = resetSlotSpace(gcInfo, nextSlotIntPosInPageForGC, capacityInIntCount);
+
+                    if (currentPageChanged) {
+                        currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+                    }
+
                 }
-                requiredIntCapacity -= frameCapacity;
-            } while (requiredIntCapacity > 0);
-            lastIndex = 0;
-            lastFrame = contents.get(startFrameIndex);
+            }
+
+            // Reached the end of a frame. Advances the Reader.
+            if (gcInfo.currentReadPageForGC == currentLargestFrameNumber) {
+                break;
+            }
+            gcInfo.currentReadPageForGC++;
         }
 
-        // set header
-        header.writeInt(headerOffset, startFrameIndex);
-        header.writeInt(headerOffset + 1, lastIndex);
-
-        // set the entry
-        lastFrame.writeInt(lastIndex, entryCapacity - 1);
-        lastFrame.writeInt(lastIndex + 1, 1);
-        lastFrame.writeInt(lastIndex + 2, pointer.getFrameIndex());
-        lastFrame.writeInt(lastIndex + 3, pointer.getTupleIndex());
-        int newLastIndex = lastIndex + entryCapacity * 2;
-        newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1;
-        frameCurrentIndex.set(startFrameIndex, newLastIndex);
-
-        requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex);
-        while (requiredIntCapacity > 0) {
-            startFrameIndex++;
-            requiredIntCapacity -= frameCapacity;
-            newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1;
-            frameCurrentIndex.set(startFrameIndex, newLastIndex);
+        // More unused frames at the end?
+        int extraFrames = 0;
+        if (contents.size() > (currentLargestFrameNumber + 1)) {
+            extraFrames = contents.size() - (currentLargestFrameNumber + 1);
         }
-    }
 
-    private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndexArg, int offsetIndex,
-            TuplePointer pointer) throws HyracksDataException {
-        int frameIndex = frameIndexArg;
-        IntSerDeBuffer frame = contents.get(frameIndex);
-        int entryItems = frame.getInt(offsetIndex);
-        int entryUsedItems = frame.getInt(offsetIndex + 1);
-
-        if (entryUsedItems < entryItems) {
-            frame.writeInt(offsetIndex + 1, entryUsedItems + 1);
-            int startIndex = offsetIndex + 2 + entryUsedItems * 2;
-            while (startIndex >= frameCapacity) {
-                ++frameIndex;
-                startIndex -= frameCapacity;
+        // Done reading all frames. So, releases unnecessary frames.
+        int numberOfFramesToBeDeallocated = gcInfo.currentReadPageForGC + extraFrames - gcInfo.currentGCWritePageForGC;
+
+        if (numberOfFramesToBeDeallocated >= 1) {
+            for (int i = 0; i < numberOfFramesToBeDeallocated; i++) {
+                currentByteSize -= contents.get(gcInfo.currentGCWritePageForGC + 1).getByteCapacity();
+                bufferManager.releaseFrame(contents.get(gcInfo.currentGCWritePageForGC + 1).getByteBuffer());
+                contents.remove(gcInfo.currentGCWritePageForGC + 1);
+                currentOffsetInEachFrameList.remove(gcInfo.currentGCWritePageForGC + 1);
             }
-            frame = contents.get(frameIndex);
-            frame.writeInt(startIndex, pointer.getFrameIndex());
-            frame.writeInt(startIndex + 1, pointer.getTupleIndex());
         } else {
-            int capacity = (entryItems + 1) * 2;
-            header.writeInt(headerOffset, -1);
-            header.writeInt(headerOffset + 1, -1);
-            int fIndex = frame.getInt(offsetIndex + 2);
-            int tIndex = frame.getInt(offsetIndex + 3);
-            tempTuplePointer.reset(fIndex, tIndex);
-            this.insertNewEntry(header, headerOffset, capacity, tempTuplePointer);
-            int newFrameIndex = header.getInt(headerOffset);
-            int newTupleIndex = header.getInt(headerOffset + 1);
-
-            for (int i = 1; i < entryUsedItems; i++) {
-                int startIndex = offsetIndex + 2 + i * 2;
-                int startFrameIndex = frameIndex;
-                while (startIndex >= frameCapacity) {
-                    ++startFrameIndex;
-                    startIndex -= frameCapacity;
+            // For this case, we check whether the last offset is changed.
+            // If not, we didn't get any space from the operation.
+            int afterLastOffsetInLastFrame = currentOffsetInEachFrameList.get(gcInfo.currentGCWritePageForGC);
+            if (lastOffsetInLastFrame == afterLastOffsetInLastFrame) {
+                numberOfFramesToBeDeallocated = -1;
+            }
+        }
+
+        // Resets the current offset in the last frame so that the future insertions will work without an issue.
+        currentLargestFrameNumber = gcInfo.currentGCWritePageForGC;
+        currentOffsetInEachFrameList.set(gcInfo.currentGCWritePageForGC, gcInfo.currentWriteIntOffsetInPageForGC);
+
+        wastedIntSpaceCount = 0;
+        tempTuplePointer.reset(INVALID_VALUE, INVALID_VALUE);
+
+        return numberOfFramesToBeDeallocated;
+    }
+
+    /**
+     * Migrates the current slot to the designated place and resets the current space using INVALID_VALUE.
+     *
+     * @return true if the current page has been changed. false if not.
+     */
+    private boolean MigrateSlot(GarbageCollectionInfo gcInfo, ITuplePointerAccessor bufferAccessor,
+            ITuplePartitionComputer tpc, int capacityInIntCount, int nextSlotIntPosInPageForGC)
+            throws HyracksDataException {
+        boolean currentPageChanged = false;
+        // If the reader and writer indicate the same slot location, a move is not required.
+        if (gcInfo.isReaderWriterAtTheSamePos()) {
+            int intToRead = capacityInIntCount;
+            int intReadAtThisTime;
+            gcInfo.currentReadIntOffsetInPageForGC = nextSlotIntPosInPageForGC;
+            while (intToRead > 0) {
+                intReadAtThisTime = Math.min(intToRead, frameCapacity - gcInfo.currentReadIntOffsetInPageForGC);
+                gcInfo.currentReadIntOffsetInPageForGC += intReadAtThisTime;
+                if (gcInfo.currentReadIntOffsetInPageForGC >= frameCapacity
+                        && gcInfo.currentReadPageForGC < currentLargestFrameNumber) {
+                    gcInfo.currentReadPageForGC++;
+                    gcInfo.currentReadIntOffsetInPageForGC = 0;
+                    currentPageChanged = true;
                 }
-                frame = contents.get(startFrameIndex);
-                fIndex = frame.getInt(startIndex);
-                tIndex = frame.getInt(startIndex + 1);
-                tempTuplePointer.reset(fIndex, tIndex);
-                insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer);
+                intToRead -= intReadAtThisTime;
             }
-            insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer);
+
+            gcInfo.currentGCWritePageForGC = gcInfo.currentReadPageForGC;
+            gcInfo.currentWriteIntOffsetInPageForGC = gcInfo.currentReadIntOffsetInPageForGC;
+
+            return currentPageChanged;
         }
+
+        // The reader is ahead of the writer. We can migrate the given slot towards the beginning of
+        // the content frame(s).
+        int tempWriteIntPosInPage = gcInfo.currentWriteIntOffsetInPageForGC;
+        int tempReadIntPosInPage = nextSlotIntPosInPageForGC;
+        int chunksToMove = capacityInIntCount;
+        int chunksToMoveAtThisTime;
+
+        // To keep the original writing page that is going to be used for updating the header to content frame,
+        // we declare a local variable.
+        int tempWritePage = gcInfo.currentGCWritePageForGC;
+
+        // Keeps the maximum INT chunks that writer/reader can write in the current page.
+        int oneTimeIntCapacityForWriter;
+        int oneTimeIntCapacityForReader;
+
+        IntSerDeBuffer currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+        IntSerDeBuffer currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC);
+
+        // Moves the slot.
+        while (chunksToMove > 0) {
+            oneTimeIntCapacityForWriter = Math.min(chunksToMove, frameCapacity - tempWriteIntPosInPage);
+            oneTimeIntCapacityForReader = Math.min(chunksToMove, frameCapacity - tempReadIntPosInPage);
+
+            // Since the location of Reader and Writer are different, we can only move a minimum chunk
+            // before the current page of either Reader or Writer changes.
+            chunksToMoveAtThisTime = Math.min(oneTimeIntCapacityForWriter, oneTimeIntCapacityForReader);
+
+            // Moves a part of the slot from the Reader to Writer
+            System.arraycopy(currentReadContentFrameForGC.bytes, tempReadIntPosInPage * INT_SIZE,
+                    currentWriteContentFrameForGC.bytes, tempWriteIntPosInPage * INT_SIZE,
+                    chunksToMoveAtThisTime * INT_SIZE);
+
+            // Clears that part in the Reader
+            for (int i = 0; i < chunksToMoveAtThisTime; i++) {
+                // Do not blindly put -1 since there might be overlapping between writer and reader.
+                if ((gcInfo.currentReadPageForGC != tempWritePage)
+                        || (tempReadIntPosInPage + i >= tempWriteIntPosInPage + chunksToMoveAtThisTime)) {
+                    currentReadContentFrameForGC.writeInvalidVal(tempReadIntPosInPage + i, chunksToMoveAtThisTime - i);
+                    break;
+                }
+            }
+
+            // Advances the pointer
+            tempWriteIntPosInPage += chunksToMoveAtThisTime;
+            tempReadIntPosInPage += chunksToMoveAtThisTime;
+
+            // Once the writer pointer hits the end of the page, we move to the next content page.
+            if (tempWriteIntPosInPage >= frameCapacity && tempWritePage < currentLargestFrameNumber) {
+                tempWritePage++;
+                currentPageChanged = true;
+                currentWriteContentFrameForGC = contents.get(tempWritePage);
+                tempWriteIntPosInPage = 0;
+            }
+
+            // Once the reader pointer hits the end of the page, we move to the next content page.
+            if (tempReadIntPosInPage >= frameCapacity && gcInfo.currentReadPageForGC < currentLargestFrameNumber) {
+                gcInfo.currentReadPageForGC++;
+                currentPageChanged = true;
+                currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+                tempReadIntPosInPage = 0;
+            }
+
+            chunksToMove -= chunksToMoveAtThisTime;
+        }
+
+        updateHeaderToContentPointerInHeaderFrame(bufferAccessor, tpc, tempTuplePointer, gcInfo.currentGCWritePageForGC,
+                gcInfo.currentWriteIntOffsetInPageForGC);
+
+        gcInfo.currentGCWritePageForGC = tempWritePage;
+        gcInfo.currentWriteIntOffsetInPageForGC = tempWriteIntPosInPage;
+        gcInfo.currentReadIntOffsetInPageForGC = tempReadIntPosInPage;
+
+        return currentPageChanged;
     }
 
-    private void resetFrame(IntSerDeBuffer frame) {
-        for (int i = 0; i < frameCapacity; i++)
-            frame.writeInt(i, -1);
+    /**
+     * Completely removes the slot in the given content frame(s) and resets the space.
+     * For this method, we assume that this slot is not moved to somewhere else.
+     *
+     * @return true if the current page has been changed. false if not.
+     */
+    private boolean resetSlotSpace(GarbageCollectionInfo gcInfo, int slotIntPos, int capacityInIntCount) {
+        boolean currentPageChanged = false;
+        int tempReadIntPosInPage = slotIntPos;
+        int chunksToDelete = capacityInIntCount;
+        int chunksToDeleteAtThisTime;
+        IntSerDeBuffer currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+
+        while (chunksToDelete > 0) {
+            chunksToDeleteAtThisTime = Math.min(chunksToDelete, frameCapacity - tempReadIntPosInPage);
+
+            // Clears that part in the Reader
+            currentReadContentFrameForGC.writeInvalidVal(tempReadIntPosInPage, chunksToDeleteAtThisTime);
+
+            // Advances the pointer
+            tempReadIntPosInPage += chunksToDeleteAtThisTime;
+
+            // Once the reader pointer hits the end of the page, we move to the next content page.
+            if (tempReadIntPosInPage >= frameCapacity && gcInfo.currentReadPageForGC < currentLargestFrameNumber) {
+                gcInfo.currentReadPageForGC++;
+                currentPageChanged = true;
+                currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+                tempReadIntPosInPage = 0;
+            }
+
+            chunksToDelete -= chunksToDeleteAtThisTime;
+        }
+
+        gcInfo.currentReadIntOffsetInPageForGC = tempReadIntPosInPage;
+
+        return currentPageChanged;
     }
 
-    private int getHeaderFrameIndex(int entry) {
-        int frameIndex = entry * 2 / frameCapacity;
-        return frameIndex;
+    /**
+     * Updates the given Header to Content Frame Pointer after calculating the corresponding hash value from the
+     * given tuple pointer.
+     */
+    private void updateHeaderToContentPointerInHeaderFrame(ITuplePointerAccessor bufferAccessor,
+            ITuplePartitionComputer tpc, TuplePointer hashedTuple, int newContentFrame,
+            int newOffsetInContentFrame) throws HyracksDataException {
+        // Finds the original hash value. We assume that bufferAccessor and tpc are already assigned.
+        bufferAccessor.reset(hashedTuple);
+        int entry = tpc.partition(bufferAccessor, hashedTuple.getTupleIndex(), tableSize);
+
+        // Finds the location of the hash value in the header frame arrays.
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer headerFrame = headers[headerFrameIndex];
+
+        // Updates the hash value.
+        headerFrame.writeInt(offsetInHeaderFrame, newContentFrame);
+        headerFrame.writeInt(offsetInHeaderFrame + 1, newOffsetInContentFrame);
     }
 
-    private int getHeaderFrameOffset(int entry) {
-        int offset = entry * 2 % frameCapacity;
-        return offset;
+
+    /**
+     * Tries to find the next valid slot position in the given content frame from the current position.
+     */
+    private int findNextSlotInPage(IntSerDeBuffer frame, int readIntPosAtPage) {
+        // Sanity check
+        if (readIntPosAtPage >= frameCapacity) {
+            return INVALID_VALUE;
+        }
+        int intOffset = readIntPosAtPage;
+        while (frame.getInt(intOffset) == INVALID_VALUE) {
+            intOffset++;
+            if (intOffset >= frameCapacity) {
+                // Couldn't find the next slot in the given page.
+                return INVALID_VALUE;
+            }
+        }
+        return intOffset;
     }
 
-    private static class IntSerDeBuffer {
+    /**
+     * Keeps the read and write cursors (page number, int offset in that page) of the garbage collection.
+     */
+    private static class GarbageCollectionInfo {
+        // Read cursor: page number and int offset in that page.
+        int currentReadPageForGC = 0;
+        int currentReadIntOffsetInPageForGC = 0;
+        // Write cursor: page number and int offset in that page.
+        int currentGCWritePageForGC = 0;
+        int currentWriteIntOffsetInPageForGC = 0;
+
+        public GarbageCollectionInfo() {
+            // All cursors start at position 0 (see the field initializers above).
+        }
+
+        /**
+         * Returns true when the read cursor and the write cursor point to the same position.
+         */
+        public boolean isReaderWriterAtTheSamePos() {
+            boolean samePage = currentReadPageForGC == currentGCWritePageForGC;
+            boolean sameOffset = currentReadIntOffsetInPageForGC == currentWriteIntOffsetInPageForGC;
+            return samePage && sameOffset;
+        }
+    }
 
-        private byte[] bytes;
+    /**
+     * Returns the current status of this table: the number of slots, frames, space utilization, etc.
+     */
+    @Override
+    public String printInfo() {
+        SlotInfoPair<Integer, Integer> slotInfo = new SlotInfoPair<>(0, 0);
 
-        public IntSerDeBuffer(byte[] data) {
-            this.bytes = data;
+        int nFrames = contents.size();
+        int hFrames = 0;
+        // Histogram Information - maps a used count to the number of hash slots that have that used count
+        // (e.g., (2, 10) means that there are 10 hash slots that hold two hash entries each.)
+        Map<Integer, Integer> headerSlotUsedCountMap = new TreeMap<>();
+
+        // Histogram Information - maps a capacity to the number of hash slots that have that capacity
+        // (e.g., (3, 10) means that there are 10 hash slots whose capacity is 3.)
+        Map<Integer, Integer> headerSlotCapaCountMap = new TreeMap<>();
+
+        int headerSlotUsedCount = 0;
+        int headerSlotTotalCount;
+        double headerSlotUsedRatio = 0.0;
+        IntSerDeBuffer header;
+        int tupleUsedCount;
+        int tupleUsedCountFromMap;
+        int capacity;
+        int capacityFromMap;
+        for (int i = 0; i < headers.length; i++) {
+            if (headers[i] != null) {
+                header = headers[i];
+                for (int j = 0; j < frameCapacity; j = j + 2) {
+                    if (header.getInt(j) >= 0) {
+                        headerSlotUsedCount++;
+                        getSlotInfo(header.getInt(j), header.getInt(j + 1), slotInfo);
+                        capacity = slotInfo.first;
+                        tupleUsedCount = slotInfo.second;
+                        // UsedCount increase
+                        if (headerSlotUsedCountMap.containsKey(tupleUsedCount)) {
+                            tupleUsedCountFromMap = headerSlotUsedCountMap.get(tupleUsedCount);
+                            headerSlotUsedCountMap.put(tupleUsedCount, tupleUsedCountFromMap + 1);
+                        } else {
+                            headerSlotUsedCountMap.put(tupleUsedCount, 1);
+                        }
+                        // Capacity increase
+                        if (headerSlotCapaCountMap.containsKey(capacity)) {
+                            capacityFromMap = headerSlotCapaCountMap.get(capacity);
+                            headerSlotCapaCountMap.put(capacity, capacityFromMap + 1);
+                        } else {
+                            headerSlotCapaCountMap.put(capacity, 1);
+                        }
+                        // The used header slot is counted exactly once, at the top of this branch.
+                    }
+                }
+                hFrames++;
+            }
+        }
+        headerSlotTotalCount = hFrames * frameCapacity / 2;
+        if (headerSlotTotalCount > 0) {
+            headerSlotUsedRatio = (double) headerSlotUsedCount / (double) headerSlotTotalCount;
         }
+        int total = hFrames + nFrames;
+        StringBuilder buf = new StringBuilder();
+        buf.append("\n>>> " + this + " " + Thread.currentThread().getId() + "::printInfo()" + "\n");
+        buf.append("(A) hash table cardinality (# of slot):\t" + tableSize + "\tExpected Table Size(MB):\t"
+                + ((double) getExpectedTableByteSize(tableSize, frameCapacity * 4) / 1048576) + "\twasted size(MB):\t"
+                + ((double) wastedIntSpaceCount * 4 / 1048576) + "\n");
+        buf.append("(B) # of header frames:\t" + hFrames + "\tsize(MB)\t"
+                + ((double) hFrames * frameCapacity * 4 / 1048576) + "\tratio (B/D)\t" + ((double) hFrames / total)
+                + "\n");
+        buf.append("(C) # of content frames:\t" + nFrames + "\tsize(MB)\t"
+                + ((double) nFrames * frameCapacity * 4 / 1048576) + "\tratio (C/D)\t" + ((double) nFrames / total)
+                + "\n");
+        buf.append("(D) # of total frames:\t" + total + "\tsize(MB)\t" + ((double) total * frameCapacity * 4 / 1048576)
+                + "\n");
+        buf.append("(E) # of used header entries:\t" + headerSlotUsedCount + "\n");
+        buf.append("(F) # of all possible header entries:\t" + headerSlotTotalCount + "\n");
+        buf.append("(G) header entries used ratio (E/F):\t" + headerSlotUsedRatio + "\n");
+        buf.append("(H) used count histogram (used count, its frequency):" + "\n");
+        int totalContentUsedCount = 0;
+        for (Map.Entry<Integer, Integer> entry : headerSlotUsedCountMap.entrySet()) {
+            buf.append(entry.getKey() + "\t" + entry.getValue() + "\n");
+            totalContentUsedCount += (entry.getKey() * entry.getValue());
+        }
+        buf.append("(H-1) total used count in content frames:\t" + totalContentUsedCount + "\n");
 
-        public int getInt(int pos) {
-            int offset = pos * 4;
-            return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
-                    + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0);
+        int totalContentCapaCount = 0;
+        buf.append("(I) capacity count histogram (capacity, its frequency):" + "\n");
+        for (Map.Entry<Integer, Integer> entry : headerSlotCapaCountMap.entrySet()) {
+            buf.append(entry.getKey() + "\t" + entry.getValue() + "\n");
+            totalContentCapaCount += (entry.getKey() * entry.getValue());
         }
+        buf.append("(I-1) total capacity in content frames:\t" + totalContentCapaCount + "\n");
+        buf.append("(J) ratio of used count in content frames (H-1 / I-1):\t"
+                + ((double) totalContentUsedCount / totalContentCapaCount) + "\n");
+        return buf.toString();
+    }
+
+    /**
+     * Reads the capacity and the used count of the slot at the given location of the content frames
+     * and stores them into slotInfo (first = capacity, second = used count).
+     */
+    public void getSlotInfo(int contentFrameIndex, int contentOffsetIndex, SlotInfoPair<Integer, Integer> slotInfo) {
+        IntSerDeBuffer contentFrame = contents.get(contentFrameIndex);
+        // The first two ints of a slot are its capacity and its used count.
+        slotInfo.reset(contentFrame.getInt(contentOffsetIndex), contentFrame.getInt(contentOffsetIndex + 1));
+    }
+
+    private static class SlotInfoPair<T1, T2> {
+        // Reusable holder: getSlotInfo() stores the slot capacity in first and the used count in second.
+        private T1 first;
+        private T2 second;
 
-        public void writeInt(int pos, int value) {
-            int offset = pos * 4;
-            bytes[offset++] = (byte) (value >> 24);
-            bytes[offset++] = (byte) (value >> 16);
-            bytes[offset++] = (byte) (value >> 8);
-            bytes[offset++] = (byte) (value);
+        public SlotInfoPair(T1 first, T2 second) {
+            this.first = first;
+            this.second = second;
         }
 
-        public int capacity() {
-            return bytes.length / 4;
+        public void reset(T1 first, T2 second) {
+            this.first = first;
+            this.second = second;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
new file mode 100644
index 0000000..5b7d364
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
+
+/**
+ * This table consists of header frames and content frames.
+ * Header indicates the first entry slot location for the given integer value.
+ * A header slot consists of [content frame number], [offset in that frame] to get
+ * the first tuple's pointer information that shares the same hash value.
+ * An entry slot in the content frame is as follows.
+ * [capacity of the slot], [# of occupied elements], {[frameIndex], [tupleIndex]}+;
+ * <fIndex, tIndex> forms a tuple pointer.
+ * WARNING: this hash table can grow indefinitely and may cause an OutOfMemoryError.
+ * So, do not use this class in production; use the SerializableHashTable class instead
+ * since that one should be managed by a buffer manager.
+ */
+public class SimpleSerializableHashTable implements ISerializableTable {
+
+    // Unit size: the byte size of an int
+    protected static final int INT_SIZE = 4;
+    // Initial entry slot size in int pairs; insertNewEntry() uses one pair for the slot metadata
+    protected static final int INIT_ENTRY_SIZE = 4;
+    protected static final int INVALID_VALUE = 0xFFFFFFFF; // == -1; the value of every int in a reset frame
+    protected static final byte INVALID_BYTE_VALUE = (byte) 0xFF; // one byte of INVALID_VALUE
+
+    // Header frame array (a frame stays null until insert() needs it)
+    protected IntSerDeBuffer[] headers;
+    // Content frame list
+    protected List<IntSerDeBuffer> contents = new ArrayList<>();
+    protected List<Integer> currentOffsetInEachFrameList = new ArrayList<>(); // write offset (in ints) per frame
+    protected int frameCapacity; // number of ints per frame
+    protected int currentLargestFrameNumber = 0; // content frame that insertNewEntry() appends to
+    // The byte size of total frames that are allocated to the headers and contents
+    protected int currentByteSize = 0;
+    protected int tupleCount = 0;
+    protected TuplePointer tempTuplePointer = new TuplePointer(); // scratch pointer used while migrating a slot
+    protected int tableSize;
+    protected int frameSize;
+    protected IHyracksFrameMgrContext ctx;
+
+    public SimpleSerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException {
+        this(tableSize, ctx, true); // true: the first content frame is allocated by the constructor
+    }
+
+    public SimpleSerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx, boolean frameInitRequired)
+            throws HyracksDataException {
+        this.ctx = ctx;
+        this.tableSize = tableSize;
+        frameSize = ctx.getInitialFrameSize();
+        int headerByteSize = tableSize * INT_SIZE * 2; // two ints per header slot
+        headers = new IntSerDeBuffer[headerByteSize / frameSize + (headerByteSize % frameSize == 0 ? 0 : 1)];
+        if (!frameInitRequired) {
+            return; // frameCapacity is not set by this constructor in this case
+        }
+        ByteBuffer newFrame = getFrame(frameSize);
+        if (newFrame == null) {
+            throw new HyracksDataException("Can't initialize the Hash Table. Please assign more memory.");
+        }
+        IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
+        frameCapacity = frame.capacity();
+        contents.add(frame);
+        currentOffsetInEachFrameList.add(0);
+    }
+
+    ByteBuffer getFrame(int size) throws HyracksDataException {
+        currentByteSize += size; // accounted before the allocation is attempted
+        return ctx.allocateFrame(size);
+    }
+
+    void increaseWastedSpaceCount(int size) {
+        // Intentionally a no-op: this simple implementation doesn't count the wasted space.
+    }
+
+    /**
+     * Inserts the given tuple pointer into the slot of the given entry.
+     * Returns false if a frame that is required for the insertion couldn't be allocated.
+     */
+    @Override
+    public boolean insert(int entry, TuplePointer pointer) throws HyracksDataException {
+        int headerIdx = getHeaderFrameIndex(entry);
+        int headerOffset = getHeaderFrameOffset(entry);
+        if (headers[headerIdx] == null) {
+            // The header frame for this entry is allocated on the first insertion that needs it.
+            ByteBuffer newFrame = getFrame(frameSize);
+            if (newFrame == null) {
+                return false;
+            }
+            headers[headerIdx] = new IntSerDeBuffer(newFrame);
+        }
+        IntSerDeBuffer headerFrame = headers[headerIdx];
+        int contentFrameIndex = headerFrame.getInt(headerOffset);
+        boolean inserted;
+        if (contentFrameIndex >= 0) {
+            // The entry slot already exists: inserts a non-first tuple into the entry slot.
+            inserted = insertNonFirstTuple(headerFrame, headerOffset, contentFrameIndex,
+                    headerFrame.getInt(headerOffset + 1), pointer);
+        } else {
+            // Since the initial value of index and offset is -1, a negative index means that the slot
+            // for this entry is not created yet: creates the slot and inserts the first tuple into it.
+            inserted = insertNewEntry(headerFrame, headerOffset, INIT_ENTRY_SIZE, pointer);
+        }
+        if (inserted) {
+            tupleCount++;
+        }
+        return inserted;
+    }
+
+    /**
+     * Resets the slot information for the entry. The connection (pointer) between the header frame and
+     * the content frame is also lost. Specifically, the used count of the slot is set to -1
+     * so that the space could be reclaimed.
+     */
+    @Override
+    public void delete(int entry) {
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[headerFrameIndex];
+        if (header != null) {
+            int contentFrameIndex = header.getInt(offsetInHeaderFrame);
+            int offsetInContentFrame = header.getInt(offsetInHeaderFrame + 1);
+            if (contentFrameIndex >= 0) {
+                IntSerDeBuffer frame = contents.get(contentFrameIndex);
+                int entrySlotCapacity = frame.getInt(offsetInContentFrame);
+                int entryUsedItems = frame.getInt(offsetInContentFrame + 1);
+                // Sets the used count to -1 in the slot so that the slot space could be reclaimed.
+                frame.writeInvalidVal(offsetInContentFrame + 1, 1);
+                // Also resets the header (frameIdx, offset) pointer to the content frame.
+                header.writeInvalidVal(offsetInHeaderFrame, 2);
+                tupleCount = tupleCount - entryUsedItems;
+                increaseWastedSpaceCount((entrySlotCapacity + 1) * 2);
+            }
+        }
+    }
+
+    /**
+     * Gets the n-th (n = offsetInSlot) tuple pointer of the slot for the given entry; false if it doesn't exist.
+     */
+    @Override
+    public boolean getTuplePointer(int entry, int offsetInSlot, TuplePointer dataPointer) {
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[headerFrameIndex];
+        if (header == null) {
+            dataPointer.reset(INVALID_VALUE, INVALID_VALUE);
+            return false;
+        }
+        int contentFrameIndex = header.getInt(offsetInHeaderFrame);
+        int offsetInContentFrame = header.getInt(offsetInHeaderFrame + 1);
+        if (contentFrameIndex < 0) {
+            dataPointer.reset(INVALID_VALUE, INVALID_VALUE);
+            return false;
+        }
+        IntSerDeBuffer frame = contents.get(contentFrameIndex);
+        int entryUsedCountInSlot = frame.getInt(offsetInContentFrame + 1);
+        if (offsetInSlot > entryUsedCountInSlot - 1) {
+            dataPointer.reset(INVALID_VALUE, INVALID_VALUE);
+            return false;
+        }
+        int startOffsetInContentFrame = offsetInContentFrame + 2 + offsetInSlot * 2;
+        while (startOffsetInContentFrame >= frameCapacity) {
+            ++contentFrameIndex;
+            startOffsetInContentFrame -= frameCapacity;
+        }
+        frame = contents.get(contentFrameIndex);
+        dataPointer.reset(frame.getInt(startOffsetInContentFrame), frame.getInt(startOffsetInContentFrame + 1));
+        return true;
+    }
+
+    @Override
+    public void reset() {
+        // Wipes every allocated header frame; the frames themselves are kept.
+        for (int i = 0; i < headers.length; i++) {
+            if (headers[i] != null) {
+                headers[i].resetFrame();
+            }
+        }
+        // Rewinds the write offset of every content frame to the beginning of the frame.
+        currentOffsetInEachFrameList.clear();
+        for (int i = 0; i < contents.size(); i++) {
+            currentOffsetInEachFrameList.add(0);
+        }
+        currentLargestFrameNumber = 0;
+        tupleCount = 0;
+        currentByteSize = 0;
+    }
+
+    @Override
+    public int getCurrentByteSize() {
+        return currentByteSize; // increased in getFrame(), zeroed in reset() and close()
+    }
+
+    @Override
+    public int getTupleCount() {
+        return tupleCount; // increased by insert(), decreased by delete()
+    }
+
+    /**
+     * Returns the tuple count in the slot for the given entry (0 if the entry has no slot).
+     */
+    @Override
+    public int getTupleCount(int entry) {
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer headerFrame = headers[headerFrameIndex];
+        if (headerFrame != null) {
+            int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame);
+            int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame + 1);
+            if (contentFrameIndex >= 0) {
+                // The used count is stored right after the capacity of the slot.
+                IntSerDeBuffer frame = contents.get(contentFrameIndex);
+                return frame.getInt(offsetInContentFrame + 1);
+            }
+        }
+        return 0;
+    }
+
+    @Override
+    public void close() {
+        // The content frame count must be read before the list is cleared below.
+        int nFrames = contents.size();
+        // Drops the references to all header and content frames.
+        Arrays.fill(headers, null);
+        contents.clear();
+        currentOffsetInEachFrameList.clear();
+        currentLargestFrameNumber = 0;
+        currentByteSize = 0;
+        tupleCount = 0;
+        ctx.deallocateFrames(nFrames);
+    }
+
+    protected boolean insertNewEntry(IntSerDeBuffer header, int offsetInHeaderFrame, int entryCapacity,
+            TuplePointer pointer) throws HyracksDataException {
+        IntSerDeBuffer lastContentFrame = contents.get(currentLargestFrameNumber);
+        int lastOffsetInCurrentFrame = currentOffsetInEachFrameList.get(currentLargestFrameNumber);
+        int requiredIntCapacity = entryCapacity * 2;
+        int currentFrameNumber = currentLargestFrameNumber;
+        boolean currentFrameNumberChanged = false;
+        // If the slot doesn't fit into the remaining space of the last content frame, more frames are set up.
+        if (lastOffsetInCurrentFrame + requiredIntCapacity >= frameCapacity) {
+            IntSerDeBuffer newContentFrame;
+            // At least we need to have the metadata (slot capacity and used count) and
+            // one tuple pointer in the same frame (4 ints in total).
+            // So, if there is not enough space for this, we just move on to the next page.
+            if ((lastOffsetInCurrentFrame + 4) > frameCapacity) {
+                // Wipes out the region that can't be used.
+                lastContentFrame.writeInvalidVal(lastOffsetInCurrentFrame, frameCapacity - lastOffsetInCurrentFrame);
+                currentFrameNumber++;
+                lastOffsetInCurrentFrame = 0;
+                currentFrameNumberChanged = true;
+            }
+            do {
+                if (currentLargestFrameNumber >= contents.size() - 1) {
+                    ByteBuffer newFrame = getFrame(frameSize);
+                    if (newFrame == null) {
+                        return false;
+                    }
+                    newContentFrame = new IntSerDeBuffer(newFrame);
+                    currentLargestFrameNumber++;
+                    contents.add(newContentFrame);
+                    currentOffsetInEachFrameList.add(0);
+                } else {
+                    currentLargestFrameNumber++;
+                    currentOffsetInEachFrameList.set(currentLargestFrameNumber, 0);
+                }
+                requiredIntCapacity -= frameCapacity;
+            } while (requiredIntCapacity > 0);
+        }
+
+        if (currentFrameNumberChanged) {
+            lastContentFrame = contents.get(currentFrameNumber);
+        }
+
+        // Sets the header so that it points to the new slot (content frame number, offset in that frame).
+        header.writeInt(offsetInHeaderFrame, currentFrameNumber);
+        header.writeInt(offsetInHeaderFrame + 1, lastOffsetInCurrentFrame);
+
+        // Sets the entry & its slot.
+        // 1. slot capacity (entryCapacity - 1: one int pair is taken by the capacity and the used count)
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame, entryCapacity - 1);
+        // 2. used count in the slot
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame + 1, 1);
+        // 3. initial entry in the slot
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame + 2, pointer.getFrameIndex());
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame + 3, pointer.getTupleIndex());
+        int newLastOffsetInContentFrame = lastOffsetInCurrentFrame + entryCapacity * 2;
+        newLastOffsetInContentFrame = newLastOffsetInContentFrame < frameCapacity ? newLastOffsetInContentFrame
+                    : frameCapacity - 1;
+        currentOffsetInEachFrameList.set(currentFrameNumber, newLastOffsetInContentFrame);
+
+        requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastOffsetInCurrentFrame);
+        while (requiredIntCapacity > 0) {
+            currentFrameNumber++;
+            requiredIntCapacity -= frameCapacity;
+            newLastOffsetInContentFrame = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity
+                    : frameCapacity - 1;
+            currentOffsetInEachFrameList.set(currentFrameNumber, newLastOffsetInContentFrame);
+        }
+
+        return true;
+    }
+
+    protected boolean insertNonFirstTuple(IntSerDeBuffer header, int offsetInHeaderFrame, int contentFrameIndex,
+            int offsetInContentFrame, TuplePointer pointer) throws HyracksDataException {
+        int frameIndex = contentFrameIndex;
+        IntSerDeBuffer contentFrame = contents.get(frameIndex);
+        int entrySlotCapacity = contentFrame.getInt(offsetInContentFrame);
+        int entryUsedCountInSlot = contentFrame.getInt(offsetInContentFrame + 1);
+        boolean frameIndexChanged = false;
+        if (entryUsedCountInSlot < entrySlotCapacity) {
+            // The slot has at least one space to accommodate this tuple pointer.
+            // Increases the used count by 1.
+            contentFrame.writeInt(offsetInContentFrame + 1, entryUsedCountInSlot + 1);
+            // Calculates the first empty spot in the slot.
+            // +2: (capacity, # of used entry count)
+            // *2: each tuplePointer's occupation (frame index + offset in that frame)
+            int startOffsetInContentFrame = offsetInContentFrame + 2 + entryUsedCountInSlot * 2;
+            while (startOffsetInContentFrame >= frameCapacity) {
+                ++frameIndex;
+                startOffsetInContentFrame -= frameCapacity;
+                frameIndexChanged = true;
+            }
+            // We don't have to read the content frame again if the frame index has not been changed.
+            if (frameIndexChanged) {
+                contentFrame = contents.get(frameIndex);
+            }
+            contentFrame.writeInt(startOffsetInContentFrame, pointer.getFrameIndex());
+            contentFrame.writeInt(startOffsetInContentFrame + 1, pointer.getTupleIndex());
+        } else {
+            // There is not enough space in this slot. We need to increase the slot size and
+            // migrate the current entries in it.
+
+            // New capacity: double the original capacity
+            int capacity = (entrySlotCapacity + 1) * 2;
+
+            // Temporarily sets the header (frameIdx, offset) as (-1,-1) for the slot.
+            header.writeInvalidVal(offsetInHeaderFrame, 2);
+            // Marks the old slot as obsolete - sets the used count as -1 so that its space can be reclaimed
+            // when a garbage collection is executed.
+            contentFrame.writeInvalidVal(offsetInContentFrame + 1, 1);
+
+            // Gets the location of the initial entry.
+            int fIndex = contentFrame.getInt(offsetInContentFrame + 2);
+            int tIndex = contentFrame.getInt(offsetInContentFrame + 3);
+            tempTuplePointer.reset(fIndex, tIndex);
+            // Creates a new double-sized slot for the current entries and
+            // migrates the initial entry in the slot to the new slot.
+            if (!this.insertNewEntry(header, offsetInHeaderFrame, capacity, tempTuplePointer)) {
+                // Reverses the effect of the change: restores the header pointer and the used count.
+                header.writeInt(offsetInHeaderFrame, contentFrameIndex);
+                header.writeInt(offsetInHeaderFrame + 1, offsetInContentFrame);
+                contentFrame.writeInt(offsetInContentFrame + 1, entryUsedCountInSlot);
+                return false;
+            }
+
+            int newFrameIndex = header.getInt(offsetInHeaderFrame);
+            int newTupleIndex = header.getInt(offsetInHeaderFrame + 1);
+
+            // Migrates the existing entries (from the 2nd to the last).
+            for (int i = 1; i < entryUsedCountInSlot; i++) {
+                int startOffsetInContentFrame = offsetInContentFrame + 2 + i * 2;
+                int startFrameIndex = frameIndex;
+                while (startOffsetInContentFrame >= frameCapacity) {
+                    ++startFrameIndex;
+                    startOffsetInContentFrame -= frameCapacity;
+                }
+                contentFrame = contents.get(startFrameIndex);
+                fIndex = contentFrame.getInt(startOffsetInContentFrame);
+                tIndex = contentFrame.getInt(startOffsetInContentFrame + 1);
+                tempTuplePointer.reset(fIndex, tIndex);
+                if (!insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex, newTupleIndex, tempTuplePointer)) {
+                    return false;
+                }
+            }
+            // Now, inserts the new entry that caused an overflow to the old bucket.
+            if (!insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex, newTupleIndex, pointer)) {
+                return false;
+            }
+            increaseWastedSpaceCount(capacity);
+        }
+        return true;
+    }
+
+    protected int getHeaderFrameIndex(int entry) {
+        // Each entry takes two ints in the header; this is the header frame that holds them.
+        return entry * 2 / frameCapacity;
+    }
+
+    protected int getHeaderFrameOffset(int entry) {
+        // Each entry takes two ints in the header; this is their int offset inside the header frame.
+        return entry * 2 % frameCapacity;
+    }
+
+    public static int getUnitSize() {
+        return INT_SIZE; // the byte size of an int
+    }
+
+    public static int getNumberOfEntryInSlot() {
+        return INIT_ENTRY_SIZE; // the initial entry slot size
+    }
+
+    public static int getExpectedByteSizePerHashValue() {
+        // ints per slot: 2 (capacity, # of used count) + 2 per tuple pointer (frameIndex, offset)
+        int intCountPerSlot = 2 + getNumberOfEntryInSlot() * 2;
+        return getUnitSize() * intCountPerSlot;
+    }
+
+    /**
+     * Calculates the expected number of frames of the hash table based on a scenario: there are no
+     * duplicated entries, so the entries are spread over all possible slots.
+     * NOTE(review): the header part divides an int count (tableSize * 2) by frameSize, whereas the content
+     * part divides a byte count by frameSize - confirm the intended unit of frameSize.
+     * @param tableSize
+     *            : the cardinality of the hash table - number of slots
+     * @param frameSize
+     *            : the frame size
+     * @return the expected number of frames (header frames + content frames) of the hash table
+     */
+    public static long getExpectedTableFrameCount(long tableSize, int frameSize) {
+        long numberOfHeaderFrame = (long) (Math.ceil((double) tableSize * 2 / (double) frameSize));
+        long numberOfContentFrame = (long) (Math
+                .ceil(((double) getNumberOfEntryInSlot() * 2 * getUnitSize() * tableSize) / (double) frameSize));
+        return numberOfHeaderFrame + numberOfContentFrame;
+    }
+
+    public static long getExpectedTableByteSize(long tableSize, int frameSize) {
+        return getExpectedTableFrameCount(tableSize, frameSize) * frameSize; // expected frame count * frame size
+    }
+
+    /**
+     * Calculates by how many frames the expected frame count of the table changes when the table
+     * cardinality changes from origTableSize to origTableSize + delta.
+     * @param origTableSize
+     *            : the original table cardinality
+     * @param delta
+     *            : the cardinality change (positive: the table grows, negative: the table shrinks)
+     * @param frameSize
+     *            : the frame size
+     * @return the frame count change: positive if more frames are expected, negative if fewer.
+     */
+    public static long calculateFrameCountDeltaForTableSizeChange(long origTableSize, long delta, int frameSize) {
+        long resizedTableSize = origTableSize + delta;
+        return getExpectedTableFrameCount(resizedTableSize, frameSize)
+                - getExpectedTableFrameCount(origTableSize, frameSize);
+    }
+
+    public static long calculateByteSizeDeltaForTableSizeChange(long origTableSize, long delta, int frameSize) {
+        return calculateFrameCountDeltaForTableSizeChange(origTableSize, delta, frameSize) * frameSize; // in bytes
+    }
+
+    @Override
+    public boolean isGarbageCollectionNeeded() {
+        // Garbage collection isn't supported by this class, so it is never reported as needed.
+        return false;
+    }
+
+    @Override
+    public int collectGarbage(ITuplePointerAccessor bufferAccessor, ITuplePartitionComputer tpc)
+            throws HyracksDataException {
+        // Garbage collection isn't supported by this class: nothing is collected and -1 is returned.
+        return -1;
+    }
+
+    static class IntSerDeBuffer {
+        // The wrapped frame and its backing array; all int accesses go through the array.
+        ByteBuffer byteBuffer;
+        byte[] bytes;
+
+        public IntSerDeBuffer(ByteBuffer byteBuffer) {
+            this.byteBuffer = byteBuffer;
+            this.bytes = byteBuffer.array();
+            resetFrame(); // every int of the frame starts as INVALID_VALUE
+        }
+        // Reads the int at the given int position (most significant byte first).
+        public int getInt(int pos) {
+            int offset = pos * INT_SIZE;
+            return ((bytes[offset] & 0xff) << 24) | ((bytes[offset + 1] & 0xff) << 16)
+                    | ((bytes[offset + 2] & 0xff) << 8) | (bytes[offset + 3] & 0xff);
+        }
+        // Writes the int at the given int position (most significant byte first).
+        public void writeInt(int pos, int value) {
+            int offset = pos * INT_SIZE;
+            bytes[offset] = (byte) (value >>> 24);
+            bytes[offset + 1] = (byte) (value >>> 16);
+            bytes[offset + 2] = (byte) (value >>> 8);
+            bytes[offset + 3] = (byte) value;
+        }
+        // Overwrites intRange ints, starting at the int position intPos, with INVALID_VALUE.
+        public void writeInvalidVal(int intPos, int intRange) {
+            int fromByte = intPos * INT_SIZE;
+            Arrays.fill(bytes, fromByte, fromByte + intRange * INT_SIZE, INVALID_BYTE_VALUE);
+        }
+
+        public int capacity() {
+            return bytes.length / INT_SIZE;
+        }
+
+        public int getByteCapacity() {
+            return bytes.length;
+        }
+
+        public ByteBuffer getByteBuffer() {
+            return byteBuffer;
+        }
+        // Sets every byte of the frame to INVALID_BYTE_VALUE.
+        public void resetFrame() {
+            Arrays.fill(bytes, INVALID_BYTE_VALUE);
+        }
+
+    }
+
+    @Override
+    public String printInfo() {
+        return null; // no status information is produced by this class
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java
index 43a1f6d..eeec223 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java
@@ -23,8 +23,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,11 +38,17 @@ public class SerializableHashTableTest {
     SerializableHashTable nsTable;
     final int NUM_PART = 101;
     TuplePointer pointer = new TuplePointer(0, 0);
-    final int num = 1000;
+    final int num = 10000;
+    protected IHyracksFrameMgrContext ctx;
+    private IDeallocatableFramePool framePool;
+    private ISimpleFrameBufferManager bufferManager;
 
     @Before
     public void setup() throws HyracksDataException {
-        nsTable = new SerializableHashTable(NUM_PART, new FrameManager(256));
+        ctx = new FrameManager(256);
+        framePool = new DeallocatableFramePool(ctx, ctx.getInitialFrameSize() * 2048);
+        bufferManager = new FramePoolBackedFrameBufferManager(framePool);
+        nsTable = new SerializableHashTable(NUM_PART, ctx, bufferManager);
     }
 
     @Test
@@ -66,7 +77,7 @@ public class SerializableHashTableTest {
         assertGetValue();
     }
 
-    private void assertGetValue() {
+    protected void assertGetValue() {
         int loop = 0;
         for (int i = 0; i < num; i++) {
             assertTrue(nsTable.getTuplePointer(i % NUM_PART, loop, pointer));
@@ -75,8 +86,9 @@ public class SerializableHashTableTest {
                 loop++;
             }
         }
+        int tupleCntPerPart = (int) Math.ceil((double) num / NUM_PART);
         for (int i = 0; i < NUM_PART; i++) {
-            assertTrue(nsTable.getTupleCount(i) == 10 || nsTable.getTupleCount(i) == 9);
+            assertTrue(nsTable.getTupleCount(i) == tupleCntPerPart || nsTable.getTupleCount(i) == tupleCntPerPart - 1);
         }
 
     }
@@ -86,7 +98,7 @@ public class SerializableHashTableTest {
         assertAllPartitionsCountIsZero();
     }
 
-    private void assertAllPartitionsCountIsZero() {
+    protected void assertAllPartitionsCountIsZero() {
         for (int i = 0; i < NUM_PART; i++) {
             assertEquals(0, nsTable.getTupleCount(i));
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java
new file mode 100644
index 0000000..027ee27
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.structures;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SimpleSerializableHashTableTest {
+
+    SimpleSerializableHashTable nsTable;
+    final int NUM_PART = 101;
+    TuplePointer pointer = new TuplePointer(0, 0);
+    final int num = 10000;
+    private IHyracksFrameMgrContext ctx;
+
+    @Before
+    public void setup() throws HyracksDataException {
+        ctx = new FrameManager(256);
+        nsTable = new SimpleSerializableHashTable(NUM_PART, ctx);
+    }
+
+    @Test
+    public void testBatchDeletePartition() throws Exception {
+        testInsert();
+        for (int i = 0; i < NUM_PART; i++) {
+            nsTable.delete(i);
+            assertFalse(nsTable.getTuplePointer(i, 0, pointer));
+            assertEquals(0, nsTable.getTupleCount(i));
+
+            for (int j = i; j < num; j += NUM_PART) {
+                pointer.reset(j, j);
+                nsTable.insert(i, pointer);
+            }
+
+            assertGetValue();
+        }
+    }
+
+    @Test
+    public void testInsert() throws Exception {
+        for (int i = 0; i < num; i++) {
+            pointer.reset(i, i);
+            nsTable.insert(i % NUM_PART, pointer);
+        }
+        assertGetValue();
+    }
+
+    private void assertGetValue() {
+        int loop = 0;
+        for (int i = 0; i < num; i++) {
+            assertTrue(nsTable.getTuplePointer(i % NUM_PART, loop, pointer));
+            assertTrue(pointer.getFrameIndex() == i);
+            if (i % NUM_PART == NUM_PART - 1) {
+                loop++;
+            }
+        }
+        int tupleCntPerPart = (int) Math.ceil((double) num / NUM_PART);
+        for (int i = 0; i < NUM_PART; i++) {
+            assertTrue(nsTable.getTupleCount(i) == tupleCntPerPart || nsTable.getTupleCount(i) == tupleCntPerPart - 1);
+        }
+
+    }
+
+    @Test
+    public void testGetCount() throws Exception {
+        assertAllPartitionsCountIsZero();
+    }
+
+    private void assertAllPartitionsCountIsZero() {
+        for (int i = 0; i < NUM_PART; i++) {
+            assertEquals(0, nsTable.getTupleCount(i));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index f20461c..f169054 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -160,7 +160,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                 IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -249,7 +249,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                 IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -338,7 +338,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                         IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -429,7 +429,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -524,7 +524,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                         IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -619,7 +619,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                 IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
 
         int[] keyFields = new int[] { 8, 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index f8c7487..7075fe9 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -125,7 +125,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -293,7 +293,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                null, custOrderJoinDesc, true, nonMatchWriterFactories, 128);
+                null, custOrderJoinDesc, true, nonMatchWriterFactories, 128, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -464,7 +464,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -552,7 +552,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 100, 1.2,
                 new int[] { 1 }, new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
@@ -647,7 +647,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                         .of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -743,7 +743,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
index b8ec790..f83ab6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
@@ -28,9 +28,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -56,6 +53,8 @@ import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFac
 import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
 
 public abstract class AbstractExternalGroupbyTest {
 
@@ -176,8 +175,8 @@ public abstract class AbstractExternalGroupbyTest {
 
     @Test
     public void testBuildAndMergeNormalFrameInMem() throws HyracksDataException {
-        int tableSize = 1001;
-        int numFrames = 3;
+        int tableSize = 101;
+        int numFrames = 23;
         int frameSize = 256;
         int minDataSize = frameSize;
         int minRecordSize = 20;
@@ -187,10 +186,10 @@ public abstract class AbstractExternalGroupbyTest {
 
     @Test
     public void testBuildAndMergeNormalFrameSpill() throws HyracksDataException {
-        int tableSize = 1001;
-        int numFrames = 3;
+        int tableSize = 101;
+        int numFrames = 23;
         int frameSize = 256;
-        int minDataSize = frameSize * 4;
+        int minDataSize = frameSize * 40;
         int minRecordSize = 20;
         int maxRecordSize = 50;
         testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, null);
@@ -198,16 +197,14 @@ public abstract class AbstractExternalGroupbyTest {
 
     @Test
     public void testBuildAndMergeBigObj() throws HyracksDataException {
-        int tableSize = 1001;
-        int numFrames = 4;
+        int tableSize = 101;
+        int numFrames = 23;
         int frameSize = 256;
-        int minDataSize = frameSize * 5;
+        int minDataSize = frameSize * 40;
         int minRecordSize = 20;
         int maxRecordSize = 50;
-        HashMap<Integer, String> bigRecords = AbstractRunGeneratorTest.generateBigObject(frameSize, 2);
-        testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize,
-                bigRecords);
-
+        HashMap<Integer, String> bigRecords = AbstractRunGeneratorTest.generateBigObject(frameSize, 3);
+        testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, bigRecords);
     }
 
     protected abstract void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) throws HyracksDataException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 65073e0..c3d0df1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -148,7 +148,7 @@ public class Join {
     private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
             FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
             double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
@@ -183,7 +183,7 @@ public class Join {
                     new IBinaryHashFunctionFactory[] {
                             PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    Common.custOrderJoinDesc, tableSize, null);
+                    Common.custOrderJoinDesc, tableSize, null, memSize * frameSize);
 
         } else if ("hybrid".equalsIgnoreCase(algo)) {
             join = new OptimizedHybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceFactor,


Mime
View raw message