Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 804C0200C36 for ; Fri, 24 Feb 2017 06:33:40 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 7EBC5160B78; Fri, 24 Feb 2017 05:33:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C48FD160B64 for ; Fri, 24 Feb 2017 06:33:38 +0100 (CET) Received: (qmail 38246 invoked by uid 500); 24 Feb 2017 05:33:38 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 38228 invoked by uid 99); 24 Feb 2017 05:33:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Feb 2017 05:33:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BB227DFDAC; Fri, 24 Feb 2017 05:33:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: buyingyi@apache.org To: commits@asterixdb.apache.org Date: Fri, 24 Feb 2017 05:33:37 -0000 Message-Id: <6f4f2f68cc934b0082bc3318782c7f3d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] asterixdb git commit: ASTERIXDB-1791, ASTERIXDB-1796: fix failure handling in runtime operators. archived-at: Fri, 24 Feb 2017 05:33:40 -0000 Repository: asterixdb Updated Branches: refs/heads/master 452ec9f6f -> 34f2384ea http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java index 202aac6..16c21df 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java @@ -50,7 +50,6 @@ public class NestedLoopJoin { private final IFrame outBuffer; private final IFrame innerBuffer; private final VariableFrameMemoryManager outerBufferMngr; - private RunFileReader runFileReader; private final RunFileWriter runFileWriter; private final boolean isLeftOuter; private final ArrayTupleBuilder missingTupleBuilder; @@ -103,14 +102,17 @@ public class NestedLoopJoin { public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException { if (outerBufferMngr.insertFrame(outerBuffer) < 0) { - runFileReader = runFileWriter.createReader(); - runFileReader.open(); - while (runFileReader.nextFrame(innerBuffer)) { - for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { - blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); + RunFileReader runFileReader = runFileWriter.createReader(); + try { + runFileReader.open(); + while (runFileReader.nextFrame(innerBuffer)) { + for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { + blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); + } } + } finally { + runFileReader.close(); } - runFileReader.close(); outerBufferMngr.reset(); if (outerBufferMngr.insertFrame(outerBuffer) < 0) { throw new HyracksDataException("The given outer frame of size:" + outerBuffer.capacity() @@ -174,20 +176,25 @@ public class NestedLoopJoin { } } - public void closeJoin(IFrameWriter writer) throws HyracksDataException { - runFileReader = runFileWriter.createDeleteOnCloseReader(); - runFileReader.open(); - while (runFileReader.nextFrame(innerBuffer)) { - for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { - blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); + public void completeJoin(IFrameWriter writer) throws HyracksDataException { + RunFileReader runFileReader = runFileWriter.createDeleteOnCloseReader(); + try { + runFileReader.open(); + while (runFileReader.nextFrame(innerBuffer)) { + for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { + blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); + } } + } finally { + runFileReader.close(); } - runFileReader.close(); - outerBufferMngr.reset(); - appender.write(writer, true); } + public void releaseMemory() throws HyracksDataException { + outerBufferMngr.reset(); + } + private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1) throws HyracksDataException { int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java index 09b7544..5d79f75 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java @@ -173,6 +173,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { private JoinCacheTaskState state; + boolean failed = false; @Override public void open() throws HyracksDataException { @@ -188,8 +189,24 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor @Override public void close() throws HyracksDataException { + if (failed) { + try { + state.joiner.closeCache(); + } finally { + writer.close(); + } + return; + } try { - state.joiner.closeJoin(writer); + try { + state.joiner.completeJoin(writer); + } finally { + state.joiner.releaseMemory(); + } + } catch (Exception e) { + state.joiner.closeCache(); + writer.fail(); + throw e; } finally { writer.close(); } @@ -197,6 +214,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor @Override public void fail() throws HyracksDataException { + failed = true; writer.fail(); } }; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java index 17f009e..a5e2f6f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java @@ -240,10 +240,10 @@ public class OptimizedHybridHashJoin { /** * In case of failure happens, we need to clear up the generated temporary files. */ - public void clearBuildTempFiles() { + public void clearBuildTempFiles() throws HyracksDataException { for (int i = 0; i < buildRFWriters.length; i++) { if (buildRFWriters[i] != null) { - buildRFWriters[i].getFileReference().delete(); + buildRFWriters[i].erase(); } } } @@ -258,17 +258,22 @@ public class OptimizedHybridHashJoin { runFileWriters = probeRFWriters; break; } - - for (int pid = spilledStatus.nextSetBit(0); pid >= 0 - && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) { - if (bufferManager.getNumTuples(pid) > 0) { - bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide)); - bufferManager.clearPartition(pid); + try { + for (int pid = spilledStatus.nextSetBit(0); pid >= 0 + && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) { + if (bufferManager.getNumTuples(pid) > 0) { + bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide)); + bufferManager.clearPartition(pid); + } } - // It doesn't matter whether a spilled partition currently holds a tuple in memory or not. - // The file that holds the corresponding spilled partition needs to be closed. - if (runFileWriters[pid] != null) { - runFileWriters[pid].close(); + } finally { + // Force to close all run file writers. + if (runFileWriters != null) { + for (RunFileWriter runFileWriter : runFileWriters) { + if (runFileWriter != null) { + runFileWriter.close(); + } + } } } } @@ -418,26 +423,28 @@ public class OptimizedHybridHashJoin { private boolean loadSpilledPartitionToMem(int pid, RunFileWriter wr) throws HyracksDataException { RunFileReader r = wr.createReader(); - r.open(); - if (reloadBuffer == null) { - reloadBuffer = new VSizeFrame(ctx); - } - while (r.nextFrame(reloadBuffer)) { - accessorBuild.reset(reloadBuffer.getBuffer()); - for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) { - if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { + try { + r.open(); + if (reloadBuffer == null) { + reloadBuffer = new VSizeFrame(ctx); + } + while (r.nextFrame(reloadBuffer)) { + accessorBuild.reset(reloadBuffer.getBuffer()); + for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) { + if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { + continue; + } // for some reason (e.g. due to fragmentation) if the inserting failed, // we need to clear the occupied frames bufferManager.clearPartition(pid); - r.close(); return false; } } + // Closes and deletes the run file if it is already loaded into memory. + r.setDeleteAfterClose(true); + } finally { + r.close(); } - - // Closes and deletes the run file if it is already loaded into memory. - r.setDeleteAfterClose(true); - r.close(); spilledStatus.set(pid, false); buildRFWriters[pid] = null; return true; @@ -538,10 +545,13 @@ public class OptimizedHybridHashJoin { return spilledStatus.nextSetBit(0) < 0; } - public void closeProbe(IFrameWriter writer) throws HyracksDataException { + public void completeProbe(IFrameWriter writer) throws HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level //(which join technique to use) - inMemJoiner.closeJoin(writer); + inMemJoiner.completeJoin(writer); + } + + public void releaseResource() throws HyracksDataException { inMemJoiner.closeTable(); closeAllSpilledPartitions(SIDE.PROBE); bufferManager.close(); @@ -553,10 +563,10 @@ public class OptimizedHybridHashJoin { /** * In case of failure happens, we need to clear up the generated temporary files. */ - public void clearProbeTempFiles() { + public void clearProbeTempFiles() throws HyracksDataException { for (int i = 0; i < probeRFWriters.length; i++) { if (probeRFWriters[i] != null) { - probeRFWriters[i].getFileReference().delete(); + probeRFWriters[i].erase(); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index a72c0c6..d5e3568 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -385,6 +385,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD private FrameTupleAppender nullResultAppender = null; private FrameTupleAccessor probeTupleAccessor; + private boolean failed = false; @Override public void open() throws HyracksDataException { @@ -406,21 +407,33 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD @Override public void fail() throws HyracksDataException { - state.hybridHJ.clearProbeTempFiles(); + failed = true; writer.fail(); } @Override public void close() throws HyracksDataException { + if (failed) { + try { + // Clear temp files if fail() was called. + state.hybridHJ.clearBuildTempFiles(); + state.hybridHJ.clearProbeTempFiles(); + } finally { + writer.close(); // writer should always be closed. + } + logProbeComplete(); + return; + } try { - state.hybridHJ.closeProbe(writer); - + try { + state.hybridHJ.completeProbe(writer); + } finally { + state.hybridHJ.releaseResource(); + } BitSet partitionStatus = state.hybridHJ.getPartitionStatus(); - rPartbuff.reset(); for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus .nextSetBit(pid + 1)) { - RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid); RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid); @@ -434,10 +447,25 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid); joinPartitionPair(bReader, pReader, bSize, pSize, 1); } - + } catch (Exception e) { + // Since writer.nextFrame() is called in the above "try" body, we have to call writer.fail() + // to send the failure signal to the downstream, when there is a throwable thrown. + writer.fail(); + // Clear temp files as this.fail() nor this.close() will no longer be called after close(). + state.hybridHJ.clearBuildTempFiles(); + state.hybridHJ.clearProbeTempFiles(); + // Re-throw the whatever is caught. + throw e; } finally { - writer.close(); + try { + logProbeComplete(); + } finally { + writer.close(); + } } + } + + private void logProbeComplete() { if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("OptimizedHybridHashJoin closed its probe phase"); } @@ -542,9 +570,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys && buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys; - assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles"; - OptimizedHybridHashJoin rHHj; int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions); rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeKeys, @@ -552,79 +578,107 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD nonMatchWriterFactories); //checked-confirmed rHHj.setIsReversed(isReversed); - buildSideReader.open(); - rHHj.initBuild(); - rPartbuff.reset(); - while (buildSideReader.nextFrame(rPartbuff)) { - rHHj.build(rPartbuff.getBuffer()); + try { + buildSideReader.open(); + try { + rHHj.initBuild(); + rPartbuff.reset(); + while (buildSideReader.nextFrame(rPartbuff)) { + rHHj.build(rPartbuff.getBuffer()); + } + } finally { + // Makes sure that files are always properly closed. + rHHj.closeBuild(); + } + } finally { + buildSideReader.close(); } - rHHj.closeBuild(); - buildSideReader.close(); - probeSideReader.open(); - rHHj.initProbe(); - rPartbuff.reset(); - while (probeSideReader.nextFrame(rPartbuff)) { - rHHj.probe(rPartbuff.getBuffer(), writer); + try { + probeSideReader.open(); + rPartbuff.reset(); + try { + rHHj.initProbe(); + while (probeSideReader.nextFrame(rPartbuff)) { + rHHj.probe(rPartbuff.getBuffer(), writer); + } + rHHj.completeProbe(writer); + } finally { + rHHj.releaseResource(); + } + } finally { + // Makes sure that files are always properly closed. + probeSideReader.close(); } - rHHj.closeProbe(writer); - probeSideReader.close(); - int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize(); - int maxAfterProbeSize = rHHj.getMaxProbePartitionSize(); - int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize); + try { + int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize(); + int maxAfterProbeSize = rHHj.getMaxProbePartitionSize(); + int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize); - BitSet rPStatus = rHHj.getPartitionStatus(); - if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH " - + "(isLeftOuter || build= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { - RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); - RunFileReader rprfw = rHHj.getProbeRFReader(rPid); - int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid); - int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid); - - if (rbrfw == null || rprfw == null) { - if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role. - appendNullToProbeTuples(rprfw); - } - continue; + BitSet rPStatus = rHHj.getPartitionStatus(); + if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { + //Case 2.1.1 - Keep applying HHJ + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH " + + "(isLeftOuter || build= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { + RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); + RunFileReader rprfw = rHHj.getProbeRFReader(rPid); + int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid); + int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid); + + if (rbrfw == null || rprfw == null) { + if (isLeftOuter && rprfw != null) { + // For the outer join, we don't reverse the role. + appendNullToProbeTuples(rprfw); + } + continue; + } - if (isReversed) { - joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1); - } else { - joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1); + if (isReversed) { + joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1); + } else { + joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1); + } } - } - - } else { //Case 2.1.2 - Switch to NLJ - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine( - "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { - RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); - RunFileReader rprfw = rHHj.getProbeRFReader(rPid); - if (rbrfw == null || rprfw == null) { - if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role. - appendNullToProbeTuples(rprfw); - } - continue; + } else { //Case 2.1.2 - Switch to NLJ + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine( + "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH " + + "(isLeftOuter || build= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { + RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); + RunFileReader rprfw = rHHj.getProbeRFReader(rPid); + + if (rbrfw == null || rprfw == null) { + if (isLeftOuter && rprfw != null) { + // For the outer join, we don't reverse the role. + appendNullToProbeTuples(rprfw); + } + continue; + } - int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid); - int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid); - // NLJ order is outer + inner, the order is reversed from the other joins - if (isLeftOuter || probeSideInTups < buildSideInTups) { - applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw); //checked-modified - } else { - applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw); //checked-modified + int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid); + int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid); + // NLJ order is outer + inner, the order is reversed from the other joins + if (isLeftOuter || probeSideInTups < buildSideInTups) { + //checked-modified + applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw); + } else { + //checked-modified + applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw); + } } } + } catch (Exception e) { + // Make sure that temporary run files generated in recursive hybrid hash joins + // are closed and deleted. + rHHj.clearBuildTempFiles(); + rHHj.clearProbeTempFiles(); + throw e; } } @@ -635,17 +689,20 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD if (probeTupleAccessor == null) { probeTupleAccessor = new FrameTupleAccessor(probeRd); } - probReader.open(); - while (probReader.nextFrame(rPartbuff)) { - probeTupleAccessor.reset(rPartbuff.getBuffer()); - for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) { - FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid, + try { + probReader.open(); + while (probReader.nextFrame(rPartbuff)) { + probeTupleAccessor.reset(rPartbuff.getBuffer()); + for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) { + FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize()); + } } + nullResultAppender.write(writer, true); + } finally { + probReader.close(); } - probReader.close(); - nullResultAppender.write(writer, true); } private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc, @@ -654,9 +711,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD throws HyracksDataException { boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys; - assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles"; - IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, state.memForJoin * ctx.getInitialFrameSize()); ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool); @@ -667,39 +722,52 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nonMatchWriter, table, predEvaluator, isReversed, bufferManager); - bReader.open(); - rPartbuff.reset(); - while (bReader.nextFrame(rPartbuff)) { - // We need to allocate a copyBuffer, because this buffer gets added to the buffers list - // in the InMemoryHashJoin. - ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize()); - // If a frame cannot be allocated, there may be a chance if we can compact the table, - // one or more frame may be reclaimed. - if (copyBuffer == null) { - if (joiner.compactHashTable() > 0) { - copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize()); - } + try { + bReader.open(); + rPartbuff.reset(); + while (bReader.nextFrame(rPartbuff)) { + // We need to allocate a copyBuffer, because this buffer gets added to the buffers list + // in the InMemoryHashJoin. + ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize()); + // If a frame cannot be allocated, there may be a chance if we can compact the table, + // one or more frame may be reclaimed. if (copyBuffer == null) { - // Still no frame is allocated? At this point, we have no way to get a frame. - throw new HyracksDataException( - "Can't allocate one more frame. Assign more memory to InMemoryHashJoin."); + if (joiner.compactHashTable() > 0) { + copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize()); + } + if (copyBuffer == null) { + // Still no frame is allocated? At this point, we have no way to get a frame. + throw new HyracksDataException( + "Can't allocate one more frame. Assign more memory to InMemoryHashJoin."); + } } + FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer); + joiner.build(copyBuffer); + rPartbuff.reset(); } - FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer); - joiner.build(copyBuffer); - rPartbuff.reset(); + } finally { + bReader.close(); } - bReader.close(); - rPartbuff.reset(); - //probe - pReader.open(); - while (pReader.nextFrame(rPartbuff)) { - joiner.join(rPartbuff.getBuffer(), writer); + try { + //probe + pReader.open(); rPartbuff.reset(); + try { + while (pReader.nextFrame(rPartbuff)) { + joiner.join(rPartbuff.getBuffer(), writer); + rPartbuff.reset(); + } + joiner.completeJoin(writer); + } finally { + joiner.releaseMemory(); + } + } finally { + try { + pReader.close(); + } finally { + joiner.closeTable(); + } } - pReader.close(); - joiner.closeJoin(writer); - joiner.closeTable(); } private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize, @@ -716,40 +784,38 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD nlj.setIsReversed(isReversed); IFrame cacheBuff = new VSizeFrame(ctx); - innerReader.open(); - while (innerReader.nextFrame(cacheBuff)) { - nlj.cache(cacheBuff.getBuffer()); - cacheBuff.reset(); + try { + innerReader.open(); + while (innerReader.nextFrame(cacheBuff)) { + nlj.cache(cacheBuff.getBuffer()); + cacheBuff.reset(); + } + } finally { + try { + nlj.closeCache(); + } finally { + innerReader.close(); + } } - nlj.closeCache(); - - IFrame joinBuff = new VSizeFrame(ctx); - outerReader.open(); - - while (outerReader.nextFrame(joinBuff)) { - nlj.join(joinBuff.getBuffer(), writer); - joinBuff.reset(); + try { + IFrame joinBuff = new VSizeFrame(ctx); + outerReader.open(); + try { + while (outerReader.nextFrame(joinBuff)) { + nlj.join(joinBuff.getBuffer(), writer); + joinBuff.reset(); + } + nlj.completeJoin(writer); + } finally { + nlj.releaseMemory(); + } + } finally { + outerReader.close(); } - - nlj.closeJoin(writer); - outerReader.close(); - innerReader.close(); } }; return op; } } - public void setSkipInMemHJ(boolean b) { - skipInMemoryHJ = b; - } - - public void setForceNLJ(boolean b) { - forceNLJ = b; - } - - public void setForceRR(boolean b) { - forceRoleReversal = !isLeftOuter && b; - } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java index 4253114..e7da174 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java @@ -82,14 +82,12 @@ public class MaterializerTaskState extends AbstractStateObject { while (in.nextFrame(frame)) { writer.nextFrame(frame.getBuffer()); } - } catch (Throwable th) { - throw new HyracksDataException(th); } finally { in.close(); } - } catch (Throwable th) { + } catch (Exception e) { writer.fail(); - throw new HyracksDataException(th); + throw e; } finally { writer.close(); if (numConsumers.decrementAndGet() == 0) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java index 90b4b6c..b422ef4 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java @@ -80,7 +80,8 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc); return new AbstractUnaryInputSinkOperatorNodePushable() { - IFrameWriter datasetPartitionWriter; + private IFrameWriter datasetPartitionWriter; + private boolean failed = false; @Override public void open() throws HyracksDataException { @@ -110,15 +111,22 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat @Override public void fail() throws HyracksDataException { + failed = true; datasetPartitionWriter.fail(); } @Override public void close() throws HyracksDataException { - if (frameOutputStream.getTupleCount() > 0) { - frameOutputStream.flush(datasetPartitionWriter); + try { + if (!failed && frameOutputStream.getTupleCount() > 0) { + frameOutputStream.flush(datasetPartitionWriter); + } + } catch (Exception e) { + datasetPartitionWriter.fail(); + throw e; + } finally { + datasetPartitionWriter.close(); } - datasetPartitionWriter.close(); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java index 6d9d085..f4158ac 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java @@ -84,10 +84,13 @@ public abstract class AbstractExternalSortRunMerger { finalWriter = prepareSkipMergingFinalResultWriter(writer); finalWriter.open(); if (sorter != null) { - if (sorter.hasRemaining()) { - sorter.flush(finalWriter); + try { + if (sorter.hasRemaining()) { + sorter.flush(finalWriter); + } + } finally { + sorter.close(); } - sorter.close(); } } else { /** recycle sort buffer */ @@ -128,10 +131,15 @@ public abstract class AbstractExternalSortRunMerger { RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile(); IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter); - mergeResultWriter.open(); - merge(mergeResultWriter, partialRuns); - mergeResultWriter.close(); - + try { + mergeResultWriter.open(); + merge(mergeResultWriter, partialRuns); + } catch (Throwable t) { + mergeResultWriter.fail(); + throw t; + } finally { + mergeResultWriter.close(); + } reader = mergeFileWriter.createReader(); } runs.add(reader); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java index cdabcda..ef9e4b6 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java @@ -103,7 +103,7 @@ class TestSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescrip writer.nextFrame(frame); } catch (Exception e) { writer.fail(); - throw new HyracksDataException(e); + throw e; } finally { writer.close(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java index bba18b3..683857f 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java @@ -84,9 +84,9 @@ class DummySourceOperatorDescriptor extends AbstractSingleActivityOperatorDescri public void initialize() throws HyracksDataException { try { writer.open(); - } catch (Throwable th) { + } catch (Exception e) { writer.fail(); - throw new HyracksDataException(th); + throw e; } finally { writer.close(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java index f83ab6a..521dff1 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java @@ -227,12 +227,14 @@ public abstract class AbstractExternalGroupbyTest { ResultValidateWriter writer = new ResultValidateWriter(keyValueMap); - getBuilder().open(); - for (IFrame frame : input) { - getBuilder().nextFrame(frame.getBuffer()); + try { + getBuilder().open(); + for (IFrame frame : input) { + getBuilder().nextFrame(frame.getBuffer()); + } + } finally { + getBuilder().close(); } - getBuilder().close(); - getMerger().setOutputFrameWriter(0, writer, outputRec); getMerger().initialize(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java index 673c6fa..cfd4f30 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java @@ -130,13 +130,16 @@ public abstract class AbstractRunGeneratorTest { assertTrue(runs.size() > 0); for (GeneratedRunFileReader run : runs) { - run.open(); - int preKey = Integer.MIN_VALUE; - while (run.nextFrame(frame)) { - fta.reset(frame.getBuffer()); - preKey = assertFTADataIsSorted(fta, keyValuePair, preKey); + try { + run.open(); + int preKey = Integer.MIN_VALUE; + while (run.nextFrame(frame)) { + fta.reset(frame.getBuffer()); + preKey = assertFTADataIsSorted(fta, keyValuePair, preKey); + } + } finally { + run.close(); } - run.close(); } assertTrue(keyValuePair.isEmpty()); }