Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 91464184F3 for ; Fri, 26 Feb 2016 05:54:15 +0000 (UTC) Received: (qmail 38950 invoked by uid 500); 26 Feb 2016 05:54:15 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 38906 invoked by uid 500); 26 Feb 2016 05:54:15 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 38894 invoked by uid 99); 26 Feb 2016 05:54:15 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Feb 2016 05:54:15 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id D6205180558 for ; Fri, 26 Feb 2016 05:54:14 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.549 X-Spam-Level: X-Spam-Status: No, score=-3.549 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.329] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 5Q_gSFj-E3OT for ; Fri, 26 Feb 2016 05:54:01 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 8B6A95F1E3 for ; Fri, 26 Feb 2016 05:53:59 +0000 (UTC) Received: (qmail 38390 invoked by uid 99); 26 Feb 2016 05:53:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Feb 2016 05:53:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9CD57DFBA3; Fri, 26 Feb 2016 05:53:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianfeng@apache.org To: commits@asterixdb.incubator.apache.org Date: Fri, 26 Feb 2016 05:54:02 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/11] incubator-asterixdb-hyracks git commit: Implemented the memory-bounded HashGroupby and HashJoin for BigObject http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java deleted file mode 100644 index 1cf454d..0000000 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.dataflow.std.sort.buffermanager; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public interface IFrameBufferManager { - - /** - * Reset the counters and flags to initial status. This method should not release the pre-allocated resources - * - * @throws org.apache.hyracks.api.exceptions.HyracksDataException - */ - void reset() throws HyracksDataException; - - /** - * @param frameIndex - * @return the specified frame, from the set of memory buffers, being - * managed by this memory manager - */ - ByteBuffer getFrame(int frameIndex); - - /** - * Get the startOffset of the specific frame inside buffer - * - * @param frameIndex - * @return the start offset of the frame returned by {@link #getFrame(int)} method. - */ - int getFrameStartOffset(int frameIndex); - - /** - * Get the size of the specific frame inside buffer - * - * @param frameIndex - * @return the length of the specific frame - */ - int getFrameSize(int frameIndex); - - /** - * @return the number of frames in this buffer - */ - int getNumFrames(); - - /** - * Writes the whole frame into the buffer. - * - * @param frame source frame - * @return the id of the inserted frame. if failed to return it will be -1. - */ - int insertFrame(ByteBuffer frame) throws HyracksDataException; - - void close(); - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java deleted file mode 100644 index fab1706..0000000 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.dataflow.std.sort.buffermanager; - -public interface IFrameFreeSlotPolicy { - - /** - * Find the best fit frame id which can hold the data, and then pop it out from the index. - * Return -1 is failed to find any. - * - * @param tobeInsertedSize the actual size of the data which should include - * the meta data like the field offset and the tuple - * count extra size - * @return the best fit frame id - */ - int popBestFit(int tobeInsertedSize); - - /** - * Register the new free slot into the index - * - * @param frameID - * @param freeSpace - */ - void pushNewFrame(int frameID, int freeSpace); - - /** - * Clear all the existing free slot information. - */ - void reset(); - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java deleted file mode 100644 index f555971..0000000 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.dataflow.std.sort.buffermanager; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public interface IFramePool { - - int getMinFrameSize(); - - int getMemoryBudgetBytes(); - - /** - * Get a frame of given size.
- * Returns {@code null} if failed to allocate the required size of frame - * - * @param frameSize the actual size of the frame. - * @return the allocated frame - * @throws HyracksDataException - */ - ByteBuffer allocateFrame(int frameSize) throws HyracksDataException; - - /** - * Reset the counters to initial status. This method should not release the pre-allocated resources. - */ - void reset(); - - /** - * Release the pre-allocated resources. - */ - void close(); - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java deleted file mode 100644 index 00decb9..0000000 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.dataflow.std.sort.buffermanager; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.dataflow.std.structures.TuplePointer; - -public interface ITupleBufferAccessor { - - void reset(TuplePointer tuplePointer); - - ByteBuffer getTupleBuffer(); - - int getTupleStartOffset(); - - int getTupleLength(); - - int getAbsFieldStartOffset(int fieldId); - - int getFieldLength(int fieldId); - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java deleted file mode 100644 index ae502a0..0000000 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.dataflow.std.sort.buffermanager; - -import org.apache.hyracks.api.comm.IFrameTupleAccessor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.structures.TuplePointer; - -public interface ITupleBufferManager { - /** - * Reset the counters and flags to initial status. This method should not release the pre-allocated resources - * - * @throws org.apache.hyracks.api.exceptions.HyracksDataException - */ - void reset() throws HyracksDataException; - - /** - * @return the number of tuples in this buffer - */ - int getNumTuples(); - - boolean insertTuple(IFrameTupleAccessor accessor, int idx, TuplePointer tuplePointer) throws HyracksDataException; - - void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException; - - void close(); - - ITupleBufferAccessor getTupleAccessor(); -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java deleted file mode 100644 index 444b0b6..0000000 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.dataflow.std.sort.buffermanager; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hyracks.api.comm.FixedSizeFrame; -import org.apache.hyracks.api.comm.FrameHelper; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.util.IntSerDeUtils; - -public class VariableFrameMemoryManager implements IFrameBufferManager { - - private class PhysicalFrameOffset { - IFrame physicalFrame; - int physicalOffset; - - PhysicalFrameOffset(IFrame frame, int offset) { - physicalFrame = frame; - physicalOffset = offset; - } - } - - private class LogicalFrameStartSize { - ByteBuffer logicalFrame; - int logicalStart; - int logicalSize; - - LogicalFrameStartSize(ByteBuffer frame, int start, int size) { - logicalFrame = frame; - logicalStart = start; - logicalSize = size; - } - } - - private final IFramePool framePool; - private List physicalFrameOffsets; - private List logicalFrameStartSizes; - private final IFrameFreeSlotPolicy freeSlotPolicy; - - public VariableFrameMemoryManager(IFramePool framePool, IFrameFreeSlotPolicy freeSlotPolicy) { - this.framePool = framePool; - this.freeSlotPolicy = freeSlotPolicy; - int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize(); - this.physicalFrameOffsets = new ArrayList<>(maxFrames); - this.logicalFrameStartSizes = new ArrayList<>(maxFrames); - } - - private int findAvailableFrame(int frameSize) throws HyracksDataException { - int frameId = freeSlotPolicy.popBestFit(frameSize); - if (frameId >= 0) { - return frameId; - } - ByteBuffer buffer = framePool.allocateFrame(frameSize); - if (buffer != null) { - IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), 0); - physicalFrameOffsets.add(new PhysicalFrameOffset(new FixedSizeFrame(buffer), 0)); - return physicalFrameOffsets.size() - 1; - } - return -1; - } - - @Override - public void reset() throws HyracksDataException { - physicalFrameOffsets.clear(); - logicalFrameStartSizes.clear(); - freeSlotPolicy.reset(); - framePool.reset(); - } - - @Override - public ByteBuffer getFrame(int frameIndex) { - return logicalFrameStartSizes.get(frameIndex).logicalFrame; - } - - @Override - public int getFrameStartOffset(int frameIndex) { - return logicalFrameStartSizes.get(frameIndex).logicalStart; - } - - @Override - public int getFrameSize(int frameIndex) { - return logicalFrameStartSizes.get(frameIndex).logicalSize; - } - - @Override - public int getNumFrames() { - return logicalFrameStartSizes.size(); - } - - @Override - public int insertFrame(ByteBuffer frame) throws HyracksDataException { - int frameSize = frame.capacity(); - int physicalFrameId = findAvailableFrame(frameSize); - if (physicalFrameId < 0) { - return -1; - } - ByteBuffer buffer = physicalFrameOffsets.get(physicalFrameId).physicalFrame.getBuffer(); - int offset = physicalFrameOffsets.get(physicalFrameId).physicalOffset; - System.arraycopy(frame.array(), 0, buffer.array(), offset, frameSize); - if (offset + frameSize < buffer.capacity()) { - freeSlotPolicy.pushNewFrame(physicalFrameId, buffer.capacity() - offset - frameSize); - } - physicalFrameOffsets.get(physicalFrameId).physicalOffset = offset + frameSize; - logicalFrameStartSizes.add(new LogicalFrameStartSize(buffer, offset, frameSize)); - return logicalFrameStartSizes.size() - 1; - } - - @Override - public void close() { - physicalFrameOffsets.clear(); - logicalFrameStartSizes.clear(); - freeSlotPolicy.reset(); - framePool.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java deleted file mode 100644 index 344f961..0000000 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.dataflow.std.sort.buffermanager; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; - -import org.apache.hyracks.api.context.IHyracksFrameMgrContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public class VariableFramePool implements IFramePool { - public static final int UNLIMITED_MEMORY = -1; - - private final IHyracksFrameMgrContext ctx; - private final int minFrameSize; - private final int memBudget; - - private int allocateMem; - private ArrayList buffers; // the unused slots were sorted by size increasingly. - private BitSet used; // the merged one also marked as used. - - /** - * The constructor of the VariableFramePool. - * - * @param ctx - * @param memBudgetInBytes the given memory budgets to allocate the frames. If it less than 0, it will be treated as unlimited budgets - */ - public VariableFramePool(IHyracksFrameMgrContext ctx, int memBudgetInBytes) { - this.ctx = ctx; - this.minFrameSize = ctx.getInitialFrameSize(); - this.allocateMem = 0; - if (memBudgetInBytes == UNLIMITED_MEMORY) { - this.memBudget = Integer.MAX_VALUE; - this.buffers = new ArrayList<>(); - this.used = new BitSet(); - } else { - this.memBudget = memBudgetInBytes; - this.buffers = new ArrayList<>(memBudgetInBytes / minFrameSize); - this.used = new BitSet(memBudgetInBytes / minFrameSize); - } - } - - @Override - public int getMinFrameSize() { - return minFrameSize; - } - - @Override - public int getMemoryBudgetBytes() { - return memBudget; - } - - @Override - public ByteBuffer allocateFrame(int frameSize) throws HyracksDataException { - int frameId = findExistingFrame(frameSize); - if (frameId >= 0) { - return reuseFrame(frameId); - } - if (haveEnoughFreeSpace(frameSize)) { - return createNewFrame(frameSize); - } - return mergeExistingFrames(frameSize); - - } - - private boolean haveEnoughFreeSpace(int frameSize) { - return frameSize + allocateMem <= memBudget; - } - - private static int getFirstUnUsedPos(BitSet used) { - return used.nextClearBit(0); - } - - private static int getLastUnUsedPos(BitSet used, int lastPos) { - return used.previousClearBit(lastPos); - } - - private static int binarySearchUnusedBuffer(ArrayList buffers, BitSet used, int frameSize) { - int l = getFirstUnUsedPos(used); // to skip the merged null buffers - int h = getLastUnUsedPos(used, (buffers.size() - 1)) + 1; // to skip the newly created buffers - if (l >= h) { - return -1; - } - int highest = h; - int mid = (l + h) / 2; - while (l < h) { - ByteBuffer buffer = buffers.get(mid); - if (buffer.capacity() == frameSize) { - break; - } - if (buffer.capacity() < frameSize) { - l = mid + 1; - } else { - h = mid; - } - mid = (l + h) / 2; - } - mid = used.nextClearBit(mid); - return mid < highest ? mid : -1; - } - - private int findExistingFrame(int frameSize) { - return binarySearchUnusedBuffer(buffers, used, frameSize); - } - - private ByteBuffer reuseFrame(int id) { - used.set(id); - buffers.get(id).clear(); - return buffers.get(id); - } - - private ByteBuffer createNewFrame(int frameSize) throws HyracksDataException { - buffers.add(ctx.allocateFrame(frameSize)); - allocateMem += frameSize; - return reuseFrame(buffers.size() - 1); - } - - /** - * The merging sequence is from the smallest to the largest order. - * Once the buffer get merged, it will be remove from the list in order to free the object. - * And the index spot of it will be marked as used. - * - * @param frameSize - * @return - * @throws HyracksDataException - */ - private ByteBuffer mergeExistingFrames(int frameSize) throws HyracksDataException { - int mergedSize = memBudget - allocateMem; - int highBound = getLastUnUsedPos(used, buffers.size() - 1) + 1; - for (int i = getFirstUnUsedPos(used); i < highBound; ++i) { - if (!used.get(i)) { - mergedSize += deAllocateFrame(i); - if (mergedSize >= frameSize) { - return createNewFrame(mergedSize); - } - } - } - return null; - } - - private int deAllocateFrame(int id) { - ByteBuffer frame = buffers.get(id); - ctx.deallocateFrames(frame.capacity()); - buffers.set(id, null); - used.set(id); - allocateMem -= frame.capacity(); - return frame.capacity(); - } - - @Override - public void reset() { - removeEmptySpot(buffers); - Collections.sort(buffers, sizeByteBufferComparator); - used.clear(); - } - - private static void removeEmptySpot(List buffers) { - for (int i = 0; i < buffers.size(); ) { - if (buffers.get(i) == null) { - buffers.remove(i); - } else { - i++; - } - } - } - - @Override - public void close() { - buffers.clear(); - used.clear(); - allocateMem = 0; - } - - private static Comparator sizeByteBufferComparator = new Comparator() { - @Override - public int compare(ByteBuffer o1, ByteBuffer o2) { - if (o1.capacity() == o2.capacity()) { - return 0; - } - return o1.capacity() < o2.capacity() ? -1 : 1; - } - }; -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java deleted file mode 100644 index 20642bf..0000000 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.dataflow.std.sort.buffermanager; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.hyracks.api.comm.IFrameTupleAccessor; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender; -import org.apache.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor; -import org.apache.hyracks.dataflow.std.structures.TuplePointer; - -public class VariableTupleMemoryManager implements ITupleBufferManager { - - private final static Logger LOG = Logger.getLogger(VariableTupleMemoryManager.class.getName()); - - private final int MIN_FREE_SPACE; - private final IFramePool pool; - private final IFrameFreeSlotPolicy policy; - private final IAppendDeletableFrameTupleAccessor accessor; - private final ArrayList frames; - private final RecordDescriptor recordDescriptor; - private int numTuples; - private int statsReOrg; - - public VariableTupleMemoryManager(IFramePool framePool, RecordDescriptor recordDescriptor) { - this.pool = framePool; - int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize(); - this.policy = new FrameFreeSlotLastFit(maxFrames); - this.accessor = new DeletableFrameTupleAppender(recordDescriptor); - this.frames = new ArrayList<>(); - this.MIN_FREE_SPACE = calculateMinFreeSpace(recordDescriptor); - this.recordDescriptor = recordDescriptor; - this.numTuples = 0; - this.statsReOrg = 0; - } - - @Override - public void reset() throws HyracksDataException { - pool.reset(); - policy.reset(); - frames.clear(); - numTuples = 0; - } - - @Override - public int getNumTuples() { - return numTuples; - } - - @Override - public boolean insertTuple(IFrameTupleAccessor fta, int idx, TuplePointer tuplePointer) - throws HyracksDataException { - int requiredFreeSpace = calculatePhysicalSpace(fta, idx); - int frameId = findAvailableFrame(requiredFreeSpace); - if (frameId < 0) { - if (canBeInsertedAfterCleanUpFragmentation(requiredFreeSpace)) { - reOrganizeFrames(); - frameId = findAvailableFrame(requiredFreeSpace); - statsReOrg++; - } else { - return false; - } - } - assert frameId >= 0; - accessor.reset(frames.get(frameId)); - assert accessor.getContiguousFreeSpace() >= requiredFreeSpace; - int tid = accessor.append(fta, idx); - assert tid >= 0; - tuplePointer.reset(frameId, tid); - if (accessor.getContiguousFreeSpace() > MIN_FREE_SPACE) { - policy.pushNewFrame(frameId, accessor.getContiguousFreeSpace()); - } - numTuples++; - return true; - } - - private void reOrganizeFrames() { - policy.reset(); - for (int i = 0; i < frames.size(); i++) { - accessor.reset(frames.get(i)); - accessor.reOrganizeBuffer(); - policy.pushNewFrame(i, accessor.getContiguousFreeSpace()); - } - } - - private boolean canBeInsertedAfterCleanUpFragmentation(int requiredFreeSpace) { - for (int i = 0; i < frames.size(); i++) { - accessor.reset(frames.get(i)); - if (accessor.getTotalFreeSpace() >= requiredFreeSpace) { - return true; - } - } - return false; - } - - private int findAvailableFrame(int requiredFreeSpace) throws HyracksDataException { - int frameId = policy.popBestFit(requiredFreeSpace); - if (frameId >= 0) { - return frameId; - } - - int frameSize = calculateMinFrameSizeToPlaceTuple(requiredFreeSpace, pool.getMinFrameSize()); - ByteBuffer buffer = pool.allocateFrame(frameSize); - if (buffer != null) { - accessor.clear(buffer); - frames.add(buffer); - return frames.size() - 1; - } - return -1; - } - - private static int calculateMinFrameSizeToPlaceTuple(int requiredFreeSpace, int minFrameSize) { - return (1 + (requiredFreeSpace + 4 - 1) / minFrameSize) * minFrameSize; - } - - private static int calculatePhysicalSpace(IFrameTupleAccessor fta, int idx) { - // 4 bytes to store the offset - return 4 + fta.getTupleLength(idx); - } - - private static int calculateMinFreeSpace(RecordDescriptor recordDescriptor) { - // + 4 for the tuple offset - return recordDescriptor.getFieldCount() * 4 + 4; - } - - @Override - public void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException { - accessor.reset(frames.get(tuplePointer.frameIndex)); - accessor.delete(tuplePointer.tupleIndex); - numTuples--; - } - - @Override - public void close() { - pool.close(); - policy.reset(); - frames.clear(); - numTuples = 0; - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("VariableTupleMemoryManager has reorganized " + statsReOrg + " times"); - } - statsReOrg = 0; - } - - @Override - public ITupleBufferAccessor getTupleAccessor() { - return new ITupleBufferAccessor() { - private IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender( - recordDescriptor); - private int tid; - - @Override - public void reset(TuplePointer tuplePointer) { - bufferAccessor.reset(frames.get(tuplePointer.frameIndex)); - tid = tuplePointer.tupleIndex; - } - - @Override - public ByteBuffer getTupleBuffer() { - return bufferAccessor.getBuffer(); - } - - @Override - public int getTupleStartOffset() { - return bufferAccessor.getTupleStartOffset(tid); - } - - @Override - public int getTupleLength() { - return bufferAccessor.getTupleLength(tid); - } - - @Override - public int getAbsFieldStartOffset(int fieldId) { - return bufferAccessor.getAbsoluteFieldStartOffset(tid, fieldId); - } - - @Override - public int getFieldLength(int fieldId) { - return bufferAccessor.getFieldLength(tid, fieldId); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java index f77ec63..31ea07d 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java @@ -26,7 +26,7 @@ import org.apache.hyracks.api.comm.IFrameTupleAppender; import org.apache.hyracks.api.exceptions.HyracksDataException; /** - * Basically it a union of the {@link IFrameTupleAccessor} and {@link IFrameTupleAppender}. + * Basically, it is an union of the {@link IFrameTupleAccessor} and {@link IFrameTupleAppender}. * Moreover, it has the delete function as well. * This is a special TupleAccessor used for TopK sorting. * In HeapSort, or other Tuple-based operators, we need to append the tuple, access the arbitrary previously http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java index c6ca09b..d9a62c8 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java @@ -72,9 +72,7 @@ public abstract class AbstractHeap implements IHeap { @Override public void reset() { - for (int i = 0; i < numEntry; i++) { - entries[i] = null; - } + Arrays.fill(entries, null); numEntry = 0; } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java index ceae0f1..8cd6792 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java @@ -22,15 +22,19 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public interface ISerializableTable { - public void insert(int entry, TuplePointer tuplePointer) throws HyracksDataException; + void insert(int entry, TuplePointer tuplePointer) throws HyracksDataException; - public void getTuplePointer(int entry, int offset, TuplePointer tuplePointer); + void delete(int entry); - public int getFrameCount(); + boolean getTuplePointer(int entry, int offset, TuplePointer tuplePointer); - public int getTupleCount(); + int getFrameCount(); - public void reset(); + int getTupleCount(); - public void close(); + int getTupleCount(int entry); + + void reset(); + + void close(); } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java index 7db57c0..1f2ebef 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java @@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.std.structures; import java.util.ArrayList; import java.util.List; +import org.apache.hyracks.api.context.IHyracksFrameMgrContext; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -36,14 +37,14 @@ public class SerializableHashTable implements ISerializableTable { private IntSerDeBuffer[] headers; private List contents = new ArrayList(); private List frameCurrentIndex = new ArrayList(); - private final IHyracksTaskContext ctx; - private int frameCapacity = 0; + private final IHyracksFrameMgrContext ctx; + private final int frameCapacity; private int currentLargestFrameIndex = 0; private int tupleCount = 0; private int headerFrameCount = 0; private TuplePointer tempTuplePointer = new TuplePointer(); - public SerializableHashTable(int tableSize, final IHyracksTaskContext ctx) throws HyracksDataException { + public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException { this.ctx = ctx; int frameSize = ctx.getInitialFrameSize(); @@ -81,28 +82,45 @@ public class SerializableHashTable implements ISerializableTable { } @Override - public void getTuplePointer(int entry, int offset, TuplePointer dataPointer) { + public void delete(int entry) { + int hFrameIndex = getHeaderFrameIndex(entry); + int headerOffset = getHeaderFrameOffset(entry); + IntSerDeBuffer header = headers[hFrameIndex]; + if (header != null) { + int frameIndex = header.getInt(headerOffset); + int offsetIndex = header.getInt(headerOffset + 1); + if (frameIndex >= 0) { + IntSerDeBuffer frame = contents.get(frameIndex); + int entryUsedItems = frame.getInt(offsetIndex + 1); + frame.writeInt(offsetIndex + 1, 0); + tupleCount -= entryUsedItems; + } + } + } + + @Override + public boolean getTuplePointer(int entry, int offset, TuplePointer dataPointer) { int hFrameIndex = getHeaderFrameIndex(entry); int headerOffset = getHeaderFrameOffset(entry); IntSerDeBuffer header = headers[hFrameIndex]; if (header == null) { dataPointer.frameIndex = -1; dataPointer.tupleIndex = -1; - return; + return false; } int frameIndex = header.getInt(headerOffset); int offsetIndex = header.getInt(headerOffset + 1); if (frameIndex < 0) { dataPointer.frameIndex = -1; dataPointer.tupleIndex = -1; - return; + return false; } IntSerDeBuffer frame = contents.get(frameIndex); int entryUsedItems = frame.getInt(offsetIndex + 1); if (offset > entryUsedItems - 1) { dataPointer.frameIndex = -1; dataPointer.tupleIndex = -1; - return; + return false; } int startIndex = offsetIndex + 2 + offset * 2; while (startIndex >= frameCapacity) { @@ -112,6 +130,7 @@ public class SerializableHashTable implements ISerializableTable { frame = contents.get(frameIndex); dataPointer.frameIndex = frame.getInt(startIndex); dataPointer.tupleIndex = frame.getInt(startIndex + 1); + return true; } @Override @@ -139,9 +158,26 @@ public class SerializableHashTable implements ISerializableTable { } @Override + public int getTupleCount(int entry) { + int hFrameIndex = getHeaderFrameIndex(entry); + int headerOffset = getHeaderFrameOffset(entry); + IntSerDeBuffer header = headers[hFrameIndex]; + if (header != null) { + int frameIndex = header.getInt(headerOffset); + int offsetIndex = header.getInt(headerOffset + 1); + if (frameIndex >= 0) { + IntSerDeBuffer frame = contents.get(frameIndex); + int entryUsedItems = frame.getInt(offsetIndex + 1); + return entryUsedItems; + } + } + return 0; + } + + @Override public void close() { int nFrames = contents.size(); - for (int i = 0; i < headers.length; i++) + for (int i = 0; i < headers.length; i++) headers[i] = null; contents.clear(); frameCurrentIndex.clear(); @@ -259,31 +295,30 @@ public class SerializableHashTable implements ISerializableTable { return offset; } -} - -class IntSerDeBuffer { + class IntSerDeBuffer { - private byte[] bytes; + private byte[] bytes; - public IntSerDeBuffer(byte[] data) { - this.bytes = data; - } + public IntSerDeBuffer(byte[] data) { + this.bytes = data; + } - public int getInt(int pos) { - int offset = pos * 4; - return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8) - + ((bytes[offset + 3] & 0xff) << 0); - } + public int getInt(int pos) { + int offset = pos * 4; + return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0); + } - public void writeInt(int pos, int value) { - int offset = pos * 4; - bytes[offset++] = (byte) (value >> 24); - bytes[offset++] = (byte) (value >> 16); - bytes[offset++] = (byte) (value >> 8); - bytes[offset++] = (byte) (value); - } + public void writeInt(int pos, int value) { + int offset = pos * 4; + bytes[offset++] = (byte) (value >> 24); + bytes[offset++] = (byte) (value >> 16); + bytes[offset++] = (byte) (value >> 8); + bytes[offset++] = (byte) (value); + } - public int capacity() { - return bytes.length / 4; + public int capacity() { + return bytes.length / 4; + } } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java new file mode 100644 index 0000000..c74fe04 --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.util; + +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor; + +public class FrameTuplePairComparator { + private final int[] keys0; + private final int[] keys1; + private final IBinaryComparator[] comparators; + + public FrameTuplePairComparator(int[] keys0, int[] keys1, IBinaryComparator[] comparators) { + this.keys0 = keys0; + this.keys1 = keys1; + this.comparators = comparators; + } + + public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) + throws HyracksDataException { + int tStart0 = accessor0.getTupleStartOffset(tIndex0); + int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0; + + int tStart1 = accessor1.getTupleStartOffset(tIndex1); + int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1; + + for (int i = 0; i < keys0.length; ++i) { + int fIdx0 = keys0[i]; + int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0); + int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0); + int fLen0 = fEnd0 - fStart0; + + int fIdx1 = keys1[i]; + int fStart1 = accessor1.getFieldStartOffset(tIndex1, fIdx1); + int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1); + int fLen1 = fEnd1 - fStart1; + + int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1 + .getBuffer().array(), fStart1 + fStartOffset1, fLen1); + if (c != 0) { + return c; + } + } + return 0; + } + + public int compare(IFrameTupleAccessor accessor0, int tIndex0, ITuplePointerAccessor bufferAccessor) + throws HyracksDataException { + int tStart0 = accessor0.getTupleStartOffset(tIndex0); + int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0; + + for (int i = 0; i < keys0.length; ++i) { + int fIdx0 = keys0[i]; + int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0); + int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0); + int fLen0 = fEnd0 - fStart0; + + int fStart1 = bufferAccessor.getAbsFieldStartOffset(keys1[i]); + int fLen1 = bufferAccessor.getFieldLength(keys1[i]); + + int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, bufferAccessor + .getBuffer().array(), fStart1, fLen1); + if (c != 0) { + return c; + } + } + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java new file mode 100644 index 0000000..3e4e578 --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.buffermanager; + +import static org.apache.hyracks.dataflow.std.buffermanager.Common.BUDGET; +import static org.apache.hyracks.dataflow.std.buffermanager.Common.MIN_FRAME_SIZE; +import static org.apache.hyracks.dataflow.std.buffermanager.Common.NUM_MIN_FRAME; +import static org.apache.hyracks.dataflow.std.buffermanager.Common.commonFrameManager; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.junit.Test; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public abstract class AbstractFramePoolTest { + IFramePool pool; + + @Test + public void testGetMinFrameSize() throws Exception { + assertEquals(MIN_FRAME_SIZE, commonFrameManager.getInitialFrameSize()); + assertEquals(MIN_FRAME_SIZE, pool.getMinFrameSize()); + } + + @Test + public void testGetMemoryBudgetBytes() throws Exception { + assertEquals(BUDGET, pool.getMemoryBudgetBytes()); + } + + protected void testAllocateShouldFailAfterAllSpaceGetUsed() throws HyracksDataException { + for (int i = 0; i < NUM_MIN_FRAME; i++) { + assertNull(pool.allocateFrame(MIN_FRAME_SIZE)); + } + } + + protected HashSet testAllocateAllSpacesWithMinFrames() throws HyracksDataException { + HashSet set = new HashSet<>(); + for (int i = 0; i < NUM_MIN_FRAME; i++) { + testAllocateNewBuffer(set, MIN_FRAME_SIZE); + } + return set; + } + + protected void testAllocateNewBuffer(HashSet set, int frameSize) throws HyracksDataException { + ByteBuffer buffer = pool.allocateFrame(frameSize); + assertNotNull(buffer); + assertEquals(buffer.capacity(), frameSize); + assertTrue(!set.contains(new ByteBufferPtr(buffer))); + set.add(new ByteBufferPtr(buffer)); + } + + /** + * Pool will become 1,2,3,4,5 + * + * @throws HyracksDataException + */ + protected Set testAllocateVariableFrames() throws HyracksDataException { + int budget = BUDGET; + int allocate = 0; + int i = 1; + Set set = new HashSet<>(); + while (budget - allocate >= i * MIN_FRAME_SIZE) { + ByteBuffer buffer = pool.allocateFrame(i * MIN_FRAME_SIZE); + assertNotNull(buffer); + set.add(new ByteBufferPtr(buffer)); + allocate += i++ * MIN_FRAME_SIZE; + } + return set; + } + + protected void testShouldFindTheMatchFrames(Set set) throws HyracksDataException { + pool.reset(); + List list = Arrays.asList(1, 2, 3, 4, 5); + + for (int i = 0; i < list.size(); i++) { + ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE); + assertNotNull(buffer); + assertTrue(set.contains(new ByteBufferPtr(buffer))); + assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity()); + } + pool.reset(); + for (int i = list.size() - 1; i >= 0; i--) { + ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE); + assertNotNull(buffer); + assertTrue(set.contains(new ByteBufferPtr(buffer))); + assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity()); + } + + Collections.shuffle(list); + pool.reset(); + for (int i = 0; i < list.size(); i++) { + ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE); + assertNotNull(buffer); + assertTrue(set.contains(new ByteBufferPtr(buffer))); + assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity()); + } + + } + + public static class ByteBufferPtr { + ByteBuffer bytebuffer; + + public ByteBufferPtr(ByteBuffer buffer) { + bytebuffer = buffer; + } + + @Override + public int hashCode() { + return bytebuffer.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return this.bytebuffer == ((ByteBufferPtr) obj).bytebuffer; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java new file mode 100644 index 0000000..cdf8834 --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.buffermanager; + +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.hyracks.api.comm.FixedSizeFrame; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; +import org.apache.hyracks.dataflow.common.util.IntSerDeUtils; +import org.apache.hyracks.dataflow.std.sort.Utility; +import org.apache.hyracks.dataflow.std.structures.TuplePointer; + +public abstract class AbstractTupleMemoryManagerTest { + ISerializerDeserializer[] fieldsSerDer = new ISerializerDeserializer[] { + IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() }; + RecordDescriptor recordDescriptor = new RecordDescriptor(fieldsSerDer); + ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptor.getFieldCount()); + FrameTupleAccessor inFTA = new FrameTupleAccessor(recordDescriptor); + Random random = new Random(System.currentTimeMillis()); + + abstract ITuplePointerAccessor getTupleAccessor(); + + protected void assertEachTupleInFTAIsInBuffer(Map map, Map mapInserted) { + ITuplePointerAccessor accessor = getTupleAccessor(); + for (Map.Entry entry : mapInserted.entrySet()) { + accessor.reset(entry.getKey()); + int dataLength = map.get(entry.getValue()); + assertEquals((int) entry.getValue(), + IntSerDeUtils.getInt(accessor.getBuffer().array(), accessor.getAbsFieldStartOffset(0))); + assertEquals(dataLength, accessor.getTupleLength()); + } + assertEquals(map.size(), mapInserted.size()); + } + + protected Map prepareFixedSizeTuples( + int tuplePerFrame, + int extraMetaBytePerFrame, + int extraMetaBytePerRecord) throws HyracksDataException { + Map dataSet = new HashMap<>(); + ByteBuffer buffer = ByteBuffer.allocate(Common.BUDGET); + FixedSizeFrame frame = new FixedSizeFrame(buffer); + FrameTupleAppender appender = new FrameTupleAppender(); + appender.reset(frame, true); + + int sizePerTuple = (Common.MIN_FRAME_SIZE - 1 - tuplePerFrame * 4 - 4 - extraMetaBytePerFrame) / tuplePerFrame; + int sizeChar = + sizePerTuple - extraMetaBytePerRecord - fieldsSerDer.length * 4 - 4 - 2; //2byte to write str length + assert (sizeChar > 0); + for (int i = 0; i < Common.NUM_MIN_FRAME * tuplePerFrame; i++) { + tupleBuilder.reset(); + tupleBuilder.addField(fieldsSerDer[0], i); + tupleBuilder.addField(fieldsSerDer[1], Utility.repeatString('a', sizeChar)); + if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, + tupleBuilder.getSize())) { + assert false; + } + dataSet.put(i, tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * 4); + } + inFTA.reset(buffer); + return dataSet; + } + + protected Map prepareVariableSizeTuples() throws HyracksDataException { + Map dataSet = new HashMap<>(); + ByteBuffer buffer = ByteBuffer.allocate(Common.BUDGET); + FixedSizeFrame frame = new FixedSizeFrame(buffer); + FrameTupleAppender appender = new FrameTupleAppender(); + appender.reset(frame, true); + + for (int i = 0; true; i++) { + tupleBuilder.reset(); + tupleBuilder.addField(fieldsSerDer[0], i); + tupleBuilder.addField(fieldsSerDer[1], Utility.repeatString('a', i)); + if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, + tupleBuilder.getSize())) { + break; + } + dataSet.put(i, tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * 4); + } + inFTA.reset(buffer); + return dataSet; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java new file mode 100644 index 0000000..7389aab --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.buffermanager; + +import org.apache.hyracks.control.nc.resources.memory.FrameManager; + +public class Common { + static int MIN_FRAME_SIZE = 256; + static int NUM_MIN_FRAME = 15; + static int BUDGET = NUM_MIN_FRAME * MIN_FRAME_SIZE; + + static FrameManager commonFrameManager = new FrameManager(MIN_FRAME_SIZE); +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java new file mode 100644 index 0000000..2a84e0e --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.buffermanager; + +import static org.apache.hyracks.dataflow.std.buffermanager.Common.BUDGET; +import static org.apache.hyracks.dataflow.std.buffermanager.Common.MIN_FRAME_SIZE; +import static org.apache.hyracks.dataflow.std.buffermanager.Common.commonFrameManager; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.HashSet; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class DeletableFramePoolTest extends AbstractFramePoolTest { + + @Before + public void setUp() { + pool = new DeallocatableFramePool(commonFrameManager, BUDGET); + } + + DeallocatableFramePool getPool() { + return (DeallocatableFramePool) pool; + } + + @Test + public void testAllocateBuffers() throws HyracksDataException { + testAllocateAllSpacesWithMinFrames(); + } + + @Test + public void testCanNotAllocateMore() throws HyracksDataException { + testAllocateAllSpacesWithMinFrames(); + assertNull(pool.allocateFrame(MIN_FRAME_SIZE)); + } + + @Test + public void testReusePreAllocatedBuffer() throws HyracksDataException { + HashSet set = testAllocateAllSpacesWithMinFrames(); + for (ByteBufferPtr ptr : set) { + getPool().deAllocateBuffer(ptr.bytebuffer); + } + HashSet set2 = testAllocateAllSpacesWithMinFrames(); + assertEquals(set, set2); + } + + @Test + public void testMergeCase() throws HyracksDataException { + HashSet set = testAllocateAllSpacesWithMinFrames(); + for (ByteBufferPtr ptr : set) { + getPool().deAllocateBuffer(ptr.bytebuffer); + } + set.clear(); + int i = 1; + for (int sum = 0; sum + MIN_FRAME_SIZE * i <= BUDGET; i++) { + sum += MIN_FRAME_SIZE * i; + testAllocateNewBuffer(set, MIN_FRAME_SIZE * i); + } + assertNull(pool.allocateFrame(MIN_FRAME_SIZE * i)); + for (ByteBufferPtr ptr : set) { + getPool().deAllocateBuffer(ptr.bytebuffer); + } + set.clear(); + testAllocateNewBuffer(set, BUDGET); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java new file mode 100644 index 0000000..992c7f6 --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.buffermanager; + +import static org.junit.Assert.assertEquals; + +import org.junit.Before; +import org.junit.Test; + +public class FrameFreeSlotBestFitUsingTreeMapTest { + + static int size = 10; + + FrameFreeSlotSmallestFit policy; + + @Before + public void intial() { + policy = new FrameFreeSlotSmallestFit(); + } + + @Test + public void testAll() { + + for (int i = 0; i < size; i++) { + policy.pushNewFrame(i, i); + assertEquals(i, policy.popBestFit(i)); + } + assertEquals(-1, policy.popBestFit(0)); + + for (int i = 0; i < size; i++) { + policy.pushNewFrame(i, i); + } + for (int i = 0; i < size; i++) { + assertEquals(i, policy.popBestFit(i)); + } + + } + + @Test + public void testReset(){ + testAll(); + policy.reset(); + testAll(); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java new file mode 100644 index 0000000..88a54bd --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.buffermanager; + +import static junit.framework.Assert.assertEquals; + +import org.junit.Before; +import org.junit.Test; + +public class FrameFreeSlotBiggestFirstTest { + + static int size = 10; + + FrameFreeSlotBiggestFirst policy; + + @Before + public void intial() { + policy = new FrameFreeSlotBiggestFirst(size); + } + + @Test + public void testAll() { + + for (int i = 0; i < size; i++) { + policy.pushNewFrame(i, i); + assertEquals(i, policy.popBestFit(i)); + } + assertEquals(-1, policy.popBestFit(0)); + + for (int i = 0; i < size; i++) { + policy.pushNewFrame(i, i); + } + for (int i = 0; i < size; i++) { + assertEquals(size - i - 1, policy.popBestFit(0)); + } + + for (int i = 0; i < size; i++) { + policy.pushNewFrame(i, i); + } + for (int i = 0; i < size / 2; i++) { + assertEquals(size - i - 1, policy.popBestFit(size / 2)); + } + assertEquals(-1, policy.popBestFit(size / 2)); + for (int i = 0; i < size / 2; i++) { + assertEquals(size / 2 - i - 1, policy.popBestFit(0)); + } + + } + + @Test + public void testReset() { + testAll(); + policy.reset(); + testAll(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java new file mode 100644 index 0000000..87b1b91 --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.buffermanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Before; +import org.junit.Test; + +public class FrameFreeSlotLastFitTest { + + FrameFreeSlotLastFit zeroPolicy; + FrameFreeSlotLastFit unifiedPolicy; + FrameFreeSlotLastFit ascPolicy; + FrameFreeSlotLastFit dscPolicy; + + static final int size = 10; + static final int medium = 5; + + @Before + public void setUp() throws Exception { + zeroPolicy = new FrameFreeSlotLastFit(0); + unifiedPolicy = new FrameFreeSlotLastFit(size); + ascPolicy = new FrameFreeSlotLastFit(size); + dscPolicy = new FrameFreeSlotLastFit(size); + } + + @Test + public void testPushAndPop() throws Exception { + for (int i = 0; i < size; i++) { + unifiedPolicy.pushNewFrame(i, medium); + } + for (int i = 0; i < size; i++) { + assertTrue(unifiedPolicy.popBestFit(medium) == size - i - 1); + } + assertTrue(unifiedPolicy.popBestFit(0) == -1); + + for (int i = 0; i < size / 2; i++) { + ascPolicy.pushNewFrame(i, i); + assertEquals(ascPolicy.popBestFit(medium), -1); + dscPolicy.pushNewFrame(i, size - i - 1); + assertEquals(dscPolicy.popBestFit(medium), i); + } + + for (int i = size / 2; i < size; i++) { + ascPolicy.pushNewFrame(i, i); + assertEquals(ascPolicy.popBestFit(medium), i); + dscPolicy.pushNewFrame(i, size - i - 1); + assertEquals(dscPolicy.popBestFit(medium), -1); + } + + ascPolicy.reset(); + for (int i = 0; i < size; i++) { + ascPolicy.pushNewFrame(size - i, size - i); + } + + for (int i = 0; i < size; i++) { + assertEquals(size - i, ascPolicy.popBestFit(size - i)); + } + } + + @Test + public void testReset() throws Exception { + testPushAndPop(); + + zeroPolicy.reset(); + unifiedPolicy.reset(); + ascPolicy.reset(); + dscPolicy.reset(); + testPushAndPop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java new file mode 100644 index 0000000..ce31108 --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.buffermanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.util.IntSerDeUtils; +import org.apache.hyracks.dataflow.std.structures.TuplePointer; +import org.junit.Before; +import org.junit.Test; + +public class VPartitionTupleBufferManagerTest extends AbstractTupleMemoryManagerTest { + + VPartitionTupleBufferManager bufferManager; + final int PARTITION = 4; + + @Before + public void setup() throws HyracksDataException { + IPartitionedMemoryConstrain constrain = new IPartitionedMemoryConstrain() { + @Override + public int frameLimit(int partitionId) { + return Integer.MAX_VALUE; + } + }; + bufferManager = new VPartitionTupleBufferManager(Common.commonFrameManager, constrain, PARTITION, + Common.BUDGET); + } + + @Test + public void testGetNumPartitions() throws Exception { + assertEquals(PARTITION, bufferManager.getNumPartitions()); + } + + @Test + public void testGetNumTuples() throws Exception { + testNumTuplesAndSizeIsZero(); + } + + @Test + public void testInsertToFull() throws HyracksDataException { + Map inMap = prepareFixedSizeTuples(10, 0, 0); + for (int pid = 0; pid < PARTITION; pid++) { + assertInsertOnePartitionToFull(pid, inMap); + bufferManager.reset(); + } + } + + @Test + public void testInsertClearSequence() throws HyracksDataException { + Map inMap = prepareFixedSizeTuples(10, 0, 0); + for (int pid = 0; pid < PARTITION; pid++) { + assertInsertOnePartitionToFull(pid, inMap); + bufferManager.reset(); + } + } + + private void assertInsertOnePartitionToFull(int pid, Map inMap) throws HyracksDataException { + testNumTuplesAndSizeIsZero(); + + Map outMap = testInsertOnePartitionToFull(pid); + assertEquals(outMap.size(), bufferManager.getNumTuples(pid)); + assertEquals(Common.BUDGET, bufferManager.getPhysicalSize(pid)); + testCanNotInsertToAnyPartitions(); + assertEachTupleInFTAIsInBuffer(inMap, outMap); + + } + + private void testCanNotInsertToAnyPartitions() throws HyracksDataException { + for (int i = 0; i < PARTITION; i++) { + assertFalse(bufferManager.insertTuple(i, tupleBuilder.getByteArray(), tupleBuilder.getFieldEndOffsets(), 0, + tupleBuilder.getSize(), null)); + } + } + + private Map testInsertOnePartitionToFull(int idpart) throws HyracksDataException { + Map tuplePointerIntegerMap = new HashMap<>(); + + for (int i = 0; i < inFTA.getTupleCount(); i++) { + TuplePointer tuplePointer = new TuplePointer(); + copyDataToTupleBuilder(inFTA, i, tupleBuilder); + if (!bufferManager.insertTuple(idpart, tupleBuilder.getByteArray(), tupleBuilder.getFieldEndOffsets(), 0, + tupleBuilder.getSize(), tuplePointer)) { + assert false; + } + tuplePointerIntegerMap.put(tuplePointer, + IntSerDeUtils.getInt(inFTA.getBuffer().array(), inFTA.getAbsoluteFieldStartOffset(i, 0))); + } + return tuplePointerIntegerMap; + + } + + private static void copyDataToTupleBuilder(FrameTupleAccessor inFTA, int tid, ArrayTupleBuilder tupleBuilder) + throws HyracksDataException { + tupleBuilder.reset(); + for (int fid = 0; fid < inFTA.getFieldCount(); fid++) { + tupleBuilder.addField(inFTA.getBuffer().array(), inFTA.getAbsoluteFieldStartOffset(tid, fid), + inFTA.getFieldLength(tid, fid)); + } + } + + private void testNumTuplesAndSizeIsZero() { + for (int i = 0; i < bufferManager.getNumPartitions(); ++i) { + assertEquals(0, bufferManager.getNumTuples(i)); + assertEquals(0, bufferManager.getPhysicalSize(0)); + } + } + + @Test + public void testClearPartition() throws Exception { + + Map inMap = prepareFixedSizeTuples(10, 0, 0); + for (int pid = 0; pid < PARTITION; pid++) { + assertInsertOnePartitionToFull(pid, inMap); + assertClearFullPartitionIsTheSameAsReset(); + } + } + + private void assertClearFullPartitionIsTheSameAsReset() throws HyracksDataException { + for (int i = 0; i < PARTITION; i++) { + bufferManager.clearPartition(i); + } + testNumTuplesAndSizeIsZero(); + } + + @Test + public void testClose() throws Exception { + testInsertToFull(); + bufferManager.close(); + } + + @Override + ITuplePointerAccessor getTupleAccessor() { + return bufferManager.getTupleAccessor(recordDescriptor); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java new file mode 100644 index 0000000..8dbe1f0 --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.buffermanager; + +import static org.apache.hyracks.dataflow.std.buffermanager.Common.BUDGET; +import static org.apache.hyracks.dataflow.std.buffermanager.Common.MIN_FRAME_SIZE; +import static org.apache.hyracks.dataflow.std.buffermanager.Common.NUM_MIN_FRAME; +import static org.apache.hyracks.dataflow.std.buffermanager.Common.commonFrameManager; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Set; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class VariableFramePoolTest extends AbstractFramePoolTest { + + @Before + public void setUp() throws Exception { + pool = new VariableFramePool(commonFrameManager, BUDGET); + } + + @Test + public void testAllocateUniformFrameShouldSuccess() throws Exception { + testAllocateAllSpacesWithMinFrames(); + testAllocateShouldFailAfterAllSpaceGetUsed(); + pool.reset(); + testAllocateAllSpacesWithMinFrames(); + pool.close(); + } + + @Test + public void testResetShouldReuseExistingFrames() throws HyracksDataException { + Set set1 = testAllocateAllSpacesWithMinFrames(); + pool.reset(); + Set set2 = testAllocateAllSpacesWithMinFrames(); + assertEquals(set1, set2); + pool.close(); + } + + @Test + public void testCloseShouldNotReuseExistingFrames() throws HyracksDataException { + Set set1 = testAllocateAllSpacesWithMinFrames(); + pool.close(); + Set set2 = testAllocateAllSpacesWithMinFrames(); + assertFalse(set1.equals(set2)); + pool.close(); + } + + @Test + public void testShouldReturnLargerFramesIfFitOneIsUsed() throws HyracksDataException { + Set set = testAllocateVariableFrames(); + pool.reset(); + testShouldFindTheMatchFrames(set); + pool.reset(); + + // allocate seq: 1, 1, 2, 3, 4 + ByteBuffer placeBuffer = pool.allocateFrame(MIN_FRAME_SIZE); + assertTrue(set.contains(new ByteBufferPtr(placeBuffer))); + for (int i = 1; i <= 4; i++) { + ByteBuffer buffer = pool.allocateFrame(i * MIN_FRAME_SIZE); + assertNotNull(buffer); + assertTrue(set.contains(new ByteBufferPtr(buffer))); + } + assertNull(pool.allocateFrame(MIN_FRAME_SIZE)); + pool.close(); + } + + @Test + public void testShouldMergeIfNoLargerFrames() throws HyracksDataException { + Set set = testAllocateAllSpacesWithMinFrames(); + pool.reset(); + int chunks = 5; + for (int i = 0; i < NUM_MIN_FRAME; i += chunks) { + ByteBuffer buffer = pool.allocateFrame(chunks * MIN_FRAME_SIZE); + assertNotNull(buffer); + assertTrue(!set.contains(new ByteBufferPtr(buffer))); + } + } + + @Test + public void testUseMiddleSizeFrameAndNeedToMergeSmallAndBigger() throws HyracksDataException { + Set set = testAllocateVariableFrames(); + pool.reset(); + // allocate seq: 3, 6, 1; + ByteBuffer buffer = pool.allocateFrame(3 * MIN_FRAME_SIZE); + assertTrue(set.contains(new ByteBufferPtr(buffer))); + buffer = pool.allocateFrame(6 * MIN_FRAME_SIZE); + assertFalse(set.contains(new ByteBufferPtr(buffer))); + buffer = pool.allocateFrame(1 * MIN_FRAME_SIZE); + assertTrue(set.contains(new ByteBufferPtr(buffer))); + assertEquals(5 * MIN_FRAME_SIZE, buffer.capacity()); + pool.reset(); + } +} \ No newline at end of file