asterixdb-commits mailing list archives

From jianf...@apache.org
Subject [07/11] incubator-asterixdb-hyracks git commit: Implemented the memory-bounded HashGroupby and HashJoin for BigObject
Date Fri, 26 Feb 2016 05:54:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 58e1b29..ebcb462 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -19,9 +19,7 @@
 package org.apache.hyracks.dataflow.std.join;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -37,38 +35,42 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
+import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
 /**
- * @author pouria
- *         This class mainly applies one level of HHJ on a pair of
- *         relations. It is always called by the descriptor.
+ * This class mainly applies one level of HHJ (hybrid hash join) on a pair of
+ * relations. It is always called by the descriptor.
  */
 public class OptimizedHybridHashJoin {
 
-    private final int NO_MORE_FREE_BUFFER = -1;
-    private final int END_OF_PARTITION = -1;
-    private final int INVALID_BUFFER = -2;
-    private final int UNALLOCATED_FRAME = -3;
-    private final int BUFFER_FOR_RESIDENT_PARTS = -1;
+    // Used for a special probe BigObject that cannot fit into the join memory
+    private FrameTupleAppender bigProbeFrameAppender;
+
+    enum SIDE {
+        BUILD,
+        PROBE
+    }
 
     private IHyracksTaskContext ctx;
 
-    private final String rel0Name;
-    private final String rel1Name;
+    private final String buildRelName;
+    private final String probeRelName;
 
     private final int[] buildKeys;
     private final int[] probeKeys;
 
     private final IBinaryComparator[] comparators;
 
-    private ITuplePartitionComputer buildHpc;
-    private ITuplePartitionComputer probeHpc;
+    private final ITuplePartitionComputer buildHpc;
+    private final ITuplePartitionComputer probeHpc;
 
     private final RecordDescriptor buildRd;
     private final RecordDescriptor probeRd;
@@ -78,83 +80,44 @@ public class OptimizedHybridHashJoin {
 
     private final IPredicateEvaluator predEvaluator;
     private final boolean isLeftOuter;
-    private final INullWriter[] nullWriters1;
-
-    private IFrame[] memBuffs; //Memory buffers for build
-    private int[] curPBuff; //Current (last) Buffer for each partition
-    private int[] nextBuff; //Next buffer in the partition's buffer chain
-    private int[] buildPSizeInTups; //Size of build partitions (in tuples)
-    private int[] probePSizeInTups; //Size of probe partitions (in tuples)
-    private int nextFreeBuffIx; //Index of next available free buffer to allocate/use
-    private BitSet pStatus; //0=resident, 1=spilled
-    private int numOfPartitions;
-    private int memForJoin;
+    private final INullWriter[] nullWriters;
+
+    private final BitSet spilledStatus; //0=resident, 1=spilled
+    private final int numOfPartitions;
+    private final int memForJoin;
     private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
 
+    private IPartitionedTupleBufferManager bufferManager;
+    private PreferToSpillFullyOccupiedFramePolicy spillPolicy;
+
     private final FrameTupleAccessor accessorBuild;
     private final FrameTupleAccessor accessorProbe;
-    private FrameTupleAppender buildTupAppender;
-    private FrameTupleAppender probeTupAppenderToResident;
-    private FrameTupleAppender probeTupAppenderToSpilled;
 
-    private int numOfSpilledParts;
-    private IFrame[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
-    private IFrame probeResBuff; //Buffer for probe resident partition tuples
-    private IFrame reloadBuffer; //Buffer for reloading spilled partitions during partition tuning
-
-    private int[] buildPSizeInFrames; //Used for partition tuning
-    private int freeFramesCounter; //Used for partition tuning
-
-    private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
     private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
 
-    private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoin.class.getName());
-
-    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
-            String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
-            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
-            IPredicateEvaluator predEval) {
-        this.ctx = ctx;
-        this.memForJoin = memForJoin;
-        this.buildRd = buildRd;
-        this.probeRd = probeRd;
-        this.buildHpc = buildHpc;
-        this.probeHpc = probeHpc;
-        this.buildKeys = keys1;
-        this.probeKeys = keys0;
-        this.comparators = comparators;
-        this.rel0Name = rel0Name;
-        this.rel1Name = rel1Name;
-
-        this.numOfPartitions = numOfPartitions;
-        this.buildRFWriters = new RunFileWriter[numOfPartitions];
-        this.probeRFWriters = new RunFileWriter[numOfPartitions];
-
-        this.accessorBuild = new FrameTupleAccessor(buildRd);
-        this.accessorProbe = new FrameTupleAccessor(probeRd);
-
-        this.predEvaluator = predEval;
-        this.isLeftOuter = false;
-        this.nullWriters1 = null;
-        this.isReversed = false;
-
-    }
-
-    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
-            String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
-            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
-            IPredicateEvaluator predEval, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+    // stats information
+    private int[] buildPSizeInTups;
+    private IFrame reloadBuffer;
+    private TuplePointer tempPtr = new TuplePointer(); // A reusable object to store the tuple pointer, which is not
+                                                       // used anywhere; we only pass it to match the function signature.
+    private int[] probePSizeInTups;
+
+    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String probeRelName,
+            String buildRelName, int[] probeKeys, int[] buildKeys, IBinaryComparator[] comparators,
+            RecordDescriptor probeRd, RecordDescriptor buildRd, ITuplePartitionComputer probeHpc,
+            ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) {
         this.ctx = ctx;
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
         this.buildHpc = buildHpc;
         this.probeHpc = probeHpc;
-        this.buildKeys = keys1;
-        this.probeKeys = keys0;
+        this.buildKeys = buildKeys;
+        this.probeKeys = probeKeys;
         this.comparators = comparators;
-        this.rel0Name = rel0Name;
-        this.rel1Name = rel1Name;
+        this.buildRelName = buildRelName;
+        this.probeRelName = probeRelName;
 
         this.numOfPartitions = numOfPartitions;
         this.buildRFWriters = new RunFileWriter[numOfPartitions];
@@ -167,49 +130,30 @@ public class OptimizedHybridHashJoin {
         this.isLeftOuter = isLeftOuter;
         this.isReversed = false;
 
-        this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+        this.spilledStatus = new BitSet(numOfPartitions);
+
+        this.nullWriters = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
         if (isLeftOuter) {
             for (int i = 0; i < nullWriterFactories1.length; i++) {
-                nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                nullWriters[i] = nullWriterFactories1[i].createNullWriter();
             }
         }
     }
 
     public void initBuild() throws HyracksDataException {
-        memBuffs = new IFrame[memForJoin];
-        curPBuff = new int[numOfPartitions];
-        nextBuff = new int[memForJoin];
-        pStatus = new BitSet(numOfPartitions);
+        bufferManager = new VPartitionTupleBufferManager(ctx,
+                PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
+                numOfPartitions, memForJoin * ctx.getInitialFrameSize());
+        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus,
+                ctx.getInitialFrameSize());
+        spilledStatus.clear();
         buildPSizeInTups = new int[numOfPartitions];
-
-        buildPSizeInFrames = new int[numOfPartitions];
-        freeFramesCounter = memForJoin - numOfPartitions;
-
-        for (int i = 0; i < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
-            memBuffs[i] = new VSizeFrame(ctx);
-            curPBuff[i] = i;
-            nextBuff[i] = -1;
-            buildPSizeInFrames[i] = 1; //The dedicated initial buffer
-        }
-
-        nextFreeBuffIx = ((numOfPartitions < memForJoin) ? numOfPartitions : NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
-        for (int i = numOfPartitions; i < memBuffs.length; i++) {
-            nextBuff[i] = UNALLOCATED_FRAME;
-        }
-
-        buildTupAppender = new FrameTupleAppender();
-
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
         accessorBuild.reset(buffer);
         int tupleCount = accessorBuild.getTupleCount();
 
-        boolean print = false;
-        if (print) {
-            accessorBuild.prettyPrint();
-        }
-
         for (int i = 0; i < tupleCount; ++i) {
             int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
             processTuple(i, pid);
@@ -219,304 +163,198 @@ public class OptimizedHybridHashJoin {
     }
 
     private void processTuple(int tid, int pid) throws HyracksDataException {
-        IFrame partition = memBuffs[curPBuff[pid]]; //Getting current buffer for the target partition
-
-        if (!pStatus.get(pid)) { //resident partition
-            buildTupAppender.reset(partition, false);
-            while (true) {
-                if (buildTupAppender.append(accessorBuild, tid)) { //Tuple added to resident partition successfully
-                    break;
-                }
-                //partition does not have enough room
-                int newBuffIx = allocateFreeBuffer(pid);
-                if (newBuffIx == NO_MORE_FREE_BUFFER) { //Spill one partition
-                    int pidToSpill = selectPartitionToSpill();
-                    if (pidToSpill == -1) { //No more partition to spill
-                        throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
-                    }
-                    spillPartition(pidToSpill);
-                    buildTupAppender.reset(memBuffs[pidToSpill], true);
-                    processTuple(tid, pid);
-                    break;
-                } //New Buffer allocated successfully
-                partition = memBuffs[curPBuff[pid]]; //Current Buffer for the partition is now updated by allocateFreeBuffer() call above
-                buildTupAppender.reset(partition, true);
-                if (!buildTupAppender.append(accessorBuild, tid)) {
-                    throw new HyracksDataException("Invalid State (Can not append to newly allocated buffer)");
-                }
-                buildPSizeInFrames[pid]++;
-                break;
-            }
-        } else { //spilled partition
-            boolean needClear = false;
-            while (true) {
-                buildTupAppender.reset(partition, needClear);
-                if (buildTupAppender.append(accessorBuild, tid)) {
-                    break;
-                }
-                //Dedicated in-memory buffer for the partition is full, needed to be flushed first
-                buildWrite(pid, partition.getBuffer());
-                partition.reset();
-                needClear = true;
-                buildPSizeInFrames[pid]++;
-            }
-        }
-    }
-
-    private int allocateFreeBuffer(int pid) throws HyracksDataException {
-        if (nextFreeBuffIx != NO_MORE_FREE_BUFFER) {
-            if (memBuffs[nextFreeBuffIx] == null) {
-                memBuffs[nextFreeBuffIx] = new VSizeFrame(ctx);
-            }
-            int curPartBuffIx = curPBuff[pid];
-            curPBuff[pid] = nextFreeBuffIx;
-            int oldNext = nextBuff[nextFreeBuffIx];
-            nextBuff[nextFreeBuffIx] = curPartBuffIx;
-            if (oldNext == UNALLOCATED_FRAME) {
-                nextFreeBuffIx++;
-                if (nextFreeBuffIx == memForJoin) { //No more free buffer
-                    nextFreeBuffIx = NO_MORE_FREE_BUFFER;
-                }
-            } else {
-                nextFreeBuffIx = oldNext;
-            }
-            memBuffs[curPBuff[pid]].reset();
-
-            freeFramesCounter--;
-            return (curPBuff[pid]);
-        } else {
-            return NO_MORE_FREE_BUFFER; //A partitions needs to be spilled (if feasible)
+        while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+            selectAndSpillVictim(pid);
         }
     }
 
-    private int selectPartitionToSpill() {
-        int maxSize = -1;
-        int partitionToSpill = -1;
-        for (int i = 0; i < buildPSizeInTups.length; i++) { //Find the largest partition, to spill
-            if (!pStatus.get(i) && (buildPSizeInTups[i] > maxSize)) {
-                maxSize = buildPSizeInTups[i];
-                partitionToSpill = i;
-            }
+    private void selectAndSpillVictim(int pid) throws HyracksDataException {
+        int victimPartition = spillPolicy.selectVictimPartition(pid);
+        if (victimPartition < 0) {
+            throw new HyracksDataException(
+                    "No more space left in the memory buffer, please give join more memory budgets.");
         }
-        return partitionToSpill;
+        spillPartition(victimPartition);
     }
 
     private void spillPartition(int pid) throws HyracksDataException {
-        LOGGER.fine("OptimizedHybridHashJoin is spilling partition:" + pid + " with " + buildPSizeInFrames[pid]
-                + " frames for Thread ID " + Thread.currentThread().getId() + " (free frames: " + freeFramesCounter
-                + ").");
-        int curBuffIx = curPBuff[pid];
-        while (curBuffIx != END_OF_PARTITION) {
-            IFrame frame = memBuffs[curBuffIx];
-            buildWrite(pid, frame.getBuffer());
-            frame.reset();
-
-            int freedBuffIx = curBuffIx;
-            curBuffIx = nextBuff[curBuffIx];
-
-            if (freedBuffIx != pid) {
-                nextBuff[freedBuffIx] = nextFreeBuffIx;
-                nextFreeBuffIx = freedBuffIx;
-                freeFramesCounter++;
-            }
+        RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
+        bufferManager.flushPartition(pid, writer);
+        bufferManager.clearPartition(pid);
+        spilledStatus.set(pid);
+    }
+
+    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
+        RunFileWriter[] runFileWriters = null;
+        String refName = null;
+        switch (whichSide) {
+            case BUILD:
+                runFileWriters = buildRFWriters;
+                refName = buildRelName;
+                break;
+            case PROBE:
+                refName = probeRelName;
+                runFileWriters = probeRFWriters;
+                break;
         }
-        curPBuff[pid] = pid;
-        pStatus.set(pid);
-        LOGGER.fine("OptimizedHybridHashJoin has freed " + freeFramesCounter + " frames by spilling partition:" + pid
-                + " for Thread ID " + Thread.currentThread().getId() + ".");
-    }
-
-    private void buildWrite(int pid, ByteBuffer buff) throws HyracksDataException {
-        RunFileWriter writer = buildRFWriters[pid];
+        RunFileWriter writer = runFileWriters[pid];
         if (writer == null) {
-            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(rel0Name);
-            LOGGER.fine("OptimizedHybridHashJoin is creating a run file (" + file.getFile().getAbsolutePath()
-                    + ") for partition:" + pid + " for Thread ID " + Thread.currentThread().getId() + ".");
+            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
             writer = new RunFileWriter(file, ctx.getIOManager());
             writer.open();
-            buildRFWriters[pid] = writer;
+            runFileWriters[pid] = writer;
         }
-        writer.nextFrame(buff);
+        return writer;
     }
 
     public void closeBuild() throws HyracksDataException {
-        for (int i = 0; i < numOfPartitions; i++) { //Remove Empty Partitions' allocated frame
-            if (buildPSizeInTups[i] == 0) {
-                buildPSizeInFrames[i]--;
-                nextBuff[curPBuff[i]] = nextFreeBuffIx;
-                nextFreeBuffIx = curPBuff[i];
-                curPBuff[i] = INVALID_BUFFER;
-                freeFramesCounter++;
-            }
+
+        closeAllSpilledPartitions(SIDE.BUILD);
+
+        bringBackSpilledPartitionIfHasMoreMemory(); //Try to bring back as many spilled partitions as possible, making them resident
+
+        int inMemTupCount = 0;
+
+        for (int i = spilledStatus.nextClearBit(0); i >= 0
+                && i < numOfPartitions; i = spilledStatus.nextClearBit(i + 1)) {
+            inMemTupCount += buildPSizeInTups[i];
         }
 
-        ByteBuffer buff = null;
-        for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
-            buff = memBuffs[i].getBuffer();
-            accessorBuild.reset(buff);
-            if (accessorBuild.getTupleCount() > 0) {
-                buildWrite(i, buff);
-                buildPSizeInFrames[i]++;
-            }
-            nextBuff[i] = nextFreeBuffIx;
-            nextFreeBuffIx = i;
-            freeFramesCounter++;
-            curPBuff[i] = INVALID_BUFFER;
+        createInMemoryJoiner(inMemTupCount);
+        cacheInMemJoin();
+    }
 
+    /**
+     * In case a failure happens, we need to clean up the generated temporary files.
+     */
+    public void clearBuildTempFiles() {
+        for (int i = 0; i < buildRFWriters.length; i++) {
             if (buildRFWriters[i] != null) {
-                buildRFWriters[i].close();
+                buildRFWriters[i].getFileReference().delete();
             }
         }
+    }
 
-        partitionTune(); //Trying to bring back as many spilled partitions as possible, making them resident
+    private void closeAllSpilledPartitions(SIDE whichSide) throws HyracksDataException {
+        RunFileWriter[] runFileWriters = null;
+        switch (whichSide) {
+            case BUILD:
+                runFileWriters = buildRFWriters;
+                break;
+            case PROBE:
+                runFileWriters = probeRFWriters;
+                break;
+        }
 
-        int inMemTupCount = 0;
-        int inMemFrameCount = 0;
-        int spilledFrameCount = 0;
-        numOfSpilledParts = 0;
-
-        for (int i = 0; i < numOfPartitions; i++) {
-            if (!pStatus.get(i)) {
-                inMemTupCount += buildPSizeInTups[i];
-                inMemFrameCount += buildPSizeInFrames[i];
-            } else {
-                spilledFrameCount += buildPSizeInFrames[i];
-                numOfSpilledParts++;
+        for (int pid = spilledStatus.nextSetBit(0); pid >= 0; pid = spilledStatus.nextSetBit(pid + 1)) {
+            if (bufferManager.getNumTuples(pid) > 0) {
+                bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
+                bufferManager.clearPartition(pid);
+                runFileWriters[pid].close();
             }
         }
-
-        LOGGER.fine("OptimizedHybridHashJoin build phase has spilled " + numOfSpilledParts + " of " + numOfPartitions
-                + " partitions for Thread ID " + Thread.currentThread().getId() + ". (" + inMemFrameCount
-                + " in-memory frames, " + spilledFrameCount + " spilled frames)");
-        createInMemoryJoiner(inMemTupCount);
-        cacheInMemJoin();
-        this.isTableEmpty = (inMemTupCount == 0);
     }
 
-    private void partitionTune() throws HyracksDataException {
-        reloadBuffer = new VSizeFrame(ctx);
-        ArrayList<Integer> reloadSet = selectPartitionsToReload();
-        for (int i = 0; i < reloadSet.size(); i++) {
-            int pid = reloadSet.get(i);
-            int[] buffsToLoad = new int[buildPSizeInFrames[pid]];
-            for (int j = 0; j < buffsToLoad.length; j++) {
-                buffsToLoad[j] = nextFreeBuffIx;
-                int oldNext = nextBuff[nextFreeBuffIx];
-                if (oldNext == UNALLOCATED_FRAME) {
-                    nextFreeBuffIx++;
-                    if (nextFreeBuffIx == memForJoin) { //No more free buffer
-                        nextFreeBuffIx = NO_MORE_FREE_BUFFER;
-                    }
-                } else {
-                    nextFreeBuffIx = oldNext;
-                }
+    private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException {
+        // we need one buffer per spilled partition to store the probe data
+        int freeSpace = (memForJoin - spilledStatus.cardinality()) * ctx.getInitialFrameSize();
+        for (int p = spilledStatus.nextClearBit(0); p >= 0
+                && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+            freeSpace -= bufferManager.getPhysicalSize(p);
+        }
 
+        int pid = 0;
+        while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) {
+            if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
+                return;
             }
-            curPBuff[pid] = buffsToLoad[0];
-            for (int k = 1; k < buffsToLoad.length; k++) {
-                nextBuff[buffsToLoad[k - 1]] = buffsToLoad[k];
-            }
-            loadPartitionInMem(pid, buildRFWriters[pid], buffsToLoad);
+            freeSpace -= bufferManager.getPhysicalSize(pid);
         }
-        reloadSet.clear();
     }
 
-    private void loadPartitionInMem(int pid, RunFileWriter wr, int[] buffs) throws HyracksDataException {
+    private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
         RunFileReader r = wr.createDeleteOnCloseReader();
         r.open();
-        int counter = 0;
-        ByteBuffer mBuff = null;
-        reloadBuffer.reset();
+        if (reloadBuffer == null) {
+            reloadBuffer = new VSizeFrame(ctx);
+        }
         while (r.nextFrame(reloadBuffer)) {
-            if (memBuffs[buffs[counter]] == null) {
-                memBuffs[buffs[counter]] = new VSizeFrame(ctx);
+            accessorBuild.reset(reloadBuffer.getBuffer());
+            for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
+                if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+                    // if the insert failed for some reason (e.g. fragmentation), we need to clear the occupied frames
+                    bufferManager.clearPartition(pid);
+                    r.close();
+                    return false;
+                }
             }
-            memBuffs[buffs[counter]].ensureFrameSize(reloadBuffer.getFrameSize());
-            mBuff = memBuffs[buffs[counter]].getBuffer();
-            FrameUtils.copyAndFlip(reloadBuffer.getBuffer(), mBuff);
-            counter++;
-            reloadBuffer.reset();
         }
 
-        int curNext = nextBuff[buffs[buffs.length - 1]];
-        nextBuff[buffs[buffs.length - 1]] = END_OF_PARTITION;
-        nextFreeBuffIx = curNext;
-
         r.close();
-        pStatus.set(pid, false);
+        spilledStatus.set(pid, false);
         buildRFWriters[pid] = null;
+        return true;
     }
 
-    private ArrayList<Integer> selectPartitionsToReload() {
-        ArrayList<Integer> p = new ArrayList<Integer>();
-        for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) {
-            if (buildPSizeInFrames[i] > 0 && (freeFramesCounter - buildPSizeInFrames[i] >= 0)) {
-                p.add(i);
-                freeFramesCounter -= buildPSizeInFrames[i];
-            }
-            if (freeFramesCounter < 1) { //No more free buffer available
-                return p;
+    private int selectPartitionsToReload(int freeSpace, int pid) {
+        for (int i = spilledStatus.nextSetBit(pid); i >= 0; i = spilledStatus.nextSetBit(i + 1)) {
+            assert buildRFWriters[i].getFileSize() > 0 : "How can a spilled partition have size 0?";
+            if (freeSpace >= buildRFWriters[i].getFileSize()) {
+                return i;
             }
         }
-        return p;
+        return -1;
     }
 
     private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
         ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
         this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new FrameTupleAccessor(probeRd), probeHpc,
-                new FrameTupleAccessor(buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
-                        comparators), isLeftOuter, nullWriters1, table, predEvaluator, isReversed);
+                new FrameTupleAccessor(buildRd), buildHpc,
+                new FrameTuplePairComparator(probeKeys, buildKeys, comparators), isLeftOuter, nullWriters, table,
+                predEvaluator, isReversed);
     }
 
     private void cacheInMemJoin() throws HyracksDataException {
 
         for (int pid = 0; pid < numOfPartitions; pid++) {
-            if (!pStatus.get(pid)) {
-                int nextBuffIx = curPBuff[pid];
-                while (nextBuffIx > -1) { //It is not Invalid or End_Of_Partition
-                    inMemJoiner.build(memBuffs[nextBuffIx].getBuffer());
-                    nextBuffIx = nextBuff[nextBuffIx];
-                }
+            if (!spilledStatus.get(pid)) {
+                bufferManager.flushPartition(pid, new IFrameWriter() {
+                    @Override
+                    public void open() throws HyracksDataException {
+
+                    }
+
+                    @Override
+                    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                        inMemJoiner.build(buffer);
+                    }
+
+                    @Override
+                    public void fail() throws HyracksDataException {
+
+                    }
+
+                    @Override
+                    public void close() throws HyracksDataException {
+
+                    }
+                });
             }
         }
     }
 
     public void initProbe() throws HyracksDataException {
 
-        sPartBuffs = new IFrame[numOfSpilledParts];
-        for (int i = 0; i < numOfSpilledParts; i++) {
-            sPartBuffs[i] = new VSizeFrame(ctx);
-        }
-        curPBuff = new int[numOfPartitions];
-        int nextBuffIxToAlloc = 0;
-        /* We only need to allocate one frame per spilled partition.
-         * Resident partitions do not need frames in probe, as their tuples join
-         * immediately with the resident build tuples using the inMemoryHashJoin */
-        for (int i = 0; i < numOfPartitions; i++) {
-            curPBuff[i] = (pStatus.get(i)) ? nextBuffIxToAlloc++ : BUFFER_FOR_RESIDENT_PARTS;
-        }
         probePSizeInTups = new int[numOfPartitions];
         probeRFWriters = new RunFileWriter[numOfPartitions];
 
-        probeResBuff = new VSizeFrame(ctx);
-
-        probeTupAppenderToResident = new FrameTupleAppender();
-        probeTupAppenderToResident.reset(probeResBuff, true);
-
-        probeTupAppenderToSpilled = new FrameTupleAppender();
-
     }
 
     public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
         accessorProbe.reset(buffer);
         int tupleCount = accessorProbe.getTupleCount();
 
-        boolean print = false;
-        if (print) {
-            accessorProbe.prettyPrint();
-        }
-
-        if (numOfSpilledParts == 0) {
+        if (isBuildRelAllInMemory()) {
             inMemJoiner.join(buffer, writer);
             return;
         }
@@ -524,64 +362,60 @@ public class OptimizedHybridHashJoin {
             int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
 
             if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
-                if (pStatus.get(pid)) { //pid is Spilled
-                    boolean needToClear = false;
-                    IFrame frame = sPartBuffs[curPBuff[pid]];
-                    while (true) {
-                        probeTupAppenderToSpilled.reset(frame, needToClear);
-                        if (probeTupAppenderToSpilled.append(accessorProbe, i)) {
-                            break;
+                if (spilledStatus.get(pid)) { //pid is Spilled
+                    while (!bufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) {
+                        int victim = pid;
+                        if (bufferManager.getNumTuples(pid) == 0) { // current pid is empty; spill the biggest spilled partition instead
+                            victim = spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
                         }
-                        probeWrite(pid, frame.getBuffer());
-                        frame.reset();
-                        needToClear = true;
-                    }
-                } else { //pid is Resident
-                    while (true) {
-                        if (probeTupAppenderToResident.append(accessorProbe, i)) {
+                        if (victim < 0) { // current tuple is too big for all the free space
+                            flushBigProbeObjectToDisk(pid, accessorProbe, i);
                             break;
                         }
-                        inMemJoiner.join(probeResBuff.getBuffer(), writer);
-                        probeTupAppenderToResident.reset(probeResBuff, true);
+                        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
+                        bufferManager.flushPartition(victim, runFileWriter);
+                        bufferManager.clearPartition(victim);
                     }
-
+                } else { //pid is Resident
+                    inMemJoiner.join(accessorProbe, i, writer);
                 }
                 probePSizeInTups[pid]++;
             }
-
         }
 
     }
 
-    public void closeProbe(IFrameWriter writer) throws HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
-        inMemJoiner.join(probeResBuff.getBuffer(), writer);
-        inMemJoiner.closeJoin(writer);
-
-        for (int pid = pStatus.nextSetBit(0); pid >= 0; pid = pStatus.nextSetBit(pid + 1)) {
-            ByteBuffer buff = sPartBuffs[curPBuff[pid]].getBuffer();
-            accessorProbe.reset(buff);
-            if (accessorProbe.getTupleCount() > 0) {
-                probeWrite(pid, buff);
-            }
-            closeProbeWriter(pid);
+    private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
+            throws HyracksDataException {
+        if (bigProbeFrameAppender == null) {
+            bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+        }
+        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+        if (!bigProbeFrameAppender.append(accessorProbe, i)) {
+            throw new HyracksDataException("The given tuple is too big");
         }
+        bigProbeFrameAppender.write(runFileWriter, true);
     }
 
-    private void probeWrite(int pid, ByteBuffer buff) throws HyracksDataException {
-        RunFileWriter pWriter = probeRFWriters[pid];
-        if (pWriter == null) {
-            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(rel1Name);
-            pWriter = new RunFileWriter(file, ctx.getIOManager());
-            pWriter.open();
-            probeRFWriters[pid] = pWriter;
-        }
-        pWriter.nextFrame(buff);
+    private boolean isBuildRelAllInMemory() {
+        return spilledStatus.nextSetBit(0) < 0;
+    }
+
+    public void closeProbe(IFrameWriter writer) throws HyracksDataException {
+        //We do NOT join the spilled partitions here; that decision (which join technique to use) is made at the descriptor level
+        inMemJoiner.closeJoin(writer);
+        closeAllSpilledPartitions(SIDE.PROBE);
+        bufferManager = null;
     }
 
-    private void closeProbeWriter(int pid) throws HyracksDataException {
-        RunFileWriter writer = probeRFWriters[pid];
-        if (writer != null) {
-            writer.close();
+    /**
+     * In case a failure happens, we need to clean up the generated temporary files.
+     */
+    public void clearProbeTempFiles() {
+        for (int i = 0; i < probeRFWriters.length; i++) {
+            if (probeRFWriters[i] != null) {
+                probeRFWriters[i].getFileReference().delete();
+            }
         }
     }
 
@@ -589,10 +423,6 @@ public class OptimizedHybridHashJoin {
         return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createDeleteOnCloseReader());
     }
 
-    public long getBuildPartitionSize(int pid) {
-        return ((buildRFWriters[pid] == null) ? 0 : buildRFWriters[pid].getFileSize());
-    }
-
     public int getBuildPartitionSizeInTup(int pid) {
         return (buildPSizeInTups[pid]);
     }
@@ -601,10 +431,6 @@ public class OptimizedHybridHashJoin {
         return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createDeleteOnCloseReader());
     }
 
-    public long getProbePartitionSize(int pid) {
-        return ((probeRFWriters[pid] == null) ? 0 : probeRFWriters[pid].getFileSize());
-    }
-
     public int getProbePartitionSizeInTup(int pid) {
         return (probePSizeInTups[pid]);
     }
@@ -630,37 +456,7 @@ public class OptimizedHybridHashJoin {
     }
 
     public BitSet getPartitionStatus() {
-        return pStatus;
-    }
-
-    public String debugGetStats() {
-        int numOfResidentPartitions = 0;
-        int numOfSpilledPartitions = 0;
-        double sumOfBuildSpilledSizes = 0;
-        double sumOfProbeSpilledSizes = 0;
-        int numOfInMemTups = 0;
-        for (int i = 0; i < numOfPartitions; i++) {
-            if (pStatus.get(i)) { //Spilled
-                numOfSpilledPartitions++;
-                sumOfBuildSpilledSizes += buildPSizeInTups[i];
-                sumOfProbeSpilledSizes += probePSizeInTups[i];
-            } else { //Resident
-                numOfResidentPartitions++;
-                numOfInMemTups += buildPSizeInTups[i];
-            }
-        }
-
-        double avgBuildSpSz = sumOfBuildSpilledSizes / numOfSpilledPartitions;
-        double avgProbeSpSz = sumOfProbeSpilledSizes / numOfSpilledPartitions;
-        String s = "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t"
-                + numOfSpilledPartitions + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t"
-                + avgProbeSpSz + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t"
-                + freeFramesCounter;
-        return s;
-    }
-
-    public boolean isTableEmpty() {
-        return this.isTableEmpty;
+        return spilledStatus;
     }
 
     public void setIsReversed(boolean b) {

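The heart of the OptimizedHybridHashJoin rewrite above is that the hand-rolled frame bookkeeping (memBuffs/curPBuff/nextBuff and the partition-tuning arrays) is replaced by an IPartitionedTupleBufferManager plus a spill policy: processTuple() now simply retries insertTuple() and spills a victim partition to a run file whenever the insert fails, and a probe tuple too big for any free space is flushed straight to disk. Below is a minimal, self-contained sketch of that control flow. The class and its helpers (insertTuple, selectVictim, flushBigTuple, the tuples-per-frame accounting) are simplified stand-ins invented for illustration, not the actual Hyracks buffer-manager API.

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class InsertOrSpillSketch {

    static final int FRAME_SIZE = 4;            // tuples per frame (toy value)
    static final int MEMORY_BUDGET_FRAMES = 3;  // total in-memory frames for the "join"
    static final int NUM_PARTITIONS = 2;

    // In-memory state: one tuple list per partition, plus a spilled-status bit set,
    // mirroring the spilledStatus BitSet in the diff above.
    static final List<List<String>> partitions = new ArrayList<>();
    static final List<List<String>> runFiles = new ArrayList<>(); // stands in for RunFileWriter
    static final BitSet spilledStatus = new BitSet(NUM_PARTITIONS);

    // Returns false when the insert would need a new frame and the global budget is exhausted.
    static boolean insertTuple(int pid, String tuple) {
        int framesUsed = 0;
        for (List<String> p : partitions) {
            framesUsed += (p.size() + FRAME_SIZE - 1) / FRAME_SIZE;
        }
        List<String> target = partitions.get(pid);
        boolean needsNewFrame = target.size() % FRAME_SIZE == 0;
        if (needsNewFrame && framesUsed >= MEMORY_BUDGET_FRAMES) {
            return false; // no room: the caller must spill a victim and retry
        }
        target.add(tuple);
        return true;
    }

    // Victim selection: the partition currently holding the most in-memory tuples.
    // (The real code delegates this to PreferToSpillFullyOccupiedFramePolicy.)
    static int selectVictim() {
        int victim = -1, maxSize = 0;
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            if (partitions.get(i).size() > maxSize) {
                maxSize = partitions.get(i).size();
                victim = i;
            }
        }
        return victim;
    }

    // Flush the victim's in-memory tuples to its "run file" and mark it spilled.
    static void spillPartition(int pid) {
        runFiles.get(pid).addAll(partitions.get(pid));
        partitions.get(pid).clear();
        spilledStatus.set(pid);
    }

    // The retry loop that replaces the old buffer-chain logic in processTuple().
    static void processTuple(int pid, String tuple) {
        while (!insertTuple(pid, tuple)) {
            int victim = selectVictim();
            if (victim < 0) {
                throw new IllegalStateException("no partition left to spill");
            }
            spillPartition(victim);
        }
    }

    // Big-object path (probe side): a tuple that cannot fit even after spilling
    // is written directly to the partition's run file on a frame of its own.
    static void flushBigTuple(int pid, String tuple) {
        runFiles.get(pid).add(tuple);
    }

    public static void main(String[] args) {
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            partitions.add(new ArrayList<>());
            runFiles.add(new ArrayList<>());
        }
        for (int t = 0; t < 20; t++) {
            processTuple(t % NUM_PARTITIONS, "tuple-" + t);
        }
        flushBigTuple(0, "huge-blob");
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            System.out.println("partition " + i + ": " + partitions.get(i).size()
                    + " in memory, " + runFiles.get(i).size() + " spilled, spilled="
                    + spilledStatus.get(i));
        }
    }
}

Note that in the committed code a spilled partition keeps at most one in-memory frame (createAtMostOneFrameForSpilledPartitionConstrain above); the sketch omits that constraint for brevity.
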
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 07fdde1..22ad91f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrame;
@@ -47,8 +48,9 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
@@ -59,6 +61,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePu
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
+import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
 /**
  * @author pouria
@@ -109,15 +112,15 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
     private static final String PROBE_REL = "RelR";
     private static final String BUILD_REL = "RelS";
 
-    private final int memsize;
+    private final int frameLimit;
     private final int inputsize0;
     private final double fudgeFactor;
     private final int[] probeKeys;
     private final int[] buildKeys;
     private final IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories;
     private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ
-    private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
-    private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
+    private final ITuplePairComparatorFactory tuplePairComparatorFactoryProbe2Build; //For NLJ in probe
+    private final ITuplePairComparatorFactory tuplePairComparatorFactoryBuild2Probe; //For NLJ in probe
     private final IPredicateEvaluatorFactory predEvaluatorFactory;
 
     private final boolean isLeftOuter;
@@ -130,50 +133,38 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
     private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
 
-    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int frameLimit, int inputsize0,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            ITuplePairComparatorFactory tupPaircomparatorFactory0,
-            ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory,
+            ITuplePairComparatorFactory tupPaircomparatorFactory01,
+            ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory,
             boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
 
         super(spec, 2, 1);
+        this.frameLimit = frameLimit;
         this.inputsize0 = inputsize0;
-        this.memsize = memsize;
         this.fudgeFactor = factor;
         this.probeKeys = keys0;
         this.buildKeys = keys1;
         this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories;
         this.comparatorFactories = comparatorFactories;
-        this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
-        this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
+        this.tuplePairComparatorFactoryProbe2Build = tupPaircomparatorFactory01;
+        this.tuplePairComparatorFactoryBuild2Probe = tupPaircomparatorFactory10;
         recordDescriptors[0] = recordDescriptor;
         this.predEvaluatorFactory = predEvaluatorFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
     }
 
-    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int frameLimit, int inputsize0,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            ITuplePairComparatorFactory tupPaircomparatorFactory0,
-            ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
-                    throws HyracksDataException {
-
-        super(spec, 2, 1);
-        this.memsize = memsize;
-        this.inputsize0 = inputsize0;
-        this.fudgeFactor = factor;
-        this.probeKeys = keys0;
-        this.buildKeys = keys1;
-        this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories;
-        this.comparatorFactories = comparatorFactories;
-        this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
-        this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
-        this.predEvaluatorFactory = predEvaluatorFactory;
-        recordDescriptors[0] = recordDescriptor;
-        this.isLeftOuter = false;
-        this.nullWriterFactories1 = null;
+            ITuplePairComparatorFactory tupPaircomparatorFactory01,
+            ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory)
+            throws HyracksDataException {
+        this(spec, frameLimit, inputsize0, factor, keys0, keys1, hashFunctionGeneratorFactories,
+                comparatorFactories, recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10,
+                predEvaluatorFactory, false, null);
     }
 
     @Override
@@ -262,8 +253,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
 
-            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
+            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
 
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; i++) {
@@ -281,28 +272,25 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                         hashFunctionGeneratorFactories).createPartitioner(0);
                 ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                         hashFunctionGeneratorFactories).createPartitioner(0);
+                boolean isFailed = false;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    if (memsize <= 2) { //Dedicated buffers: One buffer to read and one buffer for output
+                    if (frameLimit <= 2) { //Dedicated buffers: One buffer to read and one buffer for output
                         throw new HyracksDataException("not enough memory for Hybrid Hash Join");
                     }
-                    state.memForJoin = memsize - 2;
+                    state.memForJoin = frameLimit - 2;
                     state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
                             nPartitions);
-                    if (!isLeftOuter) {
-                        state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
-                                PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
-                                buildHpc, predEvaluator);
-                    } else {
-                        state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
-                                PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
-                                buildHpc, predEvaluator, isLeftOuter, nullWriterFactories1);
-                    }
+                    state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+                            PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+                            buildHpc, predEvaluator, isLeftOuter, nullWriterFactories1);
 
                     state.hybridHJ.initBuild();
-                    LOGGER.fine("OptimizedHybridHashJoin is starting the build phase with " + state.numOfPartitions
-                            + " partitions using " + state.memForJoin + " frames for memory.");
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("OptimizedHybridHashJoin is starting the build phase with " + state.numOfPartitions
+                                + " partitions using " + state.memForJoin + " frames for memory.");
+                    }
                 }
 
                 @Override
@@ -313,12 +301,19 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                 @Override
                 public void close() throws HyracksDataException {
                     state.hybridHJ.closeBuild();
-                    ctx.setStateObject(state);
-                    LOGGER.fine("OptimizedHybridHashJoin closed its build phase");
+                    if (isFailed) {
+                        state.hybridHJ.clearBuildTempFiles();
+                    } else {
+                        ctx.setStateObject(state);
+                        if (LOGGER.isLoggable(Level.FINE)) {
+                            LOGGER.fine("OptimizedHybridHashJoin closed its build phase");
+                        }
+                    }
                 }
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    isFailed = true;
                 }
 
             };
@@ -350,22 +345,30 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
                         throws HyracksDataException {
 
-            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
-            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
+            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
-            final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
-                    : predEvaluatorFactory.createPredicateEvaluator());
+            final ITuplePairComparator nljComparatorProbe2Build = tuplePairComparatorFactoryProbe2Build
+                    .createTuplePairComparator(ctx);
+            final ITuplePairComparator nljComparatorBuild2Probe = tuplePairComparatorFactoryBuild2Probe
+                    .createTuplePairComparator(ctx);
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
+                    .createPredicateEvaluator());
 
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
 
             final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            final ArrayTupleBuilder nullTupleBuild = isLeftOuter ?
+                    new ArrayTupleBuilder(buildRd.getFieldCount()) :
+                    null;
             if (isLeftOuter) {
+                DataOutput out = nullTupleBuild.getDataOutput();
                 for (int i = 0; i < nullWriterFactories1.length; i++) {
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                    nullWriters1[i].writeNull(out);
+                    nullTupleBuild.addFieldEndOffset();
                 }
             }
 
@@ -373,14 +376,20 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                 private BuildAndPartitionTaskState state;
                 private IFrame rPartbuff = new VSizeFrame(ctx);
 
+                private FrameTupleAppender nullResultAppender = null;
+                private FrameTupleAccessor probeTupleAccessor;
+
                 @Override
                 public void open() throws HyracksDataException {
-                    writer.open();
-                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
-                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
+                    state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
+                            BUILD_AND_PARTITION_ACTIVITY_ID), partition));
 
+                    writer.open();
                     state.hybridHJ.initProbe();
-                    LOGGER.fine("OptimizedHybridHashJoin is starting the probe phase.");
+
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("OptimizedHybridHashJoin is starting the probe phase.");
+                    }
                 }
 
                 @Override
@@ -397,239 +406,260 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                 public void close() throws HyracksDataException {
                     try {
                         state.hybridHJ.closeProbe(writer);
+
                         BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
+
                         rPartbuff.reset();
-                        for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus
-                                .nextSetBit(pid + 1)) {
+                        for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
+
                             RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
                             RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
-                            if (bReader == null || pReader == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
+
+                            if (bReader == null || pReader == null) {
+                                if (isLeftOuter && pReader != null) {
+                                    appendNullToProbeTuples(pReader);
+                                }
                                 continue;
                             }
                             int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
                             int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
-                            int beforeMax = (bSize > pSize) ? bSize : pSize;
-                            joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
+                            joinPartitionPair(bReader, pReader, bSize, pSize, 1);
                         }
                     } finally {
                         writer.close();
                     }
-                    LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
+                    }
                 }
 
-                private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
-                        RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed)
-                                throws HyracksDataException {
+                //The buildSideReader should always be the original buildSideReader, and so should the probeSideReader
+                private void joinPartitionPair(RunFileReader buildSideReader, RunFileReader probeSideReader,
+                        int buildSizeInTuple, int probeSizeInTuple, int level)
+                        throws HyracksDataException {
                     ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
                     ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
 
-                    long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize())
-                            : (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize());
-                    long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize())
-                            : (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize());
-
-                    LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + ") (pid "
-                            + pid + ") - (level " + level + ") - wasReversed " + wasReversed + " - BuildSize:\t"
-                            + buildPartSize + "\tProbeSize:\t" + probePartSize + " - MemForJoin " + (state.memForJoin)
-                            + "  - LeftOuter is " + isLeftOuter);
+                    long buildPartSize = buildSideReader.getFileSize() / ctx.getInitialFrameSize();
+                    long probePartSize = probeSideReader.getFileSize() / ctx.getInitialFrameSize();
+                    int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
+
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId()
+                                + ") - (level " + level + ") - BuildSize:\t" + buildPartSize
+                                + "\tProbeSize:\t" + probePartSize + " - MemForJoin " + state.memForJoin
+                                + "  - LeftOuter is " + isLeftOuter);
+                    }
 
                     //Apply in-Mem HJ if possible
-                    if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
-                            || (probePartSize < state.memForJoin && !isLeftOuter)) {
+                    if (!skipInMemoryHJ && ((buildPartSize < state.memForJoin)
+                            || (probePartSize < state.memForJoin && !isLeftOuter))) {
                         int tabSize = -1;
-                        if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
-                            LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
-                                    + level + "]");
-                            tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid)
-                                    : ohhj.getBuildPartitionSizeInTup(pid);
+                        if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+                            if (LOGGER.isLoggable(Level.FINE)) {
+                                LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
+                                        + level + "]");
+                            }
+                            tabSize = buildSizeInTuple;
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                             }
                             //Build Side is smaller
-                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, probeHpc, buildHpc,
-                                    buildSideReader, probeSideReader, false, pid); // checked-confirmed
+                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, buildHpc,
+                                    probeHpc, buildSideReader, probeSideReader); // checked-confirmed
                         } else { //Case 1.2 - InMemHJ with Role Reversal
-                            LOGGER.fine(
-                                    "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
-                                            + level + "]");
-                            tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid)
-                                    : ohhj.getProbePartitionSizeInTup(pid);
+                            if (LOGGER.isLoggable(Level.FINE)) {
+                                LOGGER.fine(
+                                        "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
+                                                + level + "]");
+                            }
+                            tabSize = probeSizeInTuple;
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                             }
                             //Probe Side is smaller
-                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, buildHpc, probeHpc,
-                                    probeSideReader, buildSideReader, true, pid); // checked-confirmed
+                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, probeHpc,
+                                    buildHpc, probeSideReader, buildSideReader); // checked-confirmed
                         }
                     }
                     //Apply (Recursive) HHJ
                     else {
-                        LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
-                        OptimizedHybridHashJoin rHHj;
-                        if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
-                            LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                    + level + "]");
-                            int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
-                                    nPartitions);
-                            rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
-                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc,
-                                    predEvaluator); //checked-confirmed
-
-                            buildSideReader.open();
-                            rHHj.initBuild();
-                            rPartbuff.reset();
-                            while (buildSideReader.nextFrame(rPartbuff)) {
-                                rHHj.build(rPartbuff.getBuffer());
+                        if (LOGGER.isLoggable(Level.FINE)) {
+                            LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
+                        }
+                        if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+                            if (LOGGER.isLoggable(Level.FINE)) {
+                                LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+                                        + level + "]");
                             }
+                            applyHybridHashJoin((int) buildPartSize, PROBE_REL, BUILD_REL, probeKeys, buildKeys,
+                                    probeRd, buildRd, probeHpc, buildHpc, probeSideReader, buildSideReader, level,
+                                    beforeMax);
 
-                            rHHj.closeBuild();
-
-                            probeSideReader.open();
-                            rHHj.initProbe();
-                            rPartbuff.reset();
-                            while (probeSideReader.nextFrame(rPartbuff)) {
-                                rHHj.probe(rPartbuff.getBuffer(), writer);
+                        } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
+                            if (LOGGER.isLoggable(Level.FINE)) {
+                                LOGGER.fine(
+                                        "\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level " + level + "]");
                             }
-                            rHHj.closeProbe(writer);
 
-                            int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
-                            int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
-                            int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
-                                    : maxAfterProbeSize;
+                            applyHybridHashJoin((int) probePartSize, BUILD_REL, PROBE_REL, buildKeys, probeKeys,
+                                    buildRd, probeRd, buildHpc, probeHpc, buildSideReader, probeSideReader, level,
+                                    beforeMax);
 
-                            BitSet rPStatus = rHHj.getPartitionStatus();
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
-                                LOGGER.fine(
-                                        "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                                + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
-                                        .nextSetBit(rPid + 1)) {
-                                    RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                                    RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+                        }
+                    }
+                }
 
-                                    if (rbrfw == null || rprfw == null) {
-                                        continue;
-                                    }
+                private void applyHybridHashJoin(int tableSize,
+                        final String PROBE_REL, final String BUILD_REL,
+                        final int[] probeKeys, final int[] buildKeys,
+                        final RecordDescriptor probeRd, final RecordDescriptor buildRd,
+                        final ITuplePartitionComputer probeHpc, final ITuplePartitionComputer buildHpc,
+                        RunFileReader probeSideReader, RunFileReader buildSideReader,
+                        final int level, final long beforeMax)
+                        throws HyracksDataException {
 
-                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); // checked-confirmed
-                                }
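+                    // Role reversal is detected by reference identity: the caller passes this descriptor's key arrays swapped.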
+                    boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
+                            && buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
 
-                            } else { //Case 2.1.2 - Switch to NLJ
-                                LOGGER.fine(
-                                        "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                                + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
-                                        .nextSetBit(rPid + 1)) {
-                                    RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                                    RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
-                                    if (rbrfw == null || rprfw == null) {
-                                        continue;
-                                    }
-
-                                    int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
-                                    int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
-                                    if (isLeftOuter || buildSideInTups < probeSideInTups) {
-                                        applyNestedLoopJoin(buildRd, probeRd, memsize, rprfw, rbrfw, nljComparator0,
-                                                false); //checked-modified
-                                    } else {
-                                        applyNestedLoopJoin(probeRd, buildRd, memsize, rbrfw, rprfw, nljComparator1,
-                                                true); //checked-modified
-                                    }
+                    assert !(isLeftOuter && isReversed) : "Left Outer Join cannot reverse roles";
+
+                    int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
+                    OptimizedHybridHashJoin rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n,
+                            PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd,
+                            probeHpc, buildHpc, predEvaluator, isLeftOuter, nullWriterFactories1); //checked-confirmed
+
+                    rHHj.setIsReversed(isReversed);
+                    buildSideReader.open();
+                    rHHj.initBuild();
+                    rPartbuff.reset();
+                    while (buildSideReader.nextFrame(rPartbuff)) {
+                        rHHj.build(rPartbuff.getBuffer());
+                    }
+                    rHHj.closeBuild();
+                    buildSideReader.close();
+
+                    probeSideReader.open();
+                    rHHj.initProbe();
+                    rPartbuff.reset();
+                    while (probeSideReader.nextFrame(rPartbuff)) {
+                        rHHj.probe(rPartbuff.getBuffer(), writer);
+                    }
+                    rHHj.closeProbe(writer);
+                    probeSideReader.close();
+
+                    int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
+                    int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+                    int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
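+                    // Comparing afterMax against beforeMax measures how much one more partitioning level shrank the largest partition.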
+
+                    BitSet rPStatus = rHHj.getPartitionStatus();
+                    if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+                        if (LOGGER.isLoggable(Level.FINE)) {
+                            LOGGER.fine(
+                                    "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+                                            + level + "]");
+                        }
+                        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+                            int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
+                            int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
+
+                            if (rbrfw == null || rprfw == null) {
+                                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
+                                    appendNullToProbeTuples(rprfw);
                                 }
+                                continue;
                             }
-                        } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
-                            LOGGER.fine("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level " + level + "]");
-                            int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
-                                    nPartitions);
-
-                            rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
-                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc,
-                                    predEvaluator); //checked-confirmed
-                            rHHj.setIsReversed(true); //Added to use predicateEvaluator (for inMemoryHashJoin) correctly
-
-                            probeSideReader.open();
-                            rHHj.initBuild();
-                            rPartbuff.reset();
-                            while (probeSideReader.nextFrame(rPartbuff)) {
-                                rHHj.build(rPartbuff.getBuffer());
-                            }
-                            rHHj.closeBuild();
-                            rHHj.initProbe();
-                            buildSideReader.open();
-                            rPartbuff.reset();
-                            while (buildSideReader.nextFrame(rPartbuff)) {
-                                rHHj.probe(rPartbuff.getBuffer(), writer);
+
+                            if (isReversed) {
+                                joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
+                            } else {
+                                joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
                             }
-                            rHHj.closeProbe(writer);
-                            int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
-                            int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
-                            int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
-                                    : maxAfterProbeSize;
-                            BitSet rPStatus = rHHj.getPartitionStatus();
-
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
-                                LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "
-                                        + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
-                                        .nextSetBit(rPid + 1)) {
-                                    RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                                    RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+                        }
 
-                                    if (rbrfw == null || rprfw == null) {
-                                        continue;
-                                    }
+                    } else { //Case 2.1.2 - Switch to NLJ
+                        if (LOGGER.isLoggable(Level.FINE)) {
+                            LOGGER.fine(
+                                    "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+                                            + level + "]");
+                        }
+                        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
-                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
-                                }
-                            } else { //Case 2.2.2 - Switch to NLJ
-                                LOGGER.fine(
-                                        "\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
-                                                + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
-                                        .nextSetBit(rPid + 1)) {
-                                    RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                                    RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
-                                    if (rbrfw == null || rprfw == null) {
-                                        continue;
-                                    }
-
-                                    long buildSideSize = rbrfw.getFileSize();
-                                    long probeSideSize = rprfw.getFileSize();
-                                    if (buildSideSize > probeSideSize) {
-                                        applyNestedLoopJoin(buildRd, probeRd, memsize, rbrfw, rprfw, nljComparator0,
-                                                true); //checked-modified
-                                    } else {
-                                        applyNestedLoopJoin(probeRd, buildRd, memsize, rprfw, rbrfw, nljComparator1,
-                                                true); //checked-modified
-                                    }
+                            if (rbrfw == null || rprfw == null) {
+                                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
+                                    appendNullToProbeTuples(rprfw);
                                 }
+                                continue;
                             }
+
+                            int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
+                            int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
+                            // NLJ takes its inputs as outer + inner, the reverse of the probe + build order used by the other joins
+                            if (isLeftOuter || probeSideInTups < buildSideInTups) {
+                                applyNestedLoopJoin(probeRd, buildRd, frameLimit, rprfw, rbrfw); //checked-modified
+                            } else {
+                                applyNestedLoopJoin(buildRd, probeRd, frameLimit, rbrfw, rprfw); //checked-modified
+                            }
+                        }
+                    }
+                }
+
+                private void appendNullToProbeTuples(RunFileReader probeReader) throws HyracksDataException {
+                    if (nullResultAppender == null) {
+                        nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+                    }
+                    if (probeTupleAccessor == null) {
+                        probeTupleAccessor = new FrameTupleAccessor(probeRd);
+                    }
+                    probeReader.open();
+                    while (probeReader.nextFrame(rPartbuff)) {
+                        probeTupleAccessor.reset(rPartbuff.getBuffer());
+                        for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
+                            FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
+                                    nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
+                                    nullTupleBuild.getSize());
                         }
-                        buildSideReader.close();
-                        probeSideReader.close();
                     }
+                    probeReader.close();
+                    nullResultAppender.write(writer, true);
                 }
 
                 private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
-                        RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
-                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader,
-                        boolean reverse, int pid) throws HyracksDataException {
+                        RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepBuild,
+                        ITuplePartitionComputer hpcRepProbe, RunFileReader bReader, RunFileReader pReader)
+                        throws HyracksDataException {
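+                    // As in applyHybridHashJoin, role reversal is detected by reference identity of the key arrays.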
+                    boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
+                            && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
+
+                    assert !(isLeftOuter && isReversed) : "Left Outer Join cannot reverse roles";
+
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(probeRDesc),
-                            hpcRepLarger, new FrameTupleAccessor(buildRDesc), hpcRepSmaller,
+                            hpcRepProbe, new FrameTupleAccessor(buildRDesc), hpcRepBuild,
                             new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nullWriters1, table,
-                            predEvaluator, reverse);
+                            predEvaluator, isReversed);
 
                     bReader.open();
                     rPartbuff.reset();
                     while (bReader.nextFrame(rPartbuff)) {
-                        ByteBuffer copyBuffer = ctx.allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
+                        //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
+                        ByteBuffer copyBuffer = ctx.allocateFrame(rPartbuff.getFrameSize());
                         FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
                         joiner.build(copyBuffer);
                         rPartbuff.reset();
@@ -647,12 +677,19 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                 }
 
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
-                        RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator,
-                        boolean reverse) throws HyracksDataException {
-                    NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd),
-                            new FrameTupleAccessor(innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter,
-                            nullWriters1);
-                    nlj.setIsReversed(reverse);
+                        RunFileReader outerReader, RunFileReader innerReader)
+                        throws HyracksDataException {
+                    // The nested loop join result is outer + inner, while all the other operators use probe + build; hence the reversal test differs.
+                    boolean isReversed = outerRd == buildRd && innerRd == probeRd;
+                    assert !(isLeftOuter && isReversed) : "Left Outer Join cannot reverse roles";
+                    ITuplePairComparator nljComparatorOuterInner =
+                            isReversed ? nljComparatorBuild2Probe : nljComparatorProbe2Build;
+                    NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd),
+                            new FrameTupleAccessor(innerRd), nljComparatorOuterInner, memorySize,
+                            predEvaluator, isLeftOuter, nullWriters1);
+                    nlj.setIsReversed(isReversed);
 
                     IFrame cacheBuff = new VSizeFrame(ctx);
                     innerReader.open();

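The decision cascade implemented by joinPartitionPair() and applyHybridHashJoin() above can be summarized as: use an in-memory hash join when the smaller side fits in the join memory; otherwise partition once more and, per sub-partition, keep recursing only while partitioning still shrinks the data, falling back to a nested loop join when it does not. The sketch below distills that logic under stated assumptions: only the NLJ_SWITCH_THRESHOLD name mirrors the committed code, its value here is made up, and the enum and method are illustrative rather than part of the patch.

    // Hypothetical distillation of the join-strategy cascade in joinPartitionPair().
    public final class JoinStrategySketch {
        enum Strategy { IN_MEMORY_HASH_JOIN, RECURSIVE_HYBRID_HASH_JOIN, NESTED_LOOP_JOIN }

        // Assumed value; the real constant lives in the operator descriptor.
        private static final double NLJ_SWITCH_THRESHOLD = 0.8;

        // smallerSideFrames and memForJoin are in frames; beforeMax/afterMax are the
        // largest partition sizes (in tuples) before and after one more partitioning level.
        static Strategy choose(long smallerSideFrames, long memForJoin,
                int beforeMax, int afterMax, boolean forceNLJ) {
            if (smallerSideFrames < memForJoin) {
                return Strategy.IN_MEMORY_HASH_JOIN;        // Cases 1.1 / 1.2
            }
            if (!forceNLJ && afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
                return Strategy.RECURSIVE_HYBRID_HASH_JOIN; // Cases 2.1.1 / 2.2.1
            }
            return Strategy.NESTED_LOOP_JOIN;               // Cases 2.1.2 / 2.2.2
        }
    }
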
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
new file mode 100644
index 0000000..7c7bfec
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
+import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
+import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IFrameFreeSlotPolicy;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
+
+public abstract class AbstractExternalSortRunGenerator extends AbstractSortRunGenerator {
+
+    protected final IHyracksTaskContext ctx;
+    protected final IFrameSorter frameSorter;
+    protected final int maxSortFrames;
+
+    public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException {
+        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg,
+                EnumFreeSlotPolicy.LAST_FIT, framesLimit);
+    }
+
+    public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit)
+                    throws HyracksDataException {
+        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
+                Integer.MAX_VALUE);
+    }
+
+    public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit, int outputLimit)
+                    throws HyracksDataException {
+        super();
+        this.ctx = ctx;
+        maxSortFrames = framesLimit - 1;
+
+        IFrameFreeSlotPolicy freeSlotPolicy = FrameFreeSlotPolicyFactory.createFreeSlotPolicy(policy, maxSortFrames);
+        IFrameBufferManager bufferManager = new VariableFrameMemoryManager(
+                new VariableFramePool(ctx, maxSortFrames * ctx.getInitialFrameSize()), freeSlotPolicy);
+        if (alg == Algorithm.MERGE_SORT) {
+            frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
+                    comparatorFactories, recordDesc, outputLimit);
+        } else {
+            frameSorter = new FrameSorterQuickSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
+                    comparatorFactories, recordDesc, outputLimit);
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (!frameSorter.insertFrame(buffer)) {
+            flushFramesToRun();
+            if (!frameSorter.insertFrame(buffer)) {
+                throw new HyracksDataException("The given frame is too big to insert into the sorting memory.");
+            }
+        }
+    }
+
+    @Override
+    public ISorter getSorter() {
+        return frameSorter;
+    }
+
+}
\ No newline at end of file



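The contract of AbstractExternalSortRunGenerator.nextFrame() above is: try to insert the incoming frame into sort memory; if that fails, sort and flush the resident frames as a run, then retry; only if a single frame still cannot fit does the operator give up. A minimal stand-alone sketch of that insert-then-flush loop follows; SimpleRunGenerator and everything in it are illustrative stand-ins rather than Hyracks classes, and a real flush would sort the tuples and write a run file instead of keeping buffers in a list.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    final class SimpleRunGenerator {
        private final int capacityBytes;           // sort memory budget, akin to (framesLimit - 1) frames
        private int usedBytes;
        private final List<ByteBuffer> inMemory = new ArrayList<>();
        private final List<List<ByteBuffer>> runs = new ArrayList<>();

        SimpleRunGenerator(int capacityBytes) {
            this.capacityBytes = capacityBytes;
        }

        void nextFrame(ByteBuffer frame) {
            if (!insert(frame)) {
                flushToRun();                      // spill what we have as a run
                if (!insert(frame)) {              // retry against an empty budget
                    // The real code throws HyracksDataException with this message.
                    throw new IllegalStateException(
                            "The given frame is too big to insert into the sorting memory.");
                }
            }
        }

        private boolean insert(ByteBuffer frame) {
            if (usedBytes + frame.capacity() > capacityBytes) {
                return false;
            }
            inMemory.add(frame);
            usedBytes += frame.capacity();
            return true;
        }

        private void flushToRun() {
            runs.add(new ArrayList<>(inMemory));   // stand-in for sorting + writing a run file
            inMemory.clear();
            usedBytes = 0;
        }
    }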