asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wangs...@apache.org
Subject [2/3] asterixdb git commit: ASTERIXDB-1556, ASTERIXDB-1733: Hash Group By and Hash Join conform to the memory budget
Date Wed, 04 Jan 2017 23:46:30 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index ed7ae8e..8e52838 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -31,18 +32,20 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.TupleInFrameListAccessor;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
 public class InMemoryHashJoin {
 
-    private final IHyracksTaskContext ctx;
     private final List<ByteBuffer> buffers;
     private final FrameTupleAccessor accessorBuild;
     private final ITuplePartitionComputer tpcBuild;
@@ -57,23 +60,29 @@ public class InMemoryHashJoin {
     private final TuplePointer storedTuplePointer;
     private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
     private final IPredicateEvaluator predEvaluator;
+    private TupleInFrameListAccessor tupleAccessor;
+    // To release frames
+    ISimpleFrameBufferManager bufferManager;
     private final boolean isTableCapacityNotZero;
 
     private static final Logger LOGGER = Logger.getLogger(InMemoryHashJoin.class.getName());
 
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessorProbe,
-            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, ITuplePartitionComputer tpcBuild,
-            FrameTuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
-            ISerializableTable table, IPredicateEvaluator predEval) throws HyracksDataException {
-        this(ctx, tableSize, accessorProbe, tpcProbe, accessorBuild, tpcBuild, comparator, isLeftOuter,
-                missingWritersBuild, table, predEval, false);
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+            ITuplePartitionComputer tpcBuild, FrameTuplePairComparator comparator, boolean isLeftOuter,
+            IMissingWriter[] missingWritersBuild, ISerializableTable table, IPredicateEvaluator predEval,
+            ISimpleFrameBufferManager bufferManager)
+            throws HyracksDataException {
+        this(ctx, tableSize, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, comparator, isLeftOuter,
+                missingWritersBuild, table, predEval, false, bufferManager);
     }
 
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessorProbe,
-            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, ITuplePartitionComputer tpcBuild,
-            FrameTuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
-            ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
-        this.ctx = ctx;
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild,
+            RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild, FrameTuplePairComparator comparator,
+            boolean isLeftOuter, IMissingWriter[] missingWritersBuild, ISerializableTable table,
+            IPredicateEvaluator predEval, boolean reverse, ISimpleFrameBufferManager bufferManager)
+            throws HyracksDataException {
         this.tableSize = tableSize;
         this.table = table;
         storedTuplePointer = new TuplePointer();
@@ -98,6 +107,8 @@ public class InMemoryHashJoin {
             missingTupleBuild = null;
         }
         reverseOutputOrder = reverse;
+        this.tupleAccessor = new TupleInFrameListAccessor(rDBuild, buffers);
+        this.bufferManager = bufferManager;
         if (tableSize != 0) {
             isTableCapacityNotZero = true;
         } else {
@@ -115,8 +126,31 @@ public class InMemoryHashJoin {
         for (int i = 0; i < tCount; ++i) {
             int entry = tpcBuild.partition(accessorBuild, i, tableSize);
             storedTuplePointer.reset(bIndex, i);
-            table.insert(entry, storedTuplePointer);
+            // If an insertion fails, then tries to insert the same tuple pointer again after compacting the table.
+            if (!table.insert(entry, storedTuplePointer)) {
+                compactTableAndInsertAgain(entry, storedTuplePointer);
+            }
+        }
+    }
+
+    public boolean compactTableAndInsertAgain(int entry, TuplePointer tPointer) throws HyracksDataException {
+        boolean oneMoreTry = false;
+        if (compactHashTable() >= 0) {
+            oneMoreTry = table.insert(entry, tPointer);
         }
+        return oneMoreTry;
+    }
+
+    /**
+     * Tries to compact the table to make some space.
+     *
+     * @return the number of frames that have been reclaimed. If no compaction has happened, the value -1 is returned.
+     */
+    public int compactHashTable() throws HyracksDataException {
+        if (table.isGarbageCollectionNeeded()) {
+            return table.collectGarbage(tupleAccessor, tpcBuild);
+        }
+        return -1;
     }
 
     /**
@@ -165,14 +199,18 @@ public class InMemoryHashJoin {
     public void closeJoin(IFrameWriter writer) throws HyracksDataException {
         appender.write(writer, true);
         int nFrames = buffers.size();
-        int totalSize = 0;
-        for (int i = 0; i < nFrames; i++) {
-            totalSize += buffers.get(i).capacity();
+        // Frames assigned to the data table will be released here.
+        if (bufferManager != null) {
+            for (int i = 0; i < nFrames; i++) {
+                bufferManager.releaseFrame(buffers.get(i));
+            }
         }
         buffers.clear();
-        ctx.deallocateFrames(totalSize);
-        LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
-                + Thread.currentThread().getId() + ".");
+
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
+                    + Thread.currentThread().getId() + ".");
+        }
     }
 
     public void closeTable() throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 0d6d163..a8d3f7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -49,6 +49,10 @@ import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
@@ -63,10 +67,13 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
     private final boolean isLeftOuter;
     private final IMissingWriterFactory[] nonMatchWriterFactories;
     private final int tableSize;
+    // The maximum number of in-memory frames that this hash join can use.
+    private final int memSizeInFrames;
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int tableSize, IPredicateEvaluatorFactory predEvalFactory) {
+            RecordDescriptor recordDescriptor, int tableSize, IPredicateEvaluatorFactory predEvalFactory,
+            int memSizeInFrames) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
@@ -77,12 +84,13 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
         this.isLeftOuter = false;
         this.nonMatchWriterFactories = null;
         this.tableSize = tableSize;
+        this.memSizeInFrames = memSizeInFrames;
     }
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
             IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
-            IMissingWriterFactory[] missingWriterFactories1, int tableSize) {
+            IMissingWriterFactory[] missingWriterFactories1, int tableSize, int memSizeInFrames) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
@@ -93,20 +101,22 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
         this.isLeftOuter = isLeftOuter;
         this.nonMatchWriterFactories = missingWriterFactories1;
         this.tableSize = tableSize;
+        this.memSizeInFrames = memSizeInFrames;
     }
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int tableSize) {
-        this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, recordDescriptor, tableSize, null);
+            RecordDescriptor recordDescriptor, int tableSize, int memSizeInFrames) {
+        this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, recordDescriptor, tableSize, null,
+                memSizeInFrames);
     }
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1,
-            int tableSize) {
+            int tableSize, int memSizeInFrames) {
         this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, null, recordDescriptor, isLeftOuter,
-                nullWriterFactories1, tableSize);
+                nullWriterFactories1, tableSize, memSizeInFrames);
     }
 
     @Override
@@ -177,6 +187,10 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
             final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
                     : predEvaluatorFactory.createPredicateEvaluator());
 
+            final int memSizeInBytes = memSizeInFrames * ctx.getInitialFrameSize();
+            final IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, memSizeInBytes);
+            final ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
+
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private HashBuildTaskState state;
 
@@ -188,19 +202,38 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
                             .createPartitioner();
                     state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
                             new TaskId(getActivityId(), partition));
-                    ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+                    ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager);
                     state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
-                            new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
-                            isLeftOuter, nullWriters1, table, predEvaluator);
+                            new FrameTupleAccessor(rd1), rd1, hpc1,
+                            new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table,
+                            predEvaluator, bufferManager);
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
+                    ByteBuffer copyBuffer = allocateBuffer(buffer.capacity());
                     FrameUtils.copyAndFlip(buffer, copyBuffer);
                     state.joiner.build(copyBuffer);
                 }
 
+                private ByteBuffer allocateBuffer(int frameSize) throws HyracksDataException {
+                    ByteBuffer newBuffer = bufferManager.acquireFrame(frameSize);
+                    if (newBuffer != null) {
+                        return newBuffer;
+                    }
+                    // At this moment, there is not enough memory since the newBuffer is null.
+                    // However, if we can compact the table, one or more frames may be reclaimed.
+                    if (state.joiner.compactHashTable() > 0) {
+                        newBuffer = bufferManager.acquireFrame(frameSize);
+                        if (newBuffer != null) {
+                            return newBuffer;
+                        }
+                    }
+                    // At this point, we have no way to get a frame.
+                    throw new HyracksDataException(
+                            "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+                }
+
                 @Override
                 public void close() throws HyracksDataException {
                     ctx.setStateObject(state);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 4f85e57..17f009e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.dataflow.std.join;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -38,7 +37,11 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
 import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
@@ -85,7 +88,7 @@ public class OptimizedHybridHashJoin {
 
     private final BitSet spilledStatus; //0=resident, 1=spilled
     private final int numOfPartitions;
-    private final int memForJoin;
+    private final int memSizeInFrames;
     private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
 
     private IPartitionedTupleBufferManager bufferManager;
@@ -94,6 +97,9 @@ public class OptimizedHybridHashJoin {
     private final FrameTupleAccessor accessorBuild;
     private final FrameTupleAccessor accessorProbe;
 
+    private IDeallocatableFramePool framePool;
+    private ISimpleFrameBufferManager bufferManagerForHashTable;
+
     private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
 
     // stats information
@@ -103,13 +109,14 @@ public class OptimizedHybridHashJoin {
                                                        // we mainly use it to match the corresponding function signature.
     private int[] probePSizeInTups;
 
-    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String probeRelName,
+    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
+            String probeRelName,
             String buildRelName, int[] probeKeys, int[] buildKeys, IBinaryComparator[] comparators,
             RecordDescriptor probeRd, RecordDescriptor buildRd, ITuplePartitionComputer probeHpc,
             ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval, boolean isLeftOuter,
             IMissingWriterFactory[] nullWriterFactories1) {
         this.ctx = ctx;
-        this.memForJoin = memForJoin;
+        this.memSizeInFrames = memSizeInFrames;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
         this.buildHpc = buildHpc;
@@ -142,11 +149,12 @@ public class OptimizedHybridHashJoin {
     }
 
     public void initBuild() throws HyracksDataException {
-        bufferManager = new VPartitionTupleBufferManager(ctx,
+        framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
+        bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
+        bufferManager = new VPartitionTupleBufferManager(
                 PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
-                numOfPartitions, memForJoin * ctx.getInitialFrameSize());
-        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus,
-                ctx.getInitialFrameSize());
+                numOfPartitions, framePool);
+        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus);
         spilledStatus.clear();
         buildPSizeInTups = new int[numOfPartitions];
     }
@@ -173,7 +181,7 @@ public class OptimizedHybridHashJoin {
         int victimPartition = spillPolicy.selectVictimPartition(pid);
         if (victimPartition < 0) {
             throw new HyracksDataException(
-                    "No more space left in the memory buffer, please give join more memory budgets.");
+                    "No more space left in the memory buffer, please assign more memory to hash-join.");
         }
         spillPartition(victimPartition);
     }
@@ -185,6 +193,13 @@ public class OptimizedHybridHashJoin {
         spilledStatus.set(pid);
     }
 
+    private void closeBuildPartition(int pid) throws HyracksDataException {
+        if (buildRFWriters[pid] == null) {
+            throw new HyracksDataException("Tried to close the non-existing file writer.");
+        }
+        buildRFWriters[pid].close();
+    }
+
     private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
         RunFileWriter[] runFileWriters = null;
         String refName = null;
@@ -209,20 +224,17 @@ public class OptimizedHybridHashJoin {
     }
 
     public void closeBuild() throws HyracksDataException {
-
+        // Flushes the remaining chunks of all spilled partitions to the disk.
         closeAllSpilledPartitions(SIDE.BUILD);
 
-        bringBackSpilledPartitionIfHasMoreMemory(); //Trying to bring back as many spilled partitions as possible, making them resident
-
-        int inMemTupCount = 0;
-
-        for (int i = spilledStatus.nextClearBit(0); i >= 0
-                && i < numOfPartitions; i = spilledStatus.nextClearBit(i + 1)) {
-            inMemTupCount += buildPSizeInTups[i];
-        }
+        // Makes space for the in-memory hash table (some partitions may need to be spilled to the disk
+        // during this step in order to make that space)
+        // and tries to bring back as many spilled partitions as possible if there is free space.
+        int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
 
         createInMemoryJoiner(inMemTupCount);
-        cacheInMemJoin();
+
+        loadDataInMemJoin();
     }
 
     /**
@@ -261,24 +273,150 @@ public class OptimizedHybridHashJoin {
         }
     }
 
-    private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException {
+    /**
+     * Makes space for the hash table. If there is not enough space, one or more partitions will be spilled
+     * to the disk until the hash table can fit into the memory. After this, bring back spilled partitions
+     * if there is available memory.
+     *
+     * @return the number of tuples in memory after this method is executed.
+     * @throws HyracksDataException
+     */
+    private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
         // we need number of |spilledPartitions| buffers to store the probe data
-        int freeSpace = (memForJoin - spilledStatus.cardinality()) * ctx.getInitialFrameSize();
+        int frameSize = ctx.getInitialFrameSize();
+        long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
+
+        // For partitions in main memory, we deduct their size from the free space.
+        int inMemTupCount = 0;
         for (int p = spilledStatus.nextClearBit(0); p >= 0
                 && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
             freeSpace -= bufferManager.getPhysicalSize(p);
+            inMemTupCount += buildPSizeInTups[p];
         }
 
+        // Calculates the expected hash table size for the given number of tuples in main memory
+        // and deducts it from the free space.
+        long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
+                frameSize);
+        freeSpace -= hashTableByteSizeForInMemTuples;
+
+        // In the case where free space is less than zero after considering the hash table size,
+        // we need to spill more partitions until we can accommodate the hash table in memory.
+        // TODO: there may be different policies (keep spilling minimum, spilling maximum, find a similar size to the
+        //                                        hash table, or keep spilling from the first partition)
+        boolean moreSpilled = false;
+
+        // No space to accommodate the hash table? Then, we spill one or more partitions to the disk.
+        if (freeSpace < 0) {
+            // Tries to find a best-fit partition not to spill many partitions.
+            int pidToSpill = selectSinglePartitionToSpill(freeSpace, inMemTupCount, frameSize);
+            if (pidToSpill >= 0) {
+                // There is a suitable one. We spill that partition to the disk.
+                long hashTableSizeDecrease = -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
+                        inMemTupCount, -buildPSizeInTups[pidToSpill], frameSize);
+                freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
+                inMemTupCount -= buildPSizeInTups[pidToSpill];
+                spillPartition(pidToSpill);
+                closeBuildPartition(pidToSpill);
+                moreSpilled = true;
+            } else {
+                // There is no single suitable partition. So, we need to spill multiple partitions to the disk
+                // in order to accommodate the hash table.
+                for (int p = spilledStatus.nextClearBit(0); p >= 0
+                        && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+                    int spaceToBeReturned = bufferManager.getPhysicalSize(p);
+                    int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
+                    if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) {
+                        continue;
+                    }
+                    spillPartition(p);
+                    closeBuildPartition(p);
+                    moreSpilled = true;
+                    // Since the number of tuples in memory has been decreased,
+                    // the hash table size will be decreased, too.
+                    // We put minus since the method returns a negative value to represent a newly reclaimed space.
+                    long expectedHashTableSizeDecrease = -SerializableHashTable
+                            .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, -numberOfTuplesToBeSpilled,
+                                    frameSize);
+                    freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease;
+                    // Adjusts the hash table size
+                    inMemTupCount -= numberOfTuplesToBeSpilled;
+                    if (freeSpace >= 0) {
+                        break;
+                    }
+                }
+            }
+        }
+
+        // If more partitions have been spilled to the disk, calculate the expected hash table size again
+        // before bringing some partitions to main memory.
+        if (moreSpilled) {
+            hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
+                    frameSize);
+        }
+
+        // Brings back some partitions if there is enough free space.
         int pid = 0;
-        while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) {
-            if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
-                return;
+        while ((pid = selectPartitionsToReload(freeSpace, pid, inMemTupCount)) >= 0) {
+            if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
+                break;
             }
-            freeSpace -= bufferManager.getPhysicalSize(pid);
+            long expectedHashTableByteSizeIncrease = SerializableHashTable
+                    .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, buildPSizeInTups[pid], frameSize);
+            freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - expectedHashTableByteSizeIncrease;
+            inMemTupCount += buildPSizeInTups[pid];
+            // Adjusts the hash table size
+            hashTableByteSizeForInMemTuples += expectedHashTableByteSizeIncrease;
         }
+
+        return inMemTupCount;
     }
 
-    private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
+    /**
+     * Finds a best-fit partition that will be spilled to the disk to make enough space to accommodate the hash table.
+     *
+     * @return the partition id that will be spilled to the disk. Returns -1 if there is no single suitable partition.
+     */
+    private int selectSinglePartitionToSpill(long currentFreeSpace, int currentInMemTupCount, int frameSize) {
+        long spaceAfterSpill;
+        long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
+        int minSpaceAfterSpillPartID = -1;
+
+        for (int p = spilledStatus.nextClearBit(0); p >= 0
+                && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+            if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) {
+                continue;
+            }
+            // We put minus since the method returns a negative value to represent a newly reclaimed space.
+            spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) + (-SerializableHashTable
+                    .calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, -buildPSizeInTups[p], frameSize));
+            if (spaceAfterSpill == 0) {
+                // Found the perfect one. Just returns this partition.
+                return p;
+            } else if (spaceAfterSpill > 0 && spaceAfterSpill < minSpaceAfterSpill) {
+                // We want to find the best-fit partition to avoid many partition spills.
+                minSpaceAfterSpill = spaceAfterSpill;
+                minSpaceAfterSpillPartID = p;
+            }
+        }
+        return minSpaceAfterSpillPartID;
+    }
+
+    private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) {
+        for (int i = spilledStatus.nextSetBit(pid); i >= 0
+                && i < numOfPartitions; i = spilledStatus.nextSetBit(i + 1)) {
+            int spilledTupleCount = buildPSizeInTups[i];
+            // Expected hash table size increase after reloading this partition
+            long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
+                    inMemTupCount, spilledTupleCount, ctx.getInitialFrameSize());
+            if (freeSpace >= buildRFWriters[i].getFileSize() + expectedHashTableByteSizeIncrease) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private boolean loadSpilledPartitionToMem(int pid, RunFileWriter wr) throws HyracksDataException {
         RunFileReader r = wr.createReader();
         r.open();
         if (reloadBuffer == null) {
@@ -288,7 +426,8 @@ public class OptimizedHybridHashJoin {
             accessorBuild.reset(reloadBuffer.getBuffer());
             for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
                 if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-                    // for some reason (e.g. due to fragmentation) if the inserting failed, we need to clear the occupied frames
+                    // for some reason (e.g. due to fragmentation) if the inserting failed,
+                    // we need to clear the occupied frames
                     bufferManager.clearPartition(pid);
                     r.close();
                     return false;
@@ -296,33 +435,23 @@ public class OptimizedHybridHashJoin {
             }
         }
 
-        FileUtils.deleteQuietly(wr.getFileReference().getFile()); // delete the runfile if it already loaded into memory.
+        // Closes and deletes the run file if it is already loaded into memory.
+        r.setDeleteAfterClose(true);
         r.close();
         spilledStatus.set(pid, false);
         buildRFWriters[pid] = null;
         return true;
     }
 
-    private int selectPartitionsToReload(int freeSpace, int pid) {
-        for (int i = spilledStatus.nextSetBit(pid); i >= 0
-                && i < numOfPartitions; i = spilledStatus.nextSetBit(i + 1)) {
-            assert buildRFWriters[i].getFileSize() > 0 : "How comes a spilled partition have size 0?";
-            if (freeSpace >= buildRFWriters[i].getFileSize()) {
-                return i;
-            }
-        }
-        return -1;
-    }
-
     private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
-        ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
+        ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
         this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new FrameTupleAccessor(probeRd), probeHpc,
-                new FrameTupleAccessor(buildRd), buildHpc,
+                new FrameTupleAccessor(buildRd), buildRd, buildHpc,
                 new FrameTuplePairComparator(probeKeys, buildKeys, comparators), isLeftOuter, nonMatchWriters, table,
-                predEvaluator, isReversed);
+                predEvaluator, isReversed, bufferManagerForHashTable);
     }
 
-    private void cacheInMemJoin() throws HyracksDataException {
+    private void loadDataInMemJoin() throws HyracksDataException {
 
         for (int pid = 0; pid < numOfPartitions; pid++) {
             if (!spilledStatus.get(pid)) {
@@ -391,7 +520,6 @@ public class OptimizedHybridHashJoin {
                 probePSizeInTups[pid]++;
             }
         }
-
     }
 
     private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
@@ -419,6 +547,7 @@ public class OptimizedHybridHashJoin {
         bufferManager.close();
         inMemJoiner = null;
         bufferManager = null;
+        bufferManagerForHashTable = null;
     }
 
     /**
@@ -475,4 +604,71 @@ public class OptimizedHybridHashJoin {
     public void setIsReversed(boolean b) {
         this.isReversed = b;
     }
+
+    /**
+     * Prints out the detailed information for partitions: in-memory and spilled partitions.
+     * This method exists for a debug purpose.
+     *
+     * @param whichSide
+     *            which relation side (BUILD or PROBE) to report on
+     * @return a human-readable, tab-separated summary of the partition status
+     */
+    public String printPartitionInfo(SIDE whichSide) {
+        StringBuilder buf = new StringBuilder();
+        buf.append(">>> " + this + " " + Thread.currentThread().getId() + " printInfo():" + "\n");
+        if (whichSide == SIDE.BUILD) {
+            buf.append("BUILD side" + "\n");
+        } else {
+            buf.append("PROBE side" + "\n");
+        }
+        int spilledPartitionCount = spilledStatus.cardinality();
+        int inMemoryPartitionCount = numOfPartitions - spilledPartitionCount;
+        buf.append("# of partitions:\t" + numOfPartitions + "\t#spilled:\t" + spilledPartitionCount
+                + "\t#in-memory:\t" + inMemoryPartitionCount + "\n");
+        buf.append("(A) Spilled partitions" + "\n");
+        int spilledTupleCount = 0;
+        // long, not int: getFileSize() returns long and compound assignment would silently truncate.
+        long spilledPartByteSize = 0;
+        for (int pid = spilledStatus.nextSetBit(0); pid >= 0
+                && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
+            if (whichSide == SIDE.BUILD) {
+                spilledTupleCount += buildPSizeInTups[pid];
+                spilledPartByteSize += buildRFWriters[pid].getFileSize();
+                buf.append("part:\t" + pid + "\t#tuple:\t" + buildPSizeInTups[pid] + "\tsize(MB):\t"
+                        + ((double) buildRFWriters[pid].getFileSize() / 1048576) + "\n");
+            } else {
+                spilledTupleCount += probePSizeInTups[pid];
+                spilledPartByteSize += probeRFWriters[pid].getFileSize();
+            }
+        }
+        if (spilledPartitionCount > 0) {
+            buf.append("# of spilled tuples:\t" + spilledTupleCount + "\tsize(MB):\t"
+                    + ((double) spilledPartByteSize / 1048576) + "\tavg #tuples per spilled part:\t"
+                    + (spilledTupleCount / spilledPartitionCount) + "\tavg size per part(MB):\t"
+                    + ((double) spilledPartByteSize / 1048576 / spilledPartitionCount) + "\n");
+        }
+        buf.append("(B) In-memory partitions" + "\n");
+        int inMemoryTupleCount = 0;
+        long inMemoryPartByteSize = 0;
+        for (int pid = spilledStatus.nextClearBit(0); pid >= 0
+                && pid < numOfPartitions; pid = spilledStatus.nextClearBit(pid + 1)) {
+            if (whichSide == SIDE.BUILD) {
+                inMemoryTupleCount += buildPSizeInTups[pid];
+            } else {
+                inMemoryTupleCount += probePSizeInTups[pid];
+            }
+            inMemoryPartByteSize += bufferManager.getPhysicalSize(pid);
+        }
+        // Guard and averages use the in-memory partition count; the original divided by the
+        // spilled-partition count and printed this section only when something had spilled.
+        if (inMemoryPartitionCount > 0) {
+            buf.append("# of in-memory tuples:\t" + inMemoryTupleCount + "\tsize(MB):\t"
+                    + ((double) inMemoryPartByteSize / 1048576) + "\tavg #tuples per in-memory part:\t"
+                    + (inMemoryTupleCount / inMemoryPartitionCount) + "\tavg size per part(MB):\t"
+                    + ((double) inMemoryPartByteSize / 1048576 / inMemoryPartitionCount) + "\n");
+        }
+        int allTupleCount = inMemoryTupleCount + spilledTupleCount;
+        if (allTupleCount > 0) {
+            // Floating-point ratio: the original integer division always yielded 0 (or 1 when all spilled).
+            buf.append("# of all tuples:\t" + allTupleCount + "\tsize(MB):\t"
+                    + ((double) (inMemoryPartByteSize + spilledPartByteSize) / 1048576)
+                    + "\tratio of spilled tuples:\t" + ((double) spilledTupleCount / allTupleCount) + "\n");
+        } else {
+            buf.append("# of all tuples:\t" + allTupleCount + "\tsize(MB):\t"
+                    + ((double) (inMemoryPartByteSize + spilledPartByteSize) / 1048576)
+                    + "\tratio of spilled tuples:\t" + "N/A" + "\n");
+        }
+        return buf.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index e308dd8..a72c0c6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -59,6 +59,10 @@ import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
@@ -70,10 +74,12 @@ import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
  *         partitions.
  *         - Operator overview:
  *         Assume we are trying to do (R Join S), with M buffers available, while we have an estimate on the size
- *         of R (in terms of buffers). HHJ (Hybrid Hash Join) has two main phases: Build and Probe, where in our implementation Probe phase
- *         can apply HHJ recursively, based on the value of M and size of R and S. HHJ phases proceed as follow:
+ *         of R (in terms of buffers). HHJ (Hybrid Hash Join) has two main phases: Build and Probe,
+ *         where in our implementation Probe phase can apply HHJ recursively, based on the value of M and size of
+ *         R and S. HHJ phases proceed as follow:
  *         BUILD:
- *         Calculate number of partitions (Based on the size of R, fudge factor and M) [See Shapiro's paper for the detailed discussion].
+ *         Calculate number of partitions (Based on the size of R, fudge factor and M)
+ *         [See Shapiro's paper for the detailed discussion].
  *         Initialize the build phase (one frame per partition, all partitions considered resident at first)
  *         Read tuples of R, frame by frame, and hash each tuple (based on a given hash function) to find
  *         its target partition and try to append it to that partition:
@@ -81,9 +87,9 @@ import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
  *         if no free buffer is available, find the largest resident partition and spill it. Using its freed
  *         buffers after spilling, allocate a new buffer for the target partition.
  *         Being done with R, close the build phase. (During closing we write the very last buffer of each
- *         spilled partition to the disk, and we do partition tuning, where we try to bring back as many buffers, belonging to
- *         spilled partitions as possible into memory, based on the free buffers - We will stop at the point where remaining free buffers is not enough
- *         for reloading an entire partition back into memory)
+ *         spilled partition to the disk, and we do partition tuning, where we try to bring back as many buffers,
+ *         belonging to spilled partitions as possible into memory, based on the free buffers - We will stop at the
+ *         point where remaining free buffers is not enough for reloading an entire partition back into memory)
  *         Create the hash table for the resident partitions (basically we create an in-memory hash join here)
  *         PROBE:
  *         Initialize the probe phase on S (mainly allocate one buffer per spilled partition, and one buffer
@@ -112,7 +118,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
     private static final String PROBE_REL = "RelR";
     private static final String BUILD_REL = "RelS";
 
-    private final int frameLimit;
+    private final int memSizeInFrames;
     private final int inputsize0;
     private final double fudgeFactor;
     private final int[] probeKeys;
@@ -127,21 +133,20 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
     private final IMissingWriterFactory[] nonMatchWriterFactories;
 
     //Flags added for test purpose
-    private static boolean skipInMemoryHJ = false;
-    private static boolean forceNLJ = false;
-    private static boolean forceRR = false;
+    private boolean skipInMemoryHJ = false;
+    private boolean forceNLJ = false;
+    private boolean forceRoleReversal = false;
 
     private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
 
-    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int frameLimit, int inputsize0,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            ITuplePairComparatorFactory tupPaircomparatorFactory01,
+    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSizeInFrames,
+            int inputsize0, double factor, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory01,
             ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory,
             boolean isLeftOuter, IMissingWriterFactory[] nonMatchWriterFactories) throws HyracksDataException {
-
         super(spec, 2, 1);
-        this.frameLimit = frameLimit;
+        this.memSizeInFrames = memSizeInFrames;
         this.inputsize0 = inputsize0;
         this.fudgeFactor = factor;
         this.probeKeys = keys0;
@@ -156,15 +161,15 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
         this.nonMatchWriterFactories = nonMatchWriterFactories;
     }
 
-    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int frameLimit, int inputsize0,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            ITuplePairComparatorFactory tupPaircomparatorFactory01,
+    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSizeInFrames,
+            int inputsize0, double factor, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory01,
             ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory)
             throws HyracksDataException {
-        this(spec, frameLimit, inputsize0, factor, keys0, keys1, hashFunctionGeneratorFactories, comparatorFactories,
-                recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10, predEvaluatorFactory, false,
-                null);
+        this(spec, memSizeInFrames, inputsize0, factor, keys0, keys1, hashFunctionGeneratorFactories,
+                comparatorFactories, recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10,
+                predEvaluatorFactory, false, null);
     }
 
     @Override
@@ -190,19 +195,21 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
     private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
             throws HyracksDataException {
         int numberOfPartitions = 0;
-        if (memorySize <= 1) {
-            throw new HyracksDataException("not enough memory is available for Hybrid Hash Join");
+        if (memorySize <= 2) {
+            throw new HyracksDataException("Not enough memory is available for Hybrid Hash Join.");
         }
-        if (memorySize > buildSize) {
-            return 1; //We will switch to in-Mem HJ eventually
+        if (memorySize > buildSize * factor) {
+            // We will switch to in-Mem HJ eventually: create two big partitions.
+            // We set 2 (not 1) to avoid a corner case where the only partition may be spilled to the disk.
+            // This may happen since this formula doesn't consider the hash table size. If this is the case,
+            // we will do a nested loop join after some iterations. But, this is not effective.
+            return 2;
         }
         numberOfPartitions = (int) (Math.ceil((buildSize * factor / nPartitions - memorySize) / (memorySize - 1)));
-        if (numberOfPartitions <= 0) {
-            numberOfPartitions = 1; //becomes in-memory hash join
-        }
+        numberOfPartitions = Math.max(2, numberOfPartitions);
         if (numberOfPartitions > memorySize) {
             numberOfPartitions = (int) Math.ceil(Math.sqrt(buildSize * factor / nPartitions));
-            return (numberOfPartitions < memorySize ? numberOfPartitions : memorySize);
+            return Math.max(2, Math.min(numberOfPartitions, memorySize));
         }
         return numberOfPartitions;
     }
@@ -276,10 +283,10 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                 @Override
                 public void open() throws HyracksDataException {
-                    if (frameLimit <= 2) { //Dedicated buffers: One buffer to read and one buffer for output
-                        throw new HyracksDataException("not enough memory for Hybrid Hash Join");
+                    if (memSizeInFrames <= 2) { //Dedicated buffers: One buffer to read and two buffers for output
+                        throw new HyracksDataException("Not enough memory is assigned for Hybrid Hash Join.");
                     }
-                    state.memForJoin = frameLimit - 2;
+                    state.memForJoin = memSizeInFrames - 2;
                     state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
                             nPartitions);
                     state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
@@ -399,6 +406,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    state.hybridHJ.clearProbeTempFiles();
                     writer.fail();
                 }
 
@@ -426,6 +434,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
                             joinPartitionPair(bReader, pReader, bSize, pSize, 1);
                         }
+
                     } finally {
                         writer.close();
                     }
@@ -442,8 +451,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
 
-                    long buildPartSize = buildSideReader.getFileSize() / ctx.getInitialFrameSize();
-                    long probePartSize = probeSideReader.getFileSize() / ctx.getInitialFrameSize();
+                    int frameSize = ctx.getInitialFrameSize();
+                    long buildPartSize = buildSideReader.getFileSize() / frameSize;
+                    long probePartSize = probeSideReader.getFileSize() / frameSize;
                     int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
 
                     if (LOGGER.isLoggable(Level.FINE)) {
@@ -453,12 +463,20 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                 + "  - LeftOuter is " + isLeftOuter);
                     }
 
+                    // Calculate the expected hash table size for the both side.
+                    long expectedHashTableSizeForBuildInFrame = SerializableHashTable
+                            .getExpectedTableFrameCount(buildSizeInTuple, frameSize);
+                    long expectedHashTableSizeForProbeInFrame = SerializableHashTable
+                            .getExpectedTableFrameCount(probeSizeInTuple, frameSize);
+
                     //Apply in-Mem HJ if possible
-                    if (!skipInMemoryHJ && ((buildPartSize < state.memForJoin)
-                            || (probePartSize < state.memForJoin && !isLeftOuter))) {
+                    if (!skipInMemoryHJ && ((buildPartSize + expectedHashTableSizeForBuildInFrame < state.memForJoin)
+                            || (probePartSize + expectedHashTableSizeForProbeInFrame < state.memForJoin
+                                    && !isLeftOuter))) {
+
                         int tabSize = -1;
-                        if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) {
-                            //Case 1.1 - InMemHJ (wout Role-Reversal)
+                        if (!forceRoleReversal && (isLeftOuter || (buildPartSize < probePartSize))) {
+                            //Case 1.1 - InMemHJ (without Role-Reversal)
                             if (LOGGER.isLoggable(Level.FINE)) {
                                 LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
                                         + level + "]");
@@ -473,9 +491,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                     buildSideReader, probeSideReader); // checked-confirmed
                         } else { //Case 1.2 - InMemHJ with Role Reversal
                             if (LOGGER.isLoggable(Level.FINE)) {
-                                LOGGER.fine(
-                                        "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
-                                                + level + "]");
+                                LOGGER.fine("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ"
+                                        + " WITH RoleReversal - [Level " + level + "]");
                             }
                             tabSize = probeSizeInTuple;
                             if (tabSize == 0) {
@@ -492,8 +509,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                         if (LOGGER.isLoggable(Level.FINE)) {
                             LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
                         }
-                        if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) {
-                            //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+                        if (!forceRoleReversal && (isLeftOuter || buildPartSize < probePartSize)) {
+                            //Case 2.1 - Recursive HHJ (without Role-Reversal)
                             if (LOGGER.isLoggable(Level.FINE)) {
                                 LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                         + level + "]");
@@ -543,7 +560,6 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     }
                     rHHj.closeBuild();
                     buildSideReader.close();
-
                     probeSideReader.open();
                     rHHj.initProbe();
                     rPartbuff.reset();
@@ -560,9 +576,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     BitSet rPStatus = rHHj.getPartitionStatus();
                     if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
                         if (LOGGER.isLoggable(Level.FINE)) {
-                            LOGGER.fine(
-                                    "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                            + level + "]");
+                            LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH "
+                                    + "(isLeftOuter || build<probe) - [Level " + level + "]");
                         }
                         for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                             RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
@@ -571,7 +586,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
 
                             if (rbrfw == null || rprfw == null) {
-                                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
+                                if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
                                     appendNullToProbeTuples(rprfw);
                                 }
                                 continue;
@@ -587,15 +602,15 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     } else { //Case 2.1.2 - Switch to NLJ
                         if (LOGGER.isLoggable(Level.FINE)) {
                             LOGGER.fine(
-                                    "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                            + level + "]");
+                                    "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe)"
+                                            + " - [Level " + level + "]");
                         }
                         for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                             RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                             RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
                             if (rbrfw == null || rprfw == null) {
-                                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
+                                if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
                                     appendNullToProbeTuples(rprfw);
                                 }
                                 continue;
@@ -605,9 +620,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
                             // NLJ order is outer + inner, the order is reversed from the other joins
                             if (isLeftOuter || probeSideInTups < buildSideInTups) {
-                                applyNestedLoopJoin(probeRd, buildRd, frameLimit, rprfw, rbrfw); //checked-modified
+                                applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw); //checked-modified
                             } else {
-                                applyNestedLoopJoin(buildRd, probeRd, frameLimit, rbrfw, rprfw); //checked-modified
+                                applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw); //checked-modified
                             }
                         }
                     }
@@ -642,17 +657,34 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
 
-                    ISerializableTable table = new SerializableHashTable(tabSize, ctx);
+                    IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx,
+                            state.memForJoin * ctx.getInitialFrameSize());
+                    ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
+
+                    ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(probeRDesc),
-                            hpcRepProbe, new FrameTupleAccessor(buildRDesc), hpcRepBuild,
+                            hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild,
                             new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nonMatchWriter, table,
-                            predEvaluator, isReversed);
+                            predEvaluator, isReversed, bufferManager);
 
                     bReader.open();
                     rPartbuff.reset();
                     while (bReader.nextFrame(rPartbuff)) {
-                        //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
-                        ByteBuffer copyBuffer = ctx.allocateFrame(rPartbuff.getFrameSize());
+                        // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
+                        // in the InMemoryHashJoin.
+                        ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+                        // If a frame cannot be allocated, there may be a chance if we can compact the table,
+                        // one or more frame may be reclaimed.
+                        if (copyBuffer == null) {
+                            if (joiner.compactHashTable() > 0) {
+                                copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+                            }
+                            if (copyBuffer == null) {
+                                // Still no frame is allocated? At this point, we have no way to get a frame.
+                                throw new HyracksDataException(
+                                        "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+                            }
+                        }
                         FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
                         joiner.build(copyBuffer);
                         rPartbuff.reset();
@@ -672,7 +704,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
                         RunFileReader outerReader, RunFileReader innerReader) throws HyracksDataException {
-                    // The nested loop join result is outer + inner. All the other operator is probe + build. Hence the reverse relation is different
+                    // The nested loop join result is outer + inner. All the other operator is probe + build.
+                    // Hence the reverse relation is different.
                     boolean isReversed = outerRd == buildRd && innerRd == probeRd;
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                     ITuplePairComparator nljComptorOuterInner = isReversed ? nljComparatorBuild2Probe
@@ -716,6 +749,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
     }
 
     public void setForceRR(boolean b) {
-        forceRR = (!isLeftOuter && b);
+        forceRoleReversal = !isLeftOuter && b;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
index 8cd6792..d0e0616 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
@@ -18,17 +18,22 @@
  */
 package org.apache.hyracks.dataflow.std.structures;
 
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 
 public interface ISerializableTable {
 
-    void insert(int entry, TuplePointer tuplePointer) throws HyracksDataException;
+    boolean insert(int entry, TuplePointer tuplePointer) throws HyracksDataException;
 
     void delete(int entry);
 
     boolean getTuplePointer(int entry, int offset, TuplePointer tuplePointer);
 
-    int getFrameCount();
+    /**
+     * Returns the byte size of entire frames that are currently allocated to the table.
+     */
+    int getCurrentByteSize();
 
     int getTupleCount();
 
@@ -37,4 +42,26 @@ public interface ISerializableTable {
     void reset();
 
     void close();
+
+    boolean isGarbageCollectionNeeded();
+
+    /**
+     * Collects garbages in the given table, if any. For example, compacts the table by
+     * removing the garbage created by internal migration or lazy deletion operation.
+     * The desired result of this method is a compacted table without any garbage (no wasted space).
+     *
+     * @param bufferAccessor:
+     *            required to access the real tuple to calculate the original hash value
+     * @param tpc:
+     *            hash function
+     * @return the number of frames that are reclaimed.
+     * @throws HyracksDataException
+     */
+    int collectGarbage(ITuplePointerAccessor bufferAccessor, ITuplePartitionComputer tpc)
+            throws HyracksDataException;
+
+    /**
+     * Prints out the internal information of this table.
+     */
+    String printInfo();
 }


Mime
View raw message