asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [1/2] asterixdb git commit: ASTERIXDB-1791, ASTERIXDB-1796: fix failure handling in runtime operators.
Date Fri, 24 Feb 2017 05:33:37 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 452ec9f6f -> 34f2384ea


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 202aac6..16c21df 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -50,7 +50,6 @@ public class NestedLoopJoin {
     private final IFrame outBuffer;
     private final IFrame innerBuffer;
     private final VariableFrameMemoryManager outerBufferMngr;
-    private RunFileReader runFileReader;
     private final RunFileWriter runFileWriter;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder missingTupleBuilder;
@@ -103,14 +102,17 @@ public class NestedLoopJoin {
 
     public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
         if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
-            runFileReader = runFileWriter.createReader();
-            runFileReader.open();
-            while (runFileReader.nextFrame(innerBuffer)) {
-                for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                    blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+            RunFileReader runFileReader = runFileWriter.createReader();
+            try {
+                runFileReader.open();
+                while (runFileReader.nextFrame(innerBuffer)) {
+                    for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+                        blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+                    }
                 }
+            } finally {
+                runFileReader.close();
             }
-            runFileReader.close();
             outerBufferMngr.reset();
             if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
                 throw new HyracksDataException("The given outer frame of size:" + outerBuffer.capacity()
@@ -174,20 +176,25 @@ public class NestedLoopJoin {
         }
     }
 
-    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
-        runFileReader = runFileWriter.createDeleteOnCloseReader();
-        runFileReader.open();
-        while (runFileReader.nextFrame(innerBuffer)) {
-            for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+    public void completeJoin(IFrameWriter writer) throws HyracksDataException {
+        RunFileReader runFileReader = runFileWriter.createDeleteOnCloseReader();
+        try {
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+                    blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+                }
             }
+        } finally {
+            runFileReader.close();
         }
-        runFileReader.close();
-        outerBufferMngr.reset();
-
         appender.write(writer, true);
     }
 
+    public void releaseMemory() throws HyracksDataException {
+        outerBufferMngr.reset();
+    }
+
     private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
             throws HyracksDataException {
         int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 09b7544..5d79f75 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -173,6 +173,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private JoinCacheTaskState state;
+                boolean failed = false;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -188,8 +189,24 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void close() throws HyracksDataException {
+                    if (failed) {
+                        try {
+                            state.joiner.closeCache();
+                        } finally {
+                            writer.close();
+                        }
+                        return;
+                    }
                     try {
-                        state.joiner.closeJoin(writer);
+                        try {
+                            state.joiner.completeJoin(writer);
+                        } finally {
+                            state.joiner.releaseMemory();
+                        }
+                    } catch (Exception e) {
+                        state.joiner.closeCache();
+                        writer.fail();
+                        throw e;
                     } finally {
                         writer.close();
                     }
@@ -197,6 +214,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    failed = true;
                     writer.fail();
                 }
             };

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 17f009e..a5e2f6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -240,10 +240,10 @@ public class OptimizedHybridHashJoin {
     /**
      * In case of failure happens, we need to clear up the generated temporary files.
      */
-    public void clearBuildTempFiles() {
+    public void clearBuildTempFiles() throws HyracksDataException {
         for (int i = 0; i < buildRFWriters.length; i++) {
             if (buildRFWriters[i] != null) {
-                buildRFWriters[i].getFileReference().delete();
+                buildRFWriters[i].erase();
             }
         }
     }
@@ -258,17 +258,22 @@ public class OptimizedHybridHashJoin {
                 runFileWriters = probeRFWriters;
                 break;
         }
-
-        for (int pid = spilledStatus.nextSetBit(0); pid >= 0
-                && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
-            if (bufferManager.getNumTuples(pid) > 0) {
-                bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
-                bufferManager.clearPartition(pid);
+        try {
+            for (int pid = spilledStatus.nextSetBit(0); pid >= 0
+                    && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
+                if (bufferManager.getNumTuples(pid) > 0) {
+                    bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
+                    bufferManager.clearPartition(pid);
+                }
             }
-            // It doesn't matter whether a spilled partition currently holds a tuple in memory or not.
-            // The file that holds the corresponding spilled partition needs to be closed.
-            if (runFileWriters[pid] != null) {
-                runFileWriters[pid].close();
+        } finally {
+            // Force to close all run file writers.
+            if (runFileWriters != null) {
+                for (RunFileWriter runFileWriter : runFileWriters) {
+                    if (runFileWriter != null) {
+                        runFileWriter.close();
+                    }
+                }
             }
         }
     }
@@ -418,26 +423,28 @@ public class OptimizedHybridHashJoin {
 
     private boolean loadSpilledPartitionToMem(int pid, RunFileWriter wr) throws HyracksDataException {
         RunFileReader r = wr.createReader();
-        r.open();
-        if (reloadBuffer == null) {
-            reloadBuffer = new VSizeFrame(ctx);
-        }
-        while (r.nextFrame(reloadBuffer)) {
-            accessorBuild.reset(reloadBuffer.getBuffer());
-            for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
-                if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+        try {
+            r.open();
+            if (reloadBuffer == null) {
+                reloadBuffer = new VSizeFrame(ctx);
+            }
+            while (r.nextFrame(reloadBuffer)) {
+                accessorBuild.reset(reloadBuffer.getBuffer());
+                for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
+                    if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+                        continue;
+                    }
                     // for some reason (e.g. due to fragmentation) if the inserting failed,
                     // we need to clear the occupied frames
                     bufferManager.clearPartition(pid);
-                    r.close();
                     return false;
                 }
             }
+            // Closes and deletes the run file if it is already loaded into memory.
+            r.setDeleteAfterClose(true);
+        } finally {
+            r.close();
         }
-
-        // Closes and deletes the run file if it is already loaded into memory.
-        r.setDeleteAfterClose(true);
-        r.close();
         spilledStatus.set(pid, false);
         buildRFWriters[pid] = null;
         return true;
@@ -538,10 +545,13 @@ public class OptimizedHybridHashJoin {
         return spilledStatus.nextSetBit(0) < 0;
     }
 
-    public void closeProbe(IFrameWriter writer) throws HyracksDataException {
+    public void completeProbe(IFrameWriter writer) throws HyracksDataException {
         //We do NOT join the spilled partitions here, that decision is made at the descriptor level
         //(which join technique to use)
-        inMemJoiner.closeJoin(writer);
+        inMemJoiner.completeJoin(writer);
+    }
+
+    public void releaseResource() throws HyracksDataException {
         inMemJoiner.closeTable();
         closeAllSpilledPartitions(SIDE.PROBE);
         bufferManager.close();
@@ -553,10 +563,10 @@ public class OptimizedHybridHashJoin {
     /**
      * In case of failure happens, we need to clear up the generated temporary files.
      */
-    public void clearProbeTempFiles() {
+    public void clearProbeTempFiles() throws HyracksDataException {
         for (int i = 0; i < probeRFWriters.length; i++) {
             if (probeRFWriters[i] != null) {
-                probeRFWriters[i].getFileReference().delete();
+                probeRFWriters[i].erase();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index a72c0c6..d5e3568 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -385,6 +385,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                 private FrameTupleAppender nullResultAppender = null;
                 private FrameTupleAccessor probeTupleAccessor;
+                private boolean failed = false;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -406,21 +407,33 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                 @Override
                 public void fail() throws HyracksDataException {
-                    state.hybridHJ.clearProbeTempFiles();
+                    failed = true;
                     writer.fail();
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
+                    if (failed) {
+                        try {
+                            // Clear temp files if fail() was called.
+                            state.hybridHJ.clearBuildTempFiles();
+                            state.hybridHJ.clearProbeTempFiles();
+                        } finally {
+                            writer.close(); // writer should always be closed.
+                        }
+                        logProbeComplete();
+                        return;
+                    }
                     try {
-                        state.hybridHJ.closeProbe(writer);
-
+                        try {
+                            state.hybridHJ.completeProbe(writer);
+                        } finally {
+                            state.hybridHJ.releaseResource();
+                        }
                         BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
-
                         rPartbuff.reset();
                         for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus
                                 .nextSetBit(pid + 1)) {
-
                             RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
                             RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
 
@@ -434,10 +447,25 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
                             joinPartitionPair(bReader, pReader, bSize, pSize, 1);
                         }
-
+                    } catch (Exception e) {
+                        // Since writer.nextFrame() is called in the above "try" body, we have to call writer.fail()
+                        // to send the failure signal downstream when a throwable is thrown.
+                        writer.fail();
+                        // Clear temp files, since neither fail() nor close() will be called again after close().
+                        state.hybridHJ.clearBuildTempFiles();
+                        state.hybridHJ.clearProbeTempFiles();
+                        // Re-throw whatever is caught.
+                        throw e;
                     } finally {
-                        writer.close();
+                        try {
+                            logProbeComplete();
+                        } finally {
+                            writer.close();
+                        }
                     }
+                }
+
+                private void logProbeComplete() {
                     if (LOGGER.isLoggable(Level.FINE)) {
                         LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
                     }
@@ -542,9 +570,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                     boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                             && buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
-
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
-
                     OptimizedHybridHashJoin rHHj;
                     int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
                     rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeKeys,
@@ -552,79 +578,107 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             nonMatchWriterFactories); //checked-confirmed
 
                     rHHj.setIsReversed(isReversed);
-                    buildSideReader.open();
-                    rHHj.initBuild();
-                    rPartbuff.reset();
-                    while (buildSideReader.nextFrame(rPartbuff)) {
-                        rHHj.build(rPartbuff.getBuffer());
+                    try {
+                        buildSideReader.open();
+                        try {
+                            rHHj.initBuild();
+                            rPartbuff.reset();
+                            while (buildSideReader.nextFrame(rPartbuff)) {
+                                rHHj.build(rPartbuff.getBuffer());
+                            }
+                        } finally {
+                            // Makes sure that files are always properly closed.
+                            rHHj.closeBuild();
+                        }
+                    } finally {
+                        buildSideReader.close();
                     }
-                    rHHj.closeBuild();
-                    buildSideReader.close();
-                    probeSideReader.open();
-                    rHHj.initProbe();
-                    rPartbuff.reset();
-                    while (probeSideReader.nextFrame(rPartbuff)) {
-                        rHHj.probe(rPartbuff.getBuffer(), writer);
+                    try {
+                        probeSideReader.open();
+                        rPartbuff.reset();
+                        try {
+                            rHHj.initProbe();
+                            while (probeSideReader.nextFrame(rPartbuff)) {
+                                rHHj.probe(rPartbuff.getBuffer(), writer);
+                            }
+                            rHHj.completeProbe(writer);
+                        } finally {
+                            rHHj.releaseResource();
+                        }
+                    } finally {
+                        // Makes sure that files are always properly closed.
+                        probeSideReader.close();
                     }
-                    rHHj.closeProbe(writer);
-                    probeSideReader.close();
 
-                    int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
-                    int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
-                    int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
+                    try {
+                        int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
+                        int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+                        int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
 
-                    BitSet rPStatus = rHHj.getPartitionStatus();
-                    if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
-                        if (LOGGER.isLoggable(Level.FINE)) {
-                            LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH "
-                                    + "(isLeftOuter || build<probe) - [Level " + level + "]");
-                        }
-                        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
-                            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-                            int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
-                            int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
-
-                            if (rbrfw == null || rprfw == null) {
-                                if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
-                                    appendNullToProbeTuples(rprfw);
-                                }
-                                continue;
+                        BitSet rPStatus = rHHj.getPartitionStatus();
+                        if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
+                            //Case 2.1.1 - Keep applying HHJ
+                            if (LOGGER.isLoggable(Level.FINE)) {
+                                LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH "
+                                        + "(isLeftOuter || build<probe) - [Level " + level + "]");
                             }
+                            for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+                                int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
+                                int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
+
+                                if (rbrfw == null || rprfw == null) {
+                                    if (isLeftOuter && rprfw != null) {
+                                        // For the outer join, we don't reverse the role.
+                                        appendNullToProbeTuples(rprfw);
+                                    }
+                                    continue;
+                                }
 
-                            if (isReversed) {
-                                joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
-                            } else {
-                                joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
+                                if (isReversed) {
+                                    joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
+                                } else {
+                                    joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
+                                }
                             }
-                        }
-
-                    } else { //Case 2.1.2 - Switch to NLJ
-                        if (LOGGER.isLoggable(Level.FINE)) {
-                            LOGGER.fine(
-                                    "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe)"
-                                            + " - [Level " + level + "]");
-                        }
-                        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
-                            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
-                            if (rbrfw == null || rprfw == null) {
-                                if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
-                                    appendNullToProbeTuples(rprfw);
-                                }
-                                continue;
+                        } else { //Case 2.1.2 - Switch to NLJ
+                            if (LOGGER.isLoggable(Level.FINE)) {
+                                LOGGER.fine(
+                                        "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH "
+                                                + "(isLeftOuter || build<probe) - [Level " + level + "]");
                             }
+                            for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+                                if (rbrfw == null || rprfw == null) {
+                                    if (isLeftOuter && rprfw != null) {
+                                        // For the outer join, we don't reverse the role.
+                                        appendNullToProbeTuples(rprfw);
+                                    }
+                                    continue;
+                                }
 
-                            int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
-                            int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
-                            // NLJ order is outer + inner, the order is reversed from the other joins
-                            if (isLeftOuter || probeSideInTups < buildSideInTups) {
-                                applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw); //checked-modified
-                            } else {
-                                applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw); //checked-modified
+                                int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
+                                int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
+                                // NLJ order is outer + inner, the order is reversed from the other joins
+                                if (isLeftOuter || probeSideInTups < buildSideInTups) {
+                                    //checked-modified
+                                    applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw);
+                                } else {
+                                    //checked-modified
+                                    applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw);
+                                }
                             }
                         }
+                    } catch (Exception e) {
+                        // Make sure that temporary run files generated in recursive hybrid hash joins
+                        // are closed and deleted.
+                        rHHj.clearBuildTempFiles();
+                        rHHj.clearProbeTempFiles();
+                        throw e;
                     }
                 }
 
@@ -635,17 +689,20 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     if (probeTupleAccessor == null) {
                         probeTupleAccessor = new FrameTupleAccessor(probeRd);
                     }
-                    probReader.open();
-                    while (probReader.nextFrame(rPartbuff)) {
-                        probeTupleAccessor.reset(rPartbuff.getBuffer());
-                        for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
-                            FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
+                    try {
+                        probReader.open();
+                        while (probReader.nextFrame(rPartbuff)) {
+                            probeTupleAccessor.reset(rPartbuff.getBuffer());
+                            for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
+                                FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
                                     nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
                                     nullTupleBuild.getSize());
+                            }
                         }
+                        nullResultAppender.write(writer, true);
+                    } finally {
+                        probReader.close();
                     }
-                    probReader.close();
-                    nullResultAppender.write(writer, true);
                 }
 
                 private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
@@ -654,9 +711,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                         throws HyracksDataException {
                     boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                             && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
-
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
-
                     IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx,
                             state.memForJoin * ctx.getInitialFrameSize());
                     ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
@@ -667,39 +722,52 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nonMatchWriter, table,
                             predEvaluator, isReversed, bufferManager);
 
-                    bReader.open();
-                    rPartbuff.reset();
-                    while (bReader.nextFrame(rPartbuff)) {
-                        // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
-                        // in the InMemoryHashJoin.
-                        ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
-                        // If a frame cannot be allocated, there may be a chance if we can compact the table,
-                        // one or more frame may be reclaimed.
-                        if (copyBuffer == null) {
-                            if (joiner.compactHashTable() > 0) {
-                                copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
-                            }
+                    try {
+                        bReader.open();
+                        rPartbuff.reset();
+                        while (bReader.nextFrame(rPartbuff)) {
+                            // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
+                            // in the InMemoryHashJoin.
+                            ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+                            // If a frame cannot be allocated, compacting the hash table may
+                            // reclaim one or more frames, so retry after compaction.
                             if (copyBuffer == null) {
-                                // Still no frame is allocated? At this point, we have no way to get a frame.
-                                throw new HyracksDataException(
-                                        "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+                                if (joiner.compactHashTable() > 0) {
+                                    copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+                                }
+                                if (copyBuffer == null) {
+                                    // Still no frame is allocated? At this point, we have no way to get a frame.
+                                    throw new HyracksDataException(
+                                            "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+                                }
                             }
+                            FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
+                            joiner.build(copyBuffer);
+                            rPartbuff.reset();
                         }
-                        FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
-                        joiner.build(copyBuffer);
-                        rPartbuff.reset();
+                    } finally {
+                        bReader.close();
                     }
-                    bReader.close();
-                    rPartbuff.reset();
-                    //probe
-                    pReader.open();
-                    while (pReader.nextFrame(rPartbuff)) {
-                        joiner.join(rPartbuff.getBuffer(), writer);
+                    try {
+                        //probe
+                        pReader.open();
                         rPartbuff.reset();
+                        try {
+                            while (pReader.nextFrame(rPartbuff)) {
+                                joiner.join(rPartbuff.getBuffer(), writer);
+                                rPartbuff.reset();
+                            }
+                            joiner.completeJoin(writer);
+                        } finally {
+                            joiner.releaseMemory();
+                        }
+                    } finally {
+                        try {
+                            pReader.close();
+                        } finally {
+                            joiner.closeTable();
+                        }
                     }
-                    pReader.close();
-                    joiner.closeJoin(writer);
-                    joiner.closeTable();
                 }
 
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
@@ -716,40 +784,38 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     nlj.setIsReversed(isReversed);
 
                     IFrame cacheBuff = new VSizeFrame(ctx);
-                    innerReader.open();
-                    while (innerReader.nextFrame(cacheBuff)) {
-                        nlj.cache(cacheBuff.getBuffer());
-                        cacheBuff.reset();
+                    try {
+                        innerReader.open();
+                        while (innerReader.nextFrame(cacheBuff)) {
+                            nlj.cache(cacheBuff.getBuffer());
+                            cacheBuff.reset();
+                        }
+                    } finally {
+                        try {
+                            nlj.closeCache();
+                        } finally {
+                            innerReader.close();
+                        }
                     }
-                    nlj.closeCache();
-
-                    IFrame joinBuff = new VSizeFrame(ctx);
-                    outerReader.open();
-
-                    while (outerReader.nextFrame(joinBuff)) {
-                        nlj.join(joinBuff.getBuffer(), writer);
-                        joinBuff.reset();
+                    try {
+                        IFrame joinBuff = new VSizeFrame(ctx);
+                        outerReader.open();
+                        try {
+                            while (outerReader.nextFrame(joinBuff)) {
+                                nlj.join(joinBuff.getBuffer(), writer);
+                                joinBuff.reset();
+                            }
+                            nlj.completeJoin(writer);
+                        } finally {
+                            nlj.releaseMemory();
+                        }
+                    } finally {
+                        outerReader.close();
                     }
-
-                    nlj.closeJoin(writer);
-                    outerReader.close();
-                    innerReader.close();
                 }
             };
             return op;
         }
     }
 
-    public void setSkipInMemHJ(boolean b) {
-        skipInMemoryHJ = b;
-    }
-
-    public void setForceNLJ(boolean b) {
-        forceNLJ = b;
-    }
-
-    public void setForceRR(boolean b) {
-        forceRoleReversal = !isLeftOuter && b;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 4253114..e7da174 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -82,14 +82,12 @@ public class MaterializerTaskState extends AbstractStateObject {
                 while (in.nextFrame(frame)) {
                     writer.nextFrame(frame.getBuffer());
                 }
-            } catch (Throwable th) {
-                throw new HyracksDataException(th);
             } finally {
                 in.close();
             }
-        } catch (Throwable th) {
+        } catch (Exception e) {
             writer.fail();
-            throw new HyracksDataException(th);
+            throw e;
         } finally {
             writer.close();
             if (numConsumers.decrementAndGet() == 0) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 90b4b6c..b422ef4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -80,7 +80,8 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
         final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc);
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
-            IFrameWriter datasetPartitionWriter;
+            private IFrameWriter datasetPartitionWriter;
+            private boolean failed = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -110,15 +111,22 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
 
             @Override
             public void fail() throws HyracksDataException {
+                failed = true;
                 datasetPartitionWriter.fail();
             }
 
             @Override
             public void close() throws HyracksDataException {
-                if (frameOutputStream.getTupleCount() > 0) {
-                    frameOutputStream.flush(datasetPartitionWriter);
+                try {
+                    if (!failed && frameOutputStream.getTupleCount() > 0) {
+                        frameOutputStream.flush(datasetPartitionWriter);
+                    }
+                } catch (Exception e) {
+                    datasetPartitionWriter.fail();
+                    throw e;
+                } finally {
+                    datasetPartitionWriter.close();
                 }
-                datasetPartitionWriter.close();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index 6d9d085..f4158ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -84,10 +84,13 @@ public abstract class AbstractExternalSortRunMerger {
                 finalWriter = prepareSkipMergingFinalResultWriter(writer);
                 finalWriter.open();
                 if (sorter != null) {
-                    if (sorter.hasRemaining()) {
-                        sorter.flush(finalWriter);
+                    try {
+                        if (sorter.hasRemaining()) {
+                            sorter.flush(finalWriter);
+                        }
+                    } finally {
+                        sorter.close();
                     }
-                    sorter.close();
                 }
             } else {
                 /** recycle sort buffer */
@@ -128,10 +131,15 @@ public abstract class AbstractExternalSortRunMerger {
                             RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
                             IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
 
-                            mergeResultWriter.open();
-                            merge(mergeResultWriter, partialRuns);
-                            mergeResultWriter.close();
-
+                            try {
+                                mergeResultWriter.open();
+                                merge(mergeResultWriter, partialRuns);
+                            } catch (Throwable t) {
+                                mergeResultWriter.fail();
+                                throw t;
+                            } finally {
+                                mergeResultWriter.close();
+                            }
                             reader = mergeFileWriter.createReader();
                         }
                         runs.add(reader);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
index cdabcda..ef9e4b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
@@ -103,7 +103,7 @@ class TestSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescrip
                     writer.nextFrame(frame);
                 } catch (Exception e) {
                     writer.fail();
-                    throw new HyracksDataException(e);
+                    throw e;
                 } finally {
                     writer.close();
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
index bba18b3..683857f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -84,9 +84,9 @@ class DummySourceOperatorDescriptor extends AbstractSingleActivityOperatorDescri
             public void initialize() throws HyracksDataException {
                 try {
                     writer.open();
-                } catch (Throwable th) {
+                } catch (Exception e) {
                     writer.fail();
-                    throw new HyracksDataException(th);
+                    throw e;
                 } finally {
                     writer.close();
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
index f83ab6a..521dff1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
@@ -227,12 +227,14 @@ public abstract class AbstractExternalGroupbyTest {
 
         ResultValidateWriter writer = new ResultValidateWriter(keyValueMap);
 
-        getBuilder().open();
-        for (IFrame frame : input) {
-            getBuilder().nextFrame(frame.getBuffer());
+        try {
+            getBuilder().open();
+            for (IFrame frame : input) {
+                getBuilder().nextFrame(frame.getBuffer());
+            }
+        } finally {
+            getBuilder().close();
         }
-        getBuilder().close();
-
         getMerger().setOutputFrameWriter(0, writer, outputRec);
         getMerger().initialize();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index 673c6fa..cfd4f30 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -130,13 +130,16 @@ public abstract class AbstractRunGeneratorTest {
 
         assertTrue(runs.size() > 0);
         for (GeneratedRunFileReader run : runs) {
-            run.open();
-            int preKey = Integer.MIN_VALUE;
-            while (run.nextFrame(frame)) {
-                fta.reset(frame.getBuffer());
-                preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
+            try {
+                run.open();
+                int preKey = Integer.MIN_VALUE;
+                while (run.nextFrame(frame)) {
+                    fta.reset(frame.getBuffer());
+                    preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
+                }
+            } finally {
+                run.close();
             }
-            run.close();
         }
         assertTrue(keyValuePair.isEmpty());
     }


Mime
View raw message