From: wangsaeu@apache.org
To: commits@asterixdb.apache.org
Reply-To: dev@asterixdb.apache.org
Date: Wed, 04 Jan 2017 23:46:29 -0000
Subject: [1/3] asterixdb git commit: ASTERIXDB-1556, ASTERIXDB-1733: Hash Group By and Hash Join conform to the memory budget

Repository: asterixdb Updated Branches: refs/heads/master 1355c269f -> 8b2aceeb9 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java index 9584f26..ca97be3 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java @@ -18,307 +18,552 @@ */ package org.apache.hyracks.dataflow.std.structures; -import java.util.ArrayList; -import java.util.List; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.TreeMap; import org.apache.hyracks.api.context.IHyracksFrameMgrContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager; +import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor; /** - * An entry in the table is: #elements, #no-empty elements; fIndex, tIndex; - * fIndex, tIndex; .... forms a tuple pointer + * This is an extension of the SimpleSerializableHashTable class. + * A buffer manager needs to be assigned to allocate/release frames for this table so that + * the maximum memory usage can be bounded under a certain limit.
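 * (Illustrative usage sketch, based on the updated unit test later in this commit: a caller bounds the table
 * with a frame-pool-backed buffer manager, e.g.
 *     IHyracksFrameMgrContext ctx = new FrameManager(256);
 *     IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, ctx.getInitialFrameSize() * 2048);
 *     ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
 *     ISerializableTable table = new SerializableHashTable(101, ctx, bufferManager);
 * Once the pool's budget is exhausted, acquireFrame() returns null and insert() reports failure rather than
 * allocating past the limit.)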
*/ -public class SerializableHashTable implements ISerializableTable { - - private static final int INT_SIZE = 4; - private static final int INIT_ENTRY_SIZE = 4; - - private IntSerDeBuffer[] headers; - private List contents = new ArrayList<>(); - private List frameCurrentIndex = new ArrayList<>(); - private final IHyracksFrameMgrContext ctx; - private final int frameCapacity; - private int currentLargestFrameIndex = 0; - private int tupleCount = 0; - private int headerFrameCount = 0; - private TuplePointer tempTuplePointer = new TuplePointer(); - - public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException { - this.ctx = ctx; - int frameSize = ctx.getInitialFrameSize(); - - int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1; - int headerSize = tableSize * INT_SIZE * 2 / frameSize + residual; - headers = new IntSerDeBuffer[headerSize]; - - IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array()); - contents.add(frame); - frameCurrentIndex.add(0); - frameCapacity = frame.capacity(); - } +public class SerializableHashTable extends SimpleSerializableHashTable { - @Override - public void insert(int entry, TuplePointer pointer) throws HyracksDataException { - int hFrameIndex = getHeaderFrameIndex(entry); - int headerOffset = getHeaderFrameOffset(entry); - IntSerDeBuffer header = headers[hFrameIndex]; - if (header == null) { - header = new IntSerDeBuffer(ctx.allocateFrame().array()); - headers[hFrameIndex] = header; - resetFrame(header); - headerFrameCount++; - } - int frameIndex = header.getInt(headerOffset); - int offsetIndex = header.getInt(headerOffset + 1); - if (frameIndex < 0) { - // insert first tuple into the entry - insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer); - } else { - // insert non-first tuple into the entry - insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer); - } - tupleCount++; - } + protected double garbageCollectionThreshold; + protected int wastedIntSpaceCount = 0; + protected ISimpleFrameBufferManager bufferManager; - @Override - public void delete(int entry) { - int hFrameIndex = getHeaderFrameIndex(entry); - int headerOffset = getHeaderFrameOffset(entry); - IntSerDeBuffer header = headers[hFrameIndex]; - if (header != null) { - int frameIndex = header.getInt(headerOffset); - int offsetIndex = header.getInt(headerOffset + 1); - if (frameIndex >= 0) { - IntSerDeBuffer frame = contents.get(frameIndex); - int entryUsedItems = frame.getInt(offsetIndex + 1); - frame.writeInt(offsetIndex + 1, 0); - tupleCount -= entryUsedItems; - } - } + public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx, + ISimpleFrameBufferManager bufferManager) throws HyracksDataException { + this(tableSize, ctx, bufferManager, 0.1); } - @Override - public boolean getTuplePointer(int entry, int offset, TuplePointer dataPointer) { - int hFrameIndex = getHeaderFrameIndex(entry); - int headerOffset = getHeaderFrameOffset(entry); - IntSerDeBuffer header = headers[hFrameIndex]; - if (header == null) { - dataPointer.reset(-1, -1); - return false; - } - int frameIndex = header.getInt(headerOffset); - int offsetIndex = header.getInt(headerOffset + 1); - if (frameIndex < 0) { - dataPointer.reset(-1, -1); - return false; - } - IntSerDeBuffer frame = contents.get(frameIndex); - int entryUsedItems = frame.getInt(offsetIndex + 1); - if (offset > entryUsedItems - 1) { - dataPointer.reset(-1, -1); - return false; - } - int startIndex = offsetIndex + 2 + offset * 2; - while 
(startIndex >= frameCapacity) { - ++frameIndex; - startIndex -= frameCapacity; - } - frame = contents.get(frameIndex); - dataPointer.reset(frame.getInt(startIndex), frame.getInt(startIndex + 1)); - return true; - } - - @Override - public void reset() { - for (IntSerDeBuffer frame : headers) - if (frame != null) - resetFrame(frame); + public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx, + ISimpleFrameBufferManager bufferManager, double garbageCollectionThreshold) + throws HyracksDataException { + super(tableSize, ctx, false); + this.bufferManager = bufferManager; - frameCurrentIndex.clear(); - for (int i = 0; i < contents.size(); i++) { - frameCurrentIndex.add(0); + ByteBuffer newFrame = getFrame(frameSize); + if (newFrame == null) { + throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget."); } - - currentLargestFrameIndex = 0; - tupleCount = 0; + IntSerDeBuffer frame = new IntSerDeBuffer(newFrame); + frameCapacity = frame.capacity(); + contents.add(frame); + currentOffsetInEachFrameList.add(0); + this.garbageCollectionThreshold = garbageCollectionThreshold; } @Override - public int getFrameCount() { - return headerFrameCount + contents.size(); + ByteBuffer getFrame(int size) throws HyracksDataException { + ByteBuffer newFrame = bufferManager.acquireFrame(size); + if (newFrame != null) { + currentByteSize += size; + } + return newFrame; } @Override - public int getTupleCount() { - return tupleCount; + void increaseWastedSpaceCount(int size) { + wastedIntSpaceCount += size; } @Override - public int getTupleCount(int entry) { - int hFrameIndex = getHeaderFrameIndex(entry); - int headerOffset = getHeaderFrameOffset(entry); - IntSerDeBuffer header = headers[hFrameIndex]; - if (header != null) { - int frameIndex = header.getInt(headerOffset); - int offsetIndex = header.getInt(headerOffset + 1); - if (frameIndex >= 0) { - IntSerDeBuffer frame = contents.get(frameIndex); - int entryUsedItems = frame.getInt(offsetIndex + 1); - return entryUsedItems; - } - } - return 0; + public void reset() { + super.reset(); + currentByteSize = 0; } @Override public void close() { - int nFrames = contents.size(); - int hFrames = 0; for (int i = 0; i < headers.length; i++) { if (headers[i] != null) { - hFrames++; + bufferManager.releaseFrame(headers[i].getByteBuffer()); headers[i] = null; } } + for (int i = 0; i < contents.size(); i++) { + bufferManager.releaseFrame(contents.get(i).getByteBuffer()); + } contents.clear(); - frameCurrentIndex.clear(); + currentOffsetInEachFrameList.clear(); tupleCount = 0; - currentLargestFrameIndex = 0; - ctx.deallocateFrames((nFrames + hFrames) * frameCapacity * 4); + currentByteSize = 0; + currentLargestFrameNumber = 0; + } + + @Override + public boolean isGarbageCollectionNeeded() { + return wastedIntSpaceCount > frameCapacity * (currentLargestFrameNumber + 1) * garbageCollectionThreshold; } - private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer) + /** + * Collects garbage. The steps are as follows. + * #1. Initialize the Reader and Writer. The starting frame index is set to zero at this moment. + * #2. Read a content frame. Find and read a slot's data. Check the used count for the slot. + * If it's not -1 (meaning that it is being used now), we move it to the + * current writing offset of the Writer frame. Update the corresponding h() value pointer for this location + * in the header frame.
We can find the h() value of the slot using the first tuple pointer in the slot. + * If the number is -1 (meaning that it was migrated to a new place due to an overflow or deleted), + * just reclaim the space by letting another slot move into this space. + * #3. Once the Reader reaches the end of a frame, read the next frame. This applies to the Writer, too, i.e., + * if the writing offset pointer reaches the end of a frame, then the writing frame is set to the next frame. + * #4. Repeat #1 ~ #3 until all frames are read. + * + * @return the number of frames that are reclaimed. The value -1 is returned when no compaction happened. + */ + @Override + public int collectGarbage(ITuplePointerAccessor bufferAccessor, ITuplePartitionComputer tpc) throws HyracksDataException { - IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex); - int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex); - int requiredIntCapacity = entryCapacity * 2; - int startFrameIndex = currentLargestFrameIndex; - - if (lastIndex + requiredIntCapacity >= frameCapacity) { - IntSerDeBuffer newFrame; - startFrameIndex++; - do { - if (currentLargestFrameIndex >= contents.size() - 1) { - newFrame = new IntSerDeBuffer(ctx.allocateFrame().array()); - currentLargestFrameIndex++; - contents.add(newFrame); - frameCurrentIndex.add(0); + // Keeps the garbage collection related variables + GarbageCollectionInfo gcInfo = new GarbageCollectionInfo(); + + int slotCapacity; + int slotUsedCount; + int capacityInIntCount; + int nextSlotIntPosInPageForGC; + boolean currentPageChanged; + IntSerDeBuffer currentReadContentFrameForGC; + IntSerDeBuffer currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC); + int lastOffsetInLastFrame = currentOffsetInEachFrameList.get(contents.size() - 1); + + // Step #1. Reads a content frame until it reaches the end of content frames. + while (gcInfo.currentReadPageForGC <= currentLargestFrameNumber) { + + gcInfo.currentReadIntOffsetInPageForGC = 0; + currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC); + + // Step #2. Advances the reader until it hits the end of the given frame. + while (gcInfo.currentReadIntOffsetInPageForGC < frameCapacity) { + nextSlotIntPosInPageForGC = findNextSlotInPage(currentReadContentFrameForGC, + gcInfo.currentReadIntOffsetInPageForGC); + + if (nextSlotIntPosInPageForGC == INVALID_VALUE) { + // There isn't a valid slot in the page. Exits the loop #2 and reads the next frame. + break; + } + + // Valid slot found. Reads the given slot information. + slotCapacity = currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC); + slotUsedCount = currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC + 1); + capacityInIntCount = (slotCapacity + 1) * 2; + + // Used count should not be -1 (migrated or deleted). + if (slotUsedCount != INVALID_VALUE) { + // To prepare hash pointer (header -> content) update, read the first tuple pointer in the old slot. + tempTuplePointer.reset(currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC + 2), + currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC + 3)); + + // Check whether there is at least some space to put some part of the slot. + // If not, advance the write pointer to the next page. + if ((gcInfo.currentWriteIntOffsetInPageForGC + 4) > frameCapacity + && gcInfo.currentGCWritePageForGC < currentLargestFrameNumber) { + // Wipe out the region that can't be used.
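// (Illustration, assuming 256-byte frames as in the unit tests, i.e. frameCapacity = 64 ints: a slot needs at
// least 4 ints here (capacity, used count, and one (frameIndex, tupleIndex) pair), so a writer sitting at int
// offset 61 pads the last 3 ints with INVALID_VALUE and continues on the next page.)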
+ currentWriteContentFrameForGC.writeInvalidVal(gcInfo.currentWriteIntOffsetInPageForGC, + frameCapacity - gcInfo.currentWriteIntOffsetInPageForGC); + gcInfo.currentGCWritePageForGC++; + currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC); + gcInfo.currentWriteIntOffsetInPageForGC = 0; + } + + // Migrates this slot to the current offset in Writer's Frame if possible. + currentPageChanged = MigrateSlot(gcInfo, bufferAccessor, tpc, capacityInIntCount, + nextSlotIntPosInPageForGC); + + if (currentPageChanged) { + currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC); + currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC); + } } else { - currentLargestFrameIndex++; - frameCurrentIndex.set(currentLargestFrameIndex, 0); + // A useless slot (either migrated or deleted) is found. Resets the space + // so it will be occupied by the next valid slot. + currentPageChanged = resetSlotSpace(gcInfo, nextSlotIntPosInPageForGC, capacityInIntCount); + + if (currentPageChanged) { + currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC); + } + } - requiredIntCapacity -= frameCapacity; - } while (requiredIntCapacity > 0); - lastIndex = 0; - lastFrame = contents.get(startFrameIndex); + } + + // Reached the end of a frame. Advances the Reader. + if (gcInfo.currentReadPageForGC == currentLargestFrameNumber) { + break; + } + gcInfo.currentReadPageForGC++; } - // set header - header.writeInt(headerOffset, startFrameIndex); - header.writeInt(headerOffset + 1, lastIndex); - - // set the entry - lastFrame.writeInt(lastIndex, entryCapacity - 1); - lastFrame.writeInt(lastIndex + 1, 1); - lastFrame.writeInt(lastIndex + 2, pointer.getFrameIndex()); - lastFrame.writeInt(lastIndex + 3, pointer.getTupleIndex()); - int newLastIndex = lastIndex + entryCapacity * 2; - newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1; - frameCurrentIndex.set(startFrameIndex, newLastIndex); - - requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex); - while (requiredIntCapacity > 0) { - startFrameIndex++; - requiredIntCapacity -= frameCapacity; - newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1; - frameCurrentIndex.set(startFrameIndex, newLastIndex); + // More unused frames at the end? + int extraFrames = 0; + if (contents.size() > (currentLargestFrameNumber + 1)) { + extraFrames = contents.size() - (currentLargestFrameNumber + 1); } - } - private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndexArg, int offsetIndex, - TuplePointer pointer) throws HyracksDataException { - int frameIndex = frameIndexArg; - IntSerDeBuffer frame = contents.get(frameIndex); - int entryItems = frame.getInt(offsetIndex); - int entryUsedItems = frame.getInt(offsetIndex + 1); - - if (entryUsedItems < entryItems) { - frame.writeInt(offsetIndex + 1, entryUsedItems + 1); - int startIndex = offsetIndex + 2 + entryUsedItems * 2; - while (startIndex >= frameCapacity) { - ++frameIndex; - startIndex -= frameCapacity; + // Done reading all frames. So, releases unnecessary frames. 
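// (Illustration of the arithmetic below: if the reader finished on page 5, one unused page trails it, and the
// writer ended on page 3, then 5 + 1 - 3 = 3 frames can be handed back to the buffer manager.)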
+ int numberOfFramesToBeDeallocated = gcInfo.currentReadPageForGC + extraFrames - gcInfo.currentGCWritePageForGC; + + if (numberOfFramesToBeDeallocated >= 1) { + for (int i = 0; i < numberOfFramesToBeDeallocated; i++) { + currentByteSize -= contents.get(gcInfo.currentGCWritePageForGC + 1).getByteCapacity(); + bufferManager.releaseFrame(contents.get(gcInfo.currentGCWritePageForGC + 1).getByteBuffer()); + contents.remove(gcInfo.currentGCWritePageForGC + 1); + currentOffsetInEachFrameList.remove(gcInfo.currentGCWritePageForGC + 1); } - frame = contents.get(frameIndex); - frame.writeInt(startIndex, pointer.getFrameIndex()); - frame.writeInt(startIndex + 1, pointer.getTupleIndex()); } else { - int capacity = (entryItems + 1) * 2; - header.writeInt(headerOffset, -1); - header.writeInt(headerOffset + 1, -1); - int fIndex = frame.getInt(offsetIndex + 2); - int tIndex = frame.getInt(offsetIndex + 3); - tempTuplePointer.reset(fIndex, tIndex); - this.insertNewEntry(header, headerOffset, capacity, tempTuplePointer); - int newFrameIndex = header.getInt(headerOffset); - int newTupleIndex = header.getInt(headerOffset + 1); - - for (int i = 1; i < entryUsedItems; i++) { - int startIndex = offsetIndex + 2 + i * 2; - int startFrameIndex = frameIndex; - while (startIndex >= frameCapacity) { - ++startFrameIndex; - startIndex -= frameCapacity; + // For this case, we check whether the last offset is changed. + // If not, we didn't get any space from the operation. + int afterLastOffsetInLastFrame = currentOffsetInEachFrameList.get(gcInfo.currentGCWritePageForGC); + if (lastOffsetInLastFrame == afterLastOffsetInLastFrame) { + numberOfFramesToBeDeallocated = -1; + } + } + + // Resets the current offset in the last frame so that the future insertions will work without an issue. + currentLargestFrameNumber = gcInfo.currentGCWritePageForGC; + currentOffsetInEachFrameList.set(gcInfo.currentGCWritePageForGC, gcInfo.currentWriteIntOffsetInPageForGC); + + wastedIntSpaceCount = 0; + tempTuplePointer.reset(INVALID_VALUE, INVALID_VALUE); + + return numberOfFramesToBeDeallocated; + } + + /** + * Migrates the current slot to the designated place and reset the current space using INVALID_VALUE. + * + * @return true if the current page has been changed. false if not. + */ + private boolean MigrateSlot(GarbageCollectionInfo gcInfo, ITuplePointerAccessor bufferAccessor, + ITuplePartitionComputer tpc, int capacityInIntCount, int nextSlotIntPosInPageForGC) + throws HyracksDataException { + boolean currentPageChanged = false; + // If the reader and writer indicate the same slot location, a move is not required. 
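// (Illustration: until some dead slot has been reclaimed, the reader and writer sit at the same position; the
// slot already lies where it would be written, so the branch below only advances both cursors past the slot's
// capacityInIntCount ints, page by page if the slot spans frames, and moves no bytes.)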
+ if (gcInfo.isReaderWriterAtTheSamePos()) { + int intToRead = capacityInIntCount; + int intReadAtThisTime; + gcInfo.currentReadIntOffsetInPageForGC = nextSlotIntPosInPageForGC; + while (intToRead > 0) { + intReadAtThisTime = Math.min(intToRead, frameCapacity - gcInfo.currentReadIntOffsetInPageForGC); + gcInfo.currentReadIntOffsetInPageForGC += intReadAtThisTime; + if (gcInfo.currentReadIntOffsetInPageForGC >= frameCapacity + && gcInfo.currentReadPageForGC < currentLargestFrameNumber) { + gcInfo.currentReadPageForGC++; + gcInfo.currentReadIntOffsetInPageForGC = 0; + currentPageChanged = true; } - frame = contents.get(startFrameIndex); - fIndex = frame.getInt(startIndex); - tIndex = frame.getInt(startIndex + 1); - tempTuplePointer.reset(fIndex, tIndex); - insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer); + intToRead -= intReadAtThisTime; } - insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer); + + gcInfo.currentGCWritePageForGC = gcInfo.currentReadPageForGC; + gcInfo.currentWriteIntOffsetInPageForGC = gcInfo.currentReadIntOffsetInPageForGC; + + return currentPageChanged; } + + // The reader is ahead of the writer. We can migrate the given slot toward the beginning of + // the content frame(s). + int tempWriteIntPosInPage = gcInfo.currentWriteIntOffsetInPageForGC; + int tempReadIntPosInPage = nextSlotIntPosInPageForGC; + int chunksToMove = capacityInIntCount; + int chunksToMoveAtThisTime; + + // To keep the original writing page that is going to be used for updating the header to content frame, + // we declare a local variable. + int tempWritePage = gcInfo.currentGCWritePageForGC; + + // Keeps the maximum INT chunks that the writer/reader can write in the current page. + int oneTimeIntCapacityForWriter; + int oneTimeIntCapacityForReader; + + IntSerDeBuffer currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC); + IntSerDeBuffer currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC); + + // Moves the slot. + while (chunksToMove > 0) { + oneTimeIntCapacityForWriter = Math.min(chunksToMove, frameCapacity - tempWriteIntPosInPage); + oneTimeIntCapacityForReader = Math.min(chunksToMove, frameCapacity - tempReadIntPosInPage); + + // Since the locations of the Reader and Writer are different, we can only move a minimum chunk + // before the current page of either the Reader or the Writer changes. + chunksToMoveAtThisTime = Math.min(oneTimeIntCapacityForWriter, oneTimeIntCapacityForReader); + + // Moves a part of the slot from the Reader to the Writer. + System.arraycopy(currentReadContentFrameForGC.bytes, tempReadIntPosInPage * INT_SIZE, + currentWriteContentFrameForGC.bytes, tempWriteIntPosInPage * INT_SIZE, + chunksToMoveAtThisTime * INT_SIZE); + + // Clears that part in the Reader. + for (int i = 0; i < chunksToMoveAtThisTime; i++) { + // Do not blindly put -1 since there might be overlap between the writer and reader. + if ((gcInfo.currentReadPageForGC != tempWritePage) + || (tempReadIntPosInPage + i >= tempWriteIntPosInPage + chunksToMoveAtThisTime)) { + currentReadContentFrameForGC.writeInvalidVal(tempReadIntPosInPage + i, chunksToMoveAtThisTime - i); + break; + } + } + + // Advances the pointers. + tempWriteIntPosInPage += chunksToMoveAtThisTime; + tempReadIntPosInPage += chunksToMoveAtThisTime; + + // Once the writer pointer hits the end of the page, we move to the next content page.
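// (Illustration: a slot may straddle pages; e.g. a 16-int slot starting at int offset 56 of a 64-int page
// writes 8 ints there and the remaining 8 ints at offset 0 of the next page, which is why the writer and
// reader cursors below advance independently.)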
+ if (tempWriteIntPosInPage >= frameCapacity && tempWritePage < currentLargestFrameNumber) { + tempWritePage++; + currentPageChanged = true; + currentWriteContentFrameForGC = contents.get(tempWritePage); + tempWriteIntPosInPage = 0; + } + + // Once the reader pointer hits the end of the page, we move to the next content page. + if (tempReadIntPosInPage >= frameCapacity && gcInfo.currentReadPageForGC < currentLargestFrameNumber) { + gcInfo.currentReadPageForGC++; + currentPageChanged = true; + currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC); + tempReadIntPosInPage = 0; + } + + chunksToMove -= chunksToMoveAtThisTime; + } + + updateHeaderToContentPointerInHeaderFrame(bufferAccessor, tpc, tempTuplePointer, gcInfo.currentGCWritePageForGC, + gcInfo.currentWriteIntOffsetInPageForGC); + + gcInfo.currentGCWritePageForGC = tempWritePage; + gcInfo.currentWriteIntOffsetInPageForGC = tempWriteIntPosInPage; + gcInfo.currentReadIntOffsetInPageForGC = tempReadIntPosInPage; + + return currentPageChanged; } - private void resetFrame(IntSerDeBuffer frame) { - for (int i = 0; i < frameCapacity; i++) - frame.writeInt(i, -1); + /** + * Completely removes the slot in the given content frame(s) and resets the space. + * For this method, we assume that this slot is not moved to somewhere else. + * + * @return true if the current page has been changed. false if not. + */ + private boolean resetSlotSpace(GarbageCollectionInfo gcInfo, int slotIntPos, int capacityInIntCount) { + boolean currentPageChanged = false; + int tempReadIntPosInPage = slotIntPos; + int chunksToDelete = capacityInIntCount; + int chunksToDeleteAtThisTime; + IntSerDeBuffer currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC); + + while (chunksToDelete > 0) { + chunksToDeleteAtThisTime = Math.min(chunksToDelete, frameCapacity - tempReadIntPosInPage); + + // Clears that part in the Reader + currentReadContentFrameForGC.writeInvalidVal(tempReadIntPosInPage, chunksToDeleteAtThisTime); + + // Advances the pointer + tempReadIntPosInPage += chunksToDeleteAtThisTime; + + // Once the reader pointer hits the end of the page, we move to the next content page. + if (tempReadIntPosInPage >= frameCapacity && gcInfo.currentReadPageForGC < currentLargestFrameNumber) { + gcInfo.currentReadPageForGC++; + currentPageChanged = true; + currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC); + tempReadIntPosInPage = 0; + } + + chunksToDelete -= chunksToDeleteAtThisTime; + } + + gcInfo.currentReadIntOffsetInPageForGC = tempReadIntPosInPage; + + return currentPageChanged; } - private int getHeaderFrameIndex(int entry) { - int frameIndex = entry * 2 / frameCapacity; - return frameIndex; + /** + * Updates the given Header to Content Frame Pointer after calculating the corresponding hash value from the + * given tuple pointer. + */ + private void updateHeaderToContentPointerInHeaderFrame(ITuplePointerAccessor bufferAccessor, + ITuplePartitionComputer tpc, TuplePointer hashedTuple, int newContentFrame, + int newOffsetInContentFrame) throws HyracksDataException { + // Finds the original hash value. We assume that bufferAccessor and tpc is already assigned. + bufferAccessor.reset(hashedTuple); + int entry = tpc.partition(bufferAccessor, hashedTuple.getTupleIndex(), tableSize); + + // Finds the location of the hash value in the header frame arrays. 
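// (Illustration, from getHeaderFrameIndex()/getHeaderFrameOffset() below: each entry occupies 2 ints in a
// header frame, so entry e maps to header frame (e * 2) / frameCapacity at offset (e * 2) % frameCapacity;
// with frameCapacity = 64, entry 100 maps to frame 3, offset 8.)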
+ int headerFrameIndex = getHeaderFrameIndex(entry); + int offsetInHeaderFrame = getHeaderFrameOffset(entry); + IntSerDeBuffer headerFrame = headers[headerFrameIndex]; + + // Updates the hash value. + headerFrame.writeInt(offsetInHeaderFrame, newContentFrame); + headerFrame.writeInt(offsetInHeaderFrame + 1, newOffsetInContentFrame); } - private int getHeaderFrameOffset(int entry) { - int offset = entry * 2 % frameCapacity; - return offset; + + /** + * Tries to find the next valid slot position in the given content frame from the current position. + */ + private int findNextSlotInPage(IntSerDeBuffer frame, int readIntPosAtPage) { + // Sanity check + if (readIntPosAtPage >= frameCapacity) { + return INVALID_VALUE; + } + int intOffset = readIntPosAtPage; + while (frame.getInt(intOffset) == INVALID_VALUE) { + intOffset++; + if (intOffset >= frameCapacity) { + // Couldn't find the next slot in the given page. + return INVALID_VALUE; + } + } + return intOffset; } - private static class IntSerDeBuffer { + /** + * Keeps the garbage collection related variables + */ + private static class GarbageCollectionInfo { + int currentReadPageForGC; + int currentReadIntOffsetInPageForGC; + int currentGCWritePageForGC; + int currentWriteIntOffsetInPageForGC; + + public GarbageCollectionInfo() { + currentReadPageForGC = 0; + currentReadIntOffsetInPageForGC = 0; + currentGCWritePageForGC = 0; + currentWriteIntOffsetInPageForGC = 0; + } + + /** + * Checks whether the writing position and the reading position are the same. + */ + public boolean isReaderWriterAtTheSamePos() { + return currentReadPageForGC == currentGCWritePageForGC + && currentReadIntOffsetInPageForGC == currentWriteIntOffsetInPageForGC; + } + } - private byte[] bytes; + /** + * Returns the current status of this table: the number of slots, frames, space utilization, etc. + */ + @Override + public String printInfo() { + SlotInfoPair slotInfo = new SlotInfoPair<>(0, 0); - public IntSerDeBuffer(byte[] data) { - this.bytes = data; + int nFrames = contents.size(); + int hFrames = 0; + // Histogram Information - counts the number of used count per slot used count (e.g., 10,2 means that + // there are 10 hash slots that only has two hash entries in it.) + Map headerSlotUsedCountMap = new TreeMap<>(); + + // Histogram Information - counts the number of capacity count per slot count (10,3 means that + // there are 10 hash slots whose capacity is 3.) 
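// (Illustration: slot capacities double on overflow, 3 -> 7 -> 15 -> ..., so a hypothetical run in which every
// slot overflowed exactly once would show a capacity histogram concentrated at 7.)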
+ Map headerSlotCapaCountMap = new TreeMap<>(); + + int headerSlotUsedCount = 0; + int headerSlotTotalCount; + double headerSlotUsedRatio = 0.0; + IntSerDeBuffer header; + int tupleUsedCount; + int tupleUsedCountFromMap; + int capacity; + int capacityFromMap; + for (int i = 0; i < headers.length; i++) { + if (headers[i] != null) { + header = headers[i]; + for (int j = 0; j < frameCapacity; j = j + 2) { + if (header.getInt(j) >= 0) { + headerSlotUsedCount++; + getSlotInfo(header.getInt(j), header.getInt(j + 1), slotInfo); + capacity = slotInfo.first; + tupleUsedCount = slotInfo.second; + // UsedCount increase + if (headerSlotUsedCountMap.containsKey(tupleUsedCount)) { + tupleUsedCountFromMap = headerSlotUsedCountMap.get(tupleUsedCount); + headerSlotUsedCountMap.put(tupleUsedCount, tupleUsedCountFromMap + 1); + } else { + headerSlotUsedCountMap.put(tupleUsedCount, 1); + } + // Capacity increase + if (headerSlotCapaCountMap.containsKey(capacity)) { + capacityFromMap = headerSlotCapaCountMap.get(capacity); + headerSlotCapaCountMap.put(capacity, capacityFromMap + 1); + } else { + headerSlotCapaCountMap.put(capacity, 1); + } + headerSlotUsedCount++; + } + } + hFrames++; + } + } + headerSlotTotalCount = hFrames * frameCapacity / 2; + if (headerSlotTotalCount > 0) { + headerSlotUsedRatio = (double) headerSlotUsedCount / (double) headerSlotTotalCount; } + int total = hFrames + nFrames; + StringBuilder buf = new StringBuilder(); + buf.append("\n>>> " + this + " " + Thread.currentThread().getId() + "::printInfo()" + "\n"); + buf.append("(A) hash table cardinality (# of slot):\t" + tableSize + "\tExpected Table Size(MB):\t" + + ((double) getExpectedTableByteSize(tableSize, frameCapacity * 4) / 1048576) + "\twasted size(MB):\t" + + ((double) wastedIntSpaceCount * 4 / 1048576) + "\n"); + buf.append("(B) # of header frames:\t" + hFrames + "\tsize(MB)\t" + + ((double) hFrames * frameCapacity * 4 / 1048576) + "\tratio (B/D)\t" + ((double) hFrames / total) + + "\n"); + buf.append("(C) # of content frames:\t" + nFrames + "\tsize(MB)\t" + + ((double) nFrames * frameCapacity * 4 / 1048576) + "\tratio (C/D)\t" + ((double) nFrames / total) + + "\n"); + buf.append("(D) # of total frames:\t" + total + "\tsize(MB)\t" + ((double) total * frameCapacity * 4 / 1048576) + + "\n"); + buf.append("(E) # of used header entries:\t" + headerSlotUsedCount + "\n"); + buf.append("(F) # of all possible header entries:\t" + headerSlotTotalCount + "\n"); + buf.append("(G) header entries used ratio (E/F):\t" + headerSlotUsedRatio + "\n"); + buf.append("(H) used count histogram (used count, its frequency):" + "\n"); + int totalContentUsedCount = 0; + for (Map.Entry entry : headerSlotUsedCountMap.entrySet()) { + buf.append(entry.getKey() + "\t" + entry.getValue() + "\n"); + totalContentUsedCount += (entry.getKey() * entry.getValue()); + } + buf.append("(H-1) total used count in content frames:\t" + totalContentUsedCount + "\n"); - public int getInt(int pos) { - int offset = pos * 4; - return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) - + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0); + int totalContentCapaCount = 0; + buf.append("(I) capacity count histogram (capacity, its frequency):" + "\n"); + for (Map.Entry entry : headerSlotCapaCountMap.entrySet()) { + buf.append(entry.getKey() + "\t" + entry.getValue() + "\n"); + totalContentCapaCount += (entry.getKey() * entry.getValue()); } + buf.append("(I-1) total capacity in content frames:\t" + totalContentCapaCount + "\n"); + 
buf.append("(J) ratio of used count in content frames (H-1 / I-1):\t" + + ((double) totalContentUsedCount / totalContentCapaCount) + "\n"); + return buf.toString(); + } + + /** + * Returns the capacity and the usedCount for the given slot in this table. + */ + public void getSlotInfo(int contentFrameIndex, int contentOffsetIndex, SlotInfoPair slotInfo) { + IntSerDeBuffer frame = contents.get(contentFrameIndex); + int entryCapacity = frame.getInt(contentOffsetIndex); + int entryUsedItems = frame.getInt(contentOffsetIndex + 1); + slotInfo.reset(entryCapacity, entryUsedItems); + } + + private static class SlotInfoPair { + + private T1 first; + private T2 second; - public void writeInt(int pos, int value) { - int offset = pos * 4; - bytes[offset++] = (byte) (value >> 24); - bytes[offset++] = (byte) (value >> 16); - bytes[offset++] = (byte) (value >> 8); - bytes[offset++] = (byte) (value); + public SlotInfoPair(T1 first, T2 second) { + this.first = first; + this.second = second; } - public int capacity() { - return bytes.length / 4; + public void reset(T1 first, T2 second) { + this.first = first; + this.second = second; } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java new file mode 100644 index 0000000..5b7d364 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.structures; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hyracks.api.context.IHyracksFrameMgrContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor; + +/** + * This table consists of header frames and content frames. + * Header indicates the first entry slot location for the given integer value. + * A header slot consists of [content frame number], [offset in that frame] to get + * the first tuple's pointer information that shares the same hash value. + * An entry slot in the content frame is as follows. 
+ * [capacity of the slot], [# of occupied elements], {[frameIndex], [tupleIndex]}+; + * each [frameIndex], [tupleIndex] pair forms a tuple pointer. + * WARNING: this hash table can grow indefinitely and may cause an out-of-memory error. + * So, do not use this in production; use the SerializableHashTable class instead, + * since it is managed by a buffer manager. + */ +public class SimpleSerializableHashTable implements ISerializableTable { + + // unit size: int + protected static final int INT_SIZE = 4; + // Initial entry slot size + protected static final int INIT_ENTRY_SIZE = 4; + protected static final int INVALID_VALUE = 0xFFFFFFFF; + protected static final byte INVALID_BYTE_VALUE = (byte) 0xFF; + + // Header frame array + protected IntSerDeBuffer[] headers; + // Content frame list + protected List<IntSerDeBuffer> contents = new ArrayList<>(); + protected List<Integer> currentOffsetInEachFrameList = new ArrayList<>(); + protected int frameCapacity; + protected int currentLargestFrameNumber = 0; + // The total byte size of the frames allocated to the headers and contents + protected int currentByteSize = 0; + protected int tupleCount = 0; + protected TuplePointer tempTuplePointer = new TuplePointer(); + protected int tableSize; + protected int frameSize; + protected IHyracksFrameMgrContext ctx; + + public SimpleSerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException { + this(tableSize, ctx, true); + } + + public SimpleSerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx, boolean frameInitRequired) + throws HyracksDataException { + this.ctx = ctx; + frameSize = ctx.getInitialFrameSize(); + int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1; + int headerSize = tableSize * INT_SIZE * 2 / frameSize + residual; + headers = new IntSerDeBuffer[headerSize]; + this.tableSize = tableSize; + if (frameInitRequired) { + ByteBuffer newFrame = getFrame(frameSize); + if (newFrame == null) { + throw new HyracksDataException("Can't initialize the Hash Table. Please assign more memory."); + } + IntSerDeBuffer frame = new IntSerDeBuffer(newFrame); + frameCapacity = frame.capacity(); + contents.add(frame); + currentOffsetInEachFrameList.add(0); + } + } + + ByteBuffer getFrame(int size) throws HyracksDataException { + currentByteSize += size; + return ctx.allocateFrame(size); + } + + void increaseWastedSpaceCount(int size) { + // Do nothing. For this simple implementation, we don't count the wasted space. + } + + @Override + public boolean insert(int entry, TuplePointer pointer) throws HyracksDataException { + int headerFrameIndex = getHeaderFrameIndex(entry); + int offsetInHeaderFrame = getHeaderFrameOffset(entry); + IntSerDeBuffer headerFrame = headers[headerFrameIndex]; + if (headerFrame == null) { + ByteBuffer newFrame = getFrame(frameSize); + if (newFrame == null) { + return false; + } + headerFrame = new IntSerDeBuffer(newFrame); + headers[headerFrameIndex] = headerFrame; + } + int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame); + boolean result; + if (contentFrameIndex < 0) { + // Since the initial value of the index and offset is -1, this means that the slot for + // this entry is not created yet. So, create the entry slot and insert the first tuple into that slot. + // OR, the previous slot became full and a new double-sized slot is about to be created. + result = insertNewEntry(headerFrame, offsetInHeaderFrame, INIT_ENTRY_SIZE, pointer); + } else { + // The entry slot already exists.
Insert the non-first tuple into the entry slot. + int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame + 1); + result = insertNonFirstTuple(headerFrame, offsetInHeaderFrame, contentFrameIndex, offsetInContentFrame, + pointer); + } + + if (result) { + tupleCount++; + } + + return result; + } + + @Override + /** + * Resets the slot information for the entry. The connection (pointer) between the header frame and + * the content frame will also be lost. Specifically, we set the used count in the slot to -1 + * so that the space can be reclaimed. + */ + public void delete(int entry) { + int headerFrameIndex = getHeaderFrameIndex(entry); + int offsetInHeaderFrame = getHeaderFrameOffset(entry); + IntSerDeBuffer header = headers[headerFrameIndex]; + if (header != null) { + int contentFrameIndex = header.getInt(offsetInHeaderFrame); + int offsetInContentFrame = header.getInt(offsetInHeaderFrame + 1); + if (contentFrameIndex >= 0) { + IntSerDeBuffer frame = contents.get(contentFrameIndex); + int entrySlotCapacity = frame.getInt(offsetInContentFrame); + int entryUsedItems = frame.getInt(offsetInContentFrame + 1); + // Set the used count to -1 in the slot so that the slot space can be reclaimed. + frame.writeInvalidVal(offsetInContentFrame + 1, 1); + // Also reset the header (frameIdx, offset) to content frame pointer. + header.writeInvalidVal(offsetInHeaderFrame, 2); + tupleCount = tupleCount - entryUsedItems; + increaseWastedSpaceCount((entrySlotCapacity + 1) * 2); + } + } + } + + @Override + /** + * For the given integer value, get the n-th (n = offsetInSlot) tuple pointer in the corresponding slot. + */ + public boolean getTuplePointer(int entry, int offsetInSlot, TuplePointer dataPointer) { + int headerFrameIndex = getHeaderFrameIndex(entry); + int offsetInHeaderFrame = getHeaderFrameOffset(entry); + IntSerDeBuffer header = headers[headerFrameIndex]; + if (header == null) { + dataPointer.reset(INVALID_VALUE, INVALID_VALUE); + return false; + } + int contentFrameIndex = header.getInt(offsetInHeaderFrame); + int offsetInContentFrame = header.getInt(offsetInHeaderFrame + 1); + if (contentFrameIndex < 0) { + dataPointer.reset(INVALID_VALUE, INVALID_VALUE); + return false; + } + IntSerDeBuffer frame = contents.get(contentFrameIndex); + int entryUsedCountInSlot = frame.getInt(offsetInContentFrame + 1); + if (offsetInSlot > entryUsedCountInSlot - 1) { + dataPointer.reset(INVALID_VALUE, INVALID_VALUE); + return false; + } + int startOffsetInContentFrame = offsetInContentFrame + 2 + offsetInSlot * 2; + while (startOffsetInContentFrame >= frameCapacity) { + ++contentFrameIndex; + startOffsetInContentFrame -= frameCapacity; + } + frame = contents.get(contentFrameIndex); + dataPointer.reset(frame.getInt(startOffsetInContentFrame), frame.getInt(startOffsetInContentFrame + 1)); + return true; + } + + @Override + public void reset() { + for (IntSerDeBuffer frame : headers) { + if (frame != null) { + frame.resetFrame(); + } + } + + currentOffsetInEachFrameList.clear(); + for (int i = 0; i < contents.size(); i++) { + currentOffsetInEachFrameList.add(0); + } + + currentLargestFrameNumber = 0; + tupleCount = 0; + currentByteSize = 0; + } + + @Override + public int getCurrentByteSize() { + return currentByteSize; + } + + @Override + public int getTupleCount() { + return tupleCount; + } + + @Override + /** + * Returns the tuple count in the slot for the given entry.
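 * (Illustrative, mirroring the updated unit test below: after inserting 10000 tuples round-robin
 * over 101 entries, getTupleCount(entry) returns either ceil(10000/101) = 100 or 99.)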
+ */ + public int getTupleCount(int entry) { + int headerFrameIndex = getHeaderFrameIndex(entry); + int offsetInHeaderFrame = getHeaderFrameOffset(entry); + IntSerDeBuffer headerFrame = headers[headerFrameIndex]; + if (headerFrame != null) { + int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame); + int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame + 1); + if (contentFrameIndex >= 0) { + IntSerDeBuffer frame = contents.get(contentFrameIndex); + int entryUsedCountInSlot = frame.getInt(offsetInContentFrame + 1); + return entryUsedCountInSlot; + } + } + return 0; + } + + @Override + public void close() { + int nFrames = contents.size(); + for (int i = 0; i < headers.length; i++) { + headers[i] = null; + } + contents.clear(); + currentOffsetInEachFrameList.clear(); + tupleCount = 0; + currentByteSize = 0; + currentLargestFrameNumber = 0; + ctx.deallocateFrames(nFrames); + } + + protected boolean insertNewEntry(IntSerDeBuffer header, int offsetInHeaderFrame, int entryCapacity, + TuplePointer pointer) throws HyracksDataException { + IntSerDeBuffer lastContentFrame = contents.get(currentLargestFrameNumber); + int lastOffsetInCurrentFrame = currentOffsetInEachFrameList.get(currentLargestFrameNumber); + int requiredIntCapacity = entryCapacity * 2; + int currentFrameNumber = currentLargestFrameNumber; + boolean currentFrameNumberChanged = false; + + if (lastOffsetInCurrentFrame + requiredIntCapacity >= frameCapacity) { + IntSerDeBuffer newContentFrame; + // At least we need to have the meta-data (slot capacity and used count) and + // one tuplePointer in the same frame (4 INT_SIZE). + // So, if there is not enough space for this, we just move on to the next page. + if ((lastOffsetInCurrentFrame + 4) > frameCapacity) { + // Wipe out the region that can't be used. + lastContentFrame.writeInvalidVal(lastOffsetInCurrentFrame, frameCapacity - lastOffsetInCurrentFrame); + currentFrameNumber++; + lastOffsetInCurrentFrame = 0; + currentFrameNumberChanged = true; + } + do { + if (currentLargestFrameNumber >= contents.size() - 1) { + ByteBuffer newFrame = getFrame(frameSize); + if (newFrame == null) { + return false; + } + newContentFrame = new IntSerDeBuffer(newFrame); + currentLargestFrameNumber++; + contents.add(newContentFrame); + currentOffsetInEachFrameList.add(0); + } else { + currentLargestFrameNumber++; + currentOffsetInEachFrameList.set(currentLargestFrameNumber, 0); + } + requiredIntCapacity -= frameCapacity; + } while (requiredIntCapacity > 0); + } + + if (currentFrameNumberChanged) { + lastContentFrame = contents.get(currentFrameNumber); + } + + // sets the header + header.writeInt(offsetInHeaderFrame, currentFrameNumber); + header.writeInt(offsetInHeaderFrame + 1, lastOffsetInCurrentFrame); + + // sets the entry & its slot. + // 1. slot capacity + lastContentFrame.writeInt(lastOffsetInCurrentFrame, entryCapacity - 1); + // 2. used count in the slot + lastContentFrame.writeInt(lastOffsetInCurrentFrame + 1, 1); + // 3. initial entry in the slot + lastContentFrame.writeInt(lastOffsetInCurrentFrame + 2, pointer.getFrameIndex()); + lastContentFrame.writeInt(lastOffsetInCurrentFrame + 3, pointer.getTupleIndex()); + int newLastOffsetInContentFrame = lastOffsetInCurrentFrame + entryCapacity * 2; + newLastOffsetInContentFrame = newLastOffsetInContentFrame < frameCapacity ? 
newLastOffsetInContentFrame + : frameCapacity - 1; + currentOffsetInEachFrameList.set(currentFrameNumber, newLastOffsetInContentFrame); + + requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastOffsetInCurrentFrame); + while (requiredIntCapacity > 0) { + currentFrameNumber++; + requiredIntCapacity -= frameCapacity; + newLastOffsetInContentFrame = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity + : frameCapacity - 1; + currentOffsetInEachFrameList.set(currentFrameNumber, newLastOffsetInContentFrame); + } + + return true; + } + + protected boolean insertNonFirstTuple(IntSerDeBuffer header, int offsetInHeaderFrame, int contentFrameIndex, + int offsetInContentFrame, TuplePointer pointer) throws HyracksDataException { + int frameIndex = contentFrameIndex; + IntSerDeBuffer contentFrame = contents.get(frameIndex); + int entrySlotCapacity = contentFrame.getInt(offsetInContentFrame); + int entryUsedCountInSlot = contentFrame.getInt(offsetInContentFrame + 1); + boolean frameIndexChanged = false; + if (entryUsedCountInSlot < entrySlotCapacity) { + // The slot has at least one space to accommodate this tuple pointer. + // Increase the used count by 1. + contentFrame.writeInt(offsetInContentFrame + 1, entryUsedCountInSlot + 1); + // Calculates the first empty spot in the slot. + // +2: (capacity, # of used entry count) + // *2: each tuplePointer's occupation (frame index + offset in that frame) + int startOffsetInContentFrame = offsetInContentFrame + 2 + entryUsedCountInSlot * 2; + while (startOffsetInContentFrame >= frameCapacity) { + ++frameIndex; + startOffsetInContentFrame -= frameCapacity; + frameIndexChanged = true; + } + // We don't have to read the content frame again if the frame index has not been changed. + if (frameIndexChanged) { + contentFrame = contents.get(frameIndex); + } + contentFrame.writeInt(startOffsetInContentFrame, pointer.getFrameIndex()); + contentFrame.writeInt(startOffsetInContentFrame + 1, pointer.getTupleIndex()); + } else { + // There is not enough space in this slot. We need to increase the slot size and + // migrate the current entries in it. + + // New capacity: double the original capacity + int capacity = (entrySlotCapacity + 1) * 2; + + // Temporarily sets the header (frameIdx, offset) as (-1,-1) for the slot. + header.writeInvalidVal(offsetInHeaderFrame, 2); + // Marks the old slot as obsolete - set the used count as -1 so that its space can be reclaimed + // when a garbage collection is executed. + contentFrame.writeInvalidVal(offsetInContentFrame + 1, 1); + + // Gets the location of the initial entry. + int fIndex = contentFrame.getInt(offsetInContentFrame + 2); + int tIndex = contentFrame.getInt(offsetInContentFrame + 3); + tempTuplePointer.reset(fIndex, tIndex); + // Creates a new double-sized slot for the current entries and + // migrates the initial entry in the slot to the new slot. + if (!this.insertNewEntry(header, offsetInHeaderFrame, capacity, tempTuplePointer)) { + // Reverses the effect of the change. + header.writeInt(offsetInHeaderFrame, contentFrameIndex); + header.writeInt(offsetInHeaderFrame + 1, offsetInContentFrame); + contentFrame.writeInt(offsetInContentFrame + 1, entryUsedCountInSlot); + return false; + } + + int newFrameIndex = header.getInt(offsetInHeaderFrame); + int newTupleIndex = header.getInt(offsetInHeaderFrame + 1); + + // Migrates the existing entries (from the 2nd to the last).
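// (Illustration: with INIT_ENTRY_SIZE = 4 a fresh slot stores capacity 3, so repeated overflows of one entry
// produce slots of capacity 7, 15, 31, ..., i.e. (oldCapacity + 1) * 2 - 1 at each step, and every abandoned
// slot is left for garbage collection to reclaim.)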
+ for (int i = 1; i < entryUsedCountInSlot; i++) { + int startOffsetInContentFrame = offsetInContentFrame + 2 + i * 2; + int startFrameIndex = frameIndex; + while (startOffsetInContentFrame >= frameCapacity) { + ++startFrameIndex; + startOffsetInContentFrame -= frameCapacity; + } + contentFrame = contents.get(startFrameIndex); + fIndex = contentFrame.getInt(startOffsetInContentFrame); + tIndex = contentFrame.getInt(startOffsetInContentFrame + 1); + tempTuplePointer.reset(fIndex, tIndex); + if (!insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex, newTupleIndex, tempTuplePointer)) { + return false; + } + } + // Now, inserts the new entry that caused an overflow to the old bucket. + if (!insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex, newTupleIndex, pointer)) { + return false; + } + increaseWastedSpaceCount(capacity); + } + return true; + } + + protected int getHeaderFrameIndex(int entry) { + int frameIndex = entry * 2 / frameCapacity; + return frameIndex; + } + + protected int getHeaderFrameOffset(int entry) { + int offset = entry * 2 % frameCapacity; + return offset; + } + + public static int getUnitSize() { + return INT_SIZE; + } + + public static int getNumberOfEntryInSlot() { + return INIT_ENTRY_SIZE; + } + + public static int getExpectedByteSizePerHashValue() { + // first constant 2: capacity, # of used count + // second constant 2: tuple pointer (frameIndex, offset) + return getUnitSize() * (2 + getNumberOfEntryInSlot() * 2); + } + + /** + * Calculates the expected hash table size based on a scenario: there are no duplicated entries so that + * each entry is assigned to all possible slots. + * + * @param tableSize + * : the cardinality of the hash table - number of slots + * @param frameSize + * : the frame size + * @return + * expected the byte size of the hash table + */ + public static long getExpectedTableFrameCount(long tableSize, int frameSize) { + long numberOfHeaderFrame = (long) (Math.ceil((double) tableSize * 2 / (double) frameSize)); + long numberOfContentFrame = (long) (Math + .ceil(((double) getNumberOfEntryInSlot() * 2 * getUnitSize() * tableSize) / (double) frameSize)); + return numberOfHeaderFrame + numberOfContentFrame; + } + + public static long getExpectedTableByteSize(long tableSize, int frameSize) { + return getExpectedTableFrameCount(tableSize, frameSize) * frameSize; + } + + /** + * Calculates the frame count increment/decrement for a new table size with the original size. + * + * @param origTableSize + * : the original table cardinality + * @param delta + * : a delta (a positive value means that the cardinality of the table will be increased.) + * a negative value means that the cardinality of the table will be decreased. + * @return the frame count increment/decrement: a positive number means that the table size will be increased. + * a negative number means that the table size will be decreased. 
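 * (Illustrative worked example with assumed values: for frameSize = 256 and origTableSize = 1024,
 * getExpectedTableFrameCount() yields ceil(1024 * 2 / 256) + ceil(4 * 2 * 4 * 1024 / 256) = 8 + 128 = 136 frames;
 * for origTableSize + delta = 2048 it yields 16 + 256 = 272 frames, so the delta is +136 frames,
 * or +136 * 256 = 34816 bytes from calculateByteSizeDeltaForTableSizeChange().)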
+ */ + public static long calculateFrameCountDeltaForTableSizeChange(long origTableSize, long delta, int frameSize) { + long originalFrameCount = getExpectedTableFrameCount(origTableSize, frameSize); + long newFrameCount = getExpectedTableFrameCount(origTableSize + delta, frameSize); + return newFrameCount - originalFrameCount; + } + + public static long calculateByteSizeDeltaForTableSizeChange(long origTableSize, long delta, int frameSize) { + return calculateFrameCountDeltaForTableSizeChange(origTableSize, delta, frameSize) * frameSize; + } + + @Override + public boolean isGarbageCollectionNeeded() { + // This class doesn't support the garbage collection. + return false; + } + + @Override + public int collectGarbage(ITuplePointerAccessor bufferAccessor, ITuplePartitionComputer tpc) + throws HyracksDataException { + // This class doesn't support the garbage collection. + return -1; + } + + static class IntSerDeBuffer { + + ByteBuffer byteBuffer; + byte[] bytes; + + public IntSerDeBuffer(ByteBuffer byteBuffer) { + this.byteBuffer = byteBuffer; + this.bytes = byteBuffer.array(); + resetFrame(); + } + + public int getInt(int pos) { + int offset = pos * 4; + return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + + ((bytes[offset + 2] & 0xff) << 8) + (bytes[offset + 3] & 0xff); + } + + public void writeInt(int pos, int value) { + int offset = pos * 4; + bytes[offset++] = (byte) (value >> 24); + bytes[offset++] = (byte) (value >> 16); + bytes[offset++] = (byte) (value >> 8); + bytes[offset] = (byte) (value); + } + + public void writeInvalidVal(int intPos, int intRange) { + int offset = intPos * 4; + Arrays.fill(bytes, offset, offset + INT_SIZE * intRange, INVALID_BYTE_VALUE); + } + + public int capacity() { + return bytes.length / 4; + } + + public int getByteCapacity() { + return bytes.length; + } + + public ByteBuffer getByteBuffer() { + return byteBuffer; + } + + public void resetFrame() { + Arrays.fill(bytes, INVALID_BYTE_VALUE); + } + + } + + @Override + public String printInfo() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java index 43a1f6d..eeec223 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java @@ -23,8 +23,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.apache.hyracks.api.context.IHyracksFrameMgrContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.nc.resources.memory.FrameManager; +import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool; +import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager; +import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool; +import 
org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager; import org.junit.Before; import org.junit.Test; @@ -33,11 +38,17 @@ public class SerializableHashTableTest { SerializableHashTable nsTable; final int NUM_PART = 101; TuplePointer pointer = new TuplePointer(0, 0); - final int num = 1000; + final int num = 10000; + protected IHyracksFrameMgrContext ctx; + private IDeallocatableFramePool framePool; + private ISimpleFrameBufferManager bufferManager; @Before public void setup() throws HyracksDataException { - nsTable = new SerializableHashTable(NUM_PART, new FrameManager(256)); + ctx = new FrameManager(256); + framePool = new DeallocatableFramePool(ctx, ctx.getInitialFrameSize() * 2048); + bufferManager = new FramePoolBackedFrameBufferManager(framePool); + nsTable = new SerializableHashTable(NUM_PART, ctx, bufferManager); } @Test @@ -66,7 +77,7 @@ public class SerializableHashTableTest { assertGetValue(); } - private void assertGetValue() { + protected void assertGetValue() { int loop = 0; for (int i = 0; i < num; i++) { assertTrue(nsTable.getTuplePointer(i % NUM_PART, loop, pointer)); @@ -75,8 +86,9 @@ public class SerializableHashTableTest { loop++; } } + int tupleCntPerPart = (int) Math.ceil((double) num / NUM_PART); for (int i = 0; i < NUM_PART; i++) { - assertTrue(nsTable.getTupleCount(i) == 10 || nsTable.getTupleCount(i) == 9); + assertTrue(nsTable.getTupleCount(i) == tupleCntPerPart || nsTable.getTupleCount(i) == tupleCntPerPart - 1); } } @@ -86,7 +98,7 @@ public class SerializableHashTableTest { assertAllPartitionsCountIsZero(); } - private void assertAllPartitionsCountIsZero() { + protected void assertAllPartitionsCountIsZero() { for (int i = 0; i < NUM_PART; i++) { assertEquals(0, nsTable.getTupleCount(i)); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java new file mode 100644 index 0000000..027ee27 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java
new file mode 100644
index 0000000..027ee27
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.structures;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SimpleSerializableHashTableTest {
+
+    SimpleSerializableHashTable nsTable;
+    final int NUM_PART = 101;
+    TuplePointer pointer = new TuplePointer(0, 0);
+    final int num = 10000;
+    private IHyracksFrameMgrContext ctx;
+
+    @Before
+    public void setup() throws HyracksDataException {
+        ctx = new FrameManager(256);
+        nsTable = new SimpleSerializableHashTable(NUM_PART, ctx);
+    }
+
+    @Test
+    public void testBatchDeletePartition() throws Exception {
+        testInsert();
+        for (int i = 0; i < NUM_PART; i++) {
+            nsTable.delete(i);
+            assertFalse(nsTable.getTuplePointer(i, 0, pointer));
+            assertEquals(0, nsTable.getTupleCount(i));
+
+            for (int j = i; j < num; j += NUM_PART) {
+                pointer.reset(j, j);
+                nsTable.insert(i, pointer);
+            }
+
+            assertGetValue();
+        }
+    }
+
+    @Test
+    public void testInsert() throws Exception {
+        for (int i = 0; i < num; i++) {
+            pointer.reset(i, i);
+            nsTable.insert(i % NUM_PART, pointer);
+        }
+        assertGetValue();
+    }
+
+    private void assertGetValue() {
+        int loop = 0;
+        for (int i = 0; i < num; i++) {
+            assertTrue(nsTable.getTuplePointer(i % NUM_PART, loop, pointer));
+            assertTrue(pointer.getFrameIndex() == i);
+            if (i % NUM_PART == NUM_PART - 1) {
+                loop++;
+            }
+        }
+        int tupleCntPerPart = (int) Math.ceil((double) num / NUM_PART);
+        for (int i = 0; i < NUM_PART; i++) {
+            assertTrue(nsTable.getTupleCount(i) == tupleCntPerPart || nsTable.getTupleCount(i) == tupleCntPerPart - 1);
+        }
+
+    }
+
+    @Test
+    public void testGetCount() throws Exception {
+        assertAllPartitionsCountIsZero();
+    }
+
+    private void assertAllPartitionsCountIsZero() {
+        for (int i = 0; i < NUM_PART; i++) {
+            assertEquals(0, nsTable.getTupleCount(i));
+        }
+    }
+}
\ No newline at end of file
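The assertion formula in both test classes replaces the old hard-coded 10-or-9 check: inserting num = 10000 tuples round-robin into NUM_PART = 101 partitions leaves 10000 = 99 * 101 + 1, so one partition holds ceil(10000 / 101) = 100 tuples and the remaining hundred hold 99. A quick standalone check of that arithmetic (illustrative class name):

    public class PartitionCountCheck {
        public static void main(String[] args) {
            final int num = 10000, numPart = 101;
            int[] counts = new int[numPart];
            for (int i = 0; i < num; i++) {
                counts[i % numPart]++; // same round-robin order the tests use
            }
            int ceil = (int) Math.ceil((double) num / numPart); // = 100
            for (int c : counts) {
                if (c != ceil && c != ceil - 1) {
                    throw new IllegalStateException("unexpected count: " + c);
                }
            }
            System.out.println("every partition holds " + (ceil - 1) + " or " + ceil + " tuples");
        }
    }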
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index f20461c..f169054 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -160,7 +160,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                         IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -249,7 +249,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                         IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -338,7 +338,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                         IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -429,7 +429,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                         IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -524,7 +524,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                         IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -619,7 +619,7 @@ public class AggregationTest extends AbstractIntegrationTest {
                         IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
 
         int[] keyFields = new int[] { 8, 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
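All six hunks above make the same change: frameLimits goes from 4 to 5. A plausible reading, consistent with the commit's theme, is that the hash table's frames now count against the group-by operator's frame budget, so the old 4-frame limit no longer leaves room for both the table and the working frames. The knock-on effect on the generated input size is mechanical (frame size assumed here for illustration; the tests take it from spec.getFrameSize()):

    public class FrameBudgetNumbers {
        public static void main(String[] args) {
            int frameSize = 32 * 1024;          // assumed 32 KB frame, not taken from the patch
            long oldFileSize = 4L * frameSize;  // 131,072 bytes of generated input before
            long newFileSize = 5L * frameSize;  // 163,840 bytes after the bump
            System.out.println(oldFileSize + " -> " + newFileSize);
        }
    }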
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index f8c7487..7075fe9 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -125,7 +125,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -293,7 +293,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                null, custOrderJoinDesc, true, nonMatchWriterFactories, 128);
+                null, custOrderJoinDesc, true, nonMatchWriterFactories, 128, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -464,7 +464,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -552,7 +552,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 100, 1.2,
                 new int[] { 1 }, new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
@@ -647,7 +647,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                         .of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -743,7 +743,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
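Every in-memory hash join constructor call above gains a trailing argument (128 here), and the HybridHashJoin's memory parameter rises from 3 to 5 frames. Judging from the tpchclient change later in this patch, which passes memSize * frameSize in the same trailing position, the new argument appears to bound the memory available to the join's in-memory hash table; the constant 128 in these tests is simply sized to their tiny inputs. A hedged sketch of how a caller might derive the value (all numbers assumed, not taken from the patch, and the unit is inferred only from the client's expression):

    public class JoinBudgetExample {
        public static void main(String[] args) {
            int memSize = 20;              // operator memory grant in frames (assumed)
            int frameSize = 32 * 1024;     // bytes per frame (assumed)
            // Mirrors the tpchclient's memSize * frameSize expression for the new parameter:
            System.out.println("table budget = " + (memSize * frameSize));
        }
    }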
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
index b8ec790..f83ab6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
@@ -28,9 +28,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -56,6 +53,8 @@ import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFac
 import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
 
 public abstract class AbstractExternalGroupbyTest {
 
@@ -176,8 +175,8 @@ public abstract class AbstractExternalGroupbyTest {
 
     @Test
     public void testBuildAndMergeNormalFrameInMem() throws HyracksDataException {
-        int tableSize = 1001;
-        int numFrames = 3;
+        int tableSize = 101;
+        int numFrames = 23;
         int frameSize = 256;
         int minDataSize = frameSize;
         int minRecordSize = 20;
@@ -187,10 +186,10 @@ public abstract class AbstractExternalGroupbyTest {
 
     @Test
     public void testBuildAndMergeNormalFrameSpill() throws HyracksDataException {
-        int tableSize = 1001;
-        int numFrames = 3;
+        int tableSize = 101;
+        int numFrames = 23;
         int frameSize = 256;
-        int minDataSize = frameSize * 4;
+        int minDataSize = frameSize * 40;
         int minRecordSize = 20;
         int maxRecordSize = 50;
         testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, null);
@@ -198,16 +197,14 @@ public abstract class AbstractExternalGroupbyTest {
 
     @Test
     public void testBuildAndMergeBigObj() throws HyracksDataException {
-        int tableSize = 1001;
-        int numFrames = 4;
+        int tableSize = 101;
+        int numFrames = 23;
         int frameSize = 256;
-        int minDataSize = frameSize * 5;
+        int minDataSize = frameSize * 40;
         int minRecordSize = 20;
         int maxRecordSize = 50;
-        HashMap bigRecords = AbstractRunGeneratorTest.generateBigObject(frameSize, 2);
-        testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize,
-                bigRecords);
-
+        HashMap bigRecords = AbstractRunGeneratorTest.generateBigObject(frameSize, 3);
+        testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, bigRecords);
     }
 
     protected abstract void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) throws HyracksDataException;
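The parameter shifts above all pull in the same direction: the hash table itself now consumes frames from numFrames, so the budget rises from 3 to 23 while the table shrinks from 1001 to 101 buckets, and minDataSize grows from 4 to 40 frames so that the spill path is still forced. The arithmetic at the tests' 256-byte frame size (illustrative class name):

    public class SpillBudgetCheck {
        public static void main(String[] args) {
            int frameSize = 256;                            // the tests' tiny frame size
            int numFrames = 23;                             // was 3: table frames now share this budget
            long inMemoryBudget = (long) numFrames * frameSize; // 5,888 bytes in memory
            long minDataSize = 40L * frameSize;                 // 10,240 bytes of generated input
            // The spill test only makes sense if the input cannot fit in the budget:
            System.out.println(minDataSize > inMemoryBudget);   // true
        }
    }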
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 65073e0..c3d0df1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -148,7 +148,7 @@ public class Join {
     private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
             FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
             double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
@@ -183,7 +183,7 @@ public class Join {
                     new IBinaryHashFunctionFactory[] {
                             PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] {
                             PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    Common.custOrderJoinDesc, tableSize, null);
+                    Common.custOrderJoinDesc, tableSize, null, memSize * frameSize);
         } else if ("hybrid".equalsIgnoreCase(algo)) {
             join = new OptimizedHybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceFactor,