asterixdb-commits mailing list archives

From amo...@apache.org
Subject [2/3] incubator-asterixdb-hyracks git commit: Changed the IFrameWriter Contract
Date Wed, 16 Dec 2015 06:35:23 GMT
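
The hunks below repeatedly apply one pattern, so a minimal sketch of the revised IFrameWriter contract may help review. This is not code from the commit; it assumes a producer pushing frames to a single downstream IFrameWriter and uses only the four contract methods:

    import java.nio.ByteBuffer;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Hedged sketch: open() inside the try, fail() on any Throwable before the
    // rethrow, and exactly one close() in finally -- the shape every hunk below
    // converges on.
    public final class FrameWriterContractSketch {
        private FrameWriterContractSketch() {
        }

        public static void produce(IFrameWriter writer, Iterable<ByteBuffer> frames)
                throws HyracksDataException {
            try {
                writer.open(); // inside the try so the finally still closes on failure
                for (ByteBuffer frame : frames) {
                    writer.nextFrame(frame); // push each frame downstream
                }
            } catch (Throwable th) { // Throwable, not Exception, as in the hunks below
                writer.fail(); // signal the consumer before rethrowing
                throw new HyracksDataException(th);
            } finally {
                writer.close(); // a single close, on success or failure
            }
        }
    }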
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 0278f92..c0c467a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -43,7 +43,6 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -52,7 +51,6 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
-import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFamily;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
@@ -160,7 +158,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
             ITuplePairComparatorFactory tupPaircomparatorFactory0,
             ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
-            throws HyracksDataException {
+                    throws HyracksDataException {
 
         super(spec, 2, 1);
         this.memsize = memsize;
@@ -207,8 +205,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
         if (memorySize > buildSize) {
             return 1; //We will switch to in-Mem HJ eventually
         }
-        numberOfPartitions = (int) (Math.ceil((double) (buildSize * factor / nPartitions - memorySize)
-                / (double) (memorySize - 1)));
+        numberOfPartitions = (int) (Math.ceil((buildSize * factor / nPartitions - memorySize) / (memorySize - 1)));
         if (numberOfPartitions <= 0) {
             numberOfPartitions = 1; //becomes in-memory hash join
         }
@@ -273,12 +270,12 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
 
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
-                    .createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
-                        .getJobId(), new TaskId(getActivityId(), partition));
+                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
+                        ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
 
                 ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
                         hashFunctionGeneratorFactories).createPartitioner(0);
@@ -351,15 +348,15 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
 
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
             final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
             final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
-                    .createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -378,12 +375,11 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-
                     writer.open();
-                    state.hybridHJ.initProbe();
+                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
 
+                    state.hybridHJ.initProbe();
                     LOGGER.fine("OptimizedHybridHashJoin is starting the probe phase.");
                 }
 
@@ -399,45 +395,40 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.hybridHJ.closeProbe(writer);
-
-                    BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
-
-                    rPartbuff.reset();
-                    for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
-
-                        RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
-                        RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
-
-                        if (bReader == null || pReader
-                                == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
-                            continue;
+                    try {
+                        state.hybridHJ.closeProbe(writer);
+                        BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
+                        rPartbuff.reset();
+                        for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus
+                                .nextSetBit(pid + 1)) {
+                            RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
+                            RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
+                            if (bReader == null || pReader == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
+                                continue;
+                            }
+                            int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
+                            int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
+                            int beforeMax = (bSize > pSize) ? bSize : pSize;
+                            joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
                         }
-                        int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
-                        int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
-                        int beforeMax = (bSize > pSize) ? bSize : pSize;
-                        joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
+                    } finally {
+                        writer.close();
                     }
-                    writer.close();
                     LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
                 }
 
                 private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
                         RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed)
-                        throws HyracksDataException {
+                                throws HyracksDataException {
                     ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
                     ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
 
-                    long buildPartSize = wasReversed ?
-                            (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize()) :
-                            (ohhj
-                                    .getBuildPartitionSize(pid) / ctx.getInitialFrameSize());
-                    long probePartSize = wasReversed ?
-                            (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize()) :
-                            (ohhj
-                                    .getProbePartitionSize(pid) / ctx.getInitialFrameSize());
+                    long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize())
+                            : (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize());
+                    long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize())
+                            : (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize());
 
                     LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + ") (pid "
                             + pid + ") - (level " + level + ") - wasReversed " + wasReversed + " - BuildSize:\t"
@@ -448,12 +439,11 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
                             || (probePartSize < state.memForJoin && !isLeftOuter)) {
                         int tabSize = -1;
-                        if (!forceRR && (isLeftOuter || (buildPartSize
-                                < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+                        if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
                             LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
                                     + level + "]");
-                            tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid) : ohhj
-                                    .getBuildPartitionSizeInTup(pid);
+                            tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid)
+                                    : ohhj.getBuildPartitionSizeInTup(pid);
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
@@ -465,8 +455,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             LOGGER.fine(
                                     "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
                                             + level + "]");
-                            tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid) : ohhj
-                                    .getProbePartitionSizeInTup(pid);
+                            tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid)
+                                    : ohhj.getProbePartitionSizeInTup(pid);
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
@@ -480,8 +470,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     else {
                         LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
                         OptimizedHybridHashJoin rHHj;
-                        if (!forceRR && (isLeftOuter
-                                || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+                        if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
                             LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                     + level + "]");
                             int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
@@ -513,13 +502,12 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                     : maxAfterProbeSize;
 
                             BitSet rPStatus = rHHj.getPartitionStatus();
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
-                                    * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
                                 LOGGER.fine(
                                         "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                                 + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0);
-                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+                                        .nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -527,16 +515,15 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1),
-                                            false); //checked-confirmed
+                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
                                 }
 
                             } else { //Case 2.1.2 - Switch to NLJ
                                 LOGGER.fine(
                                         "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                                 + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0);
-                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+                                        .nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -585,12 +572,11 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                     : maxAfterProbeSize;
                             BitSet rPStatus = rHHj.getPartitionStatus();
 
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
-                                    * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
                                 LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "
                                         + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0);
-                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+                                        .nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -598,15 +584,14 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1),
-                                            true); //checked-confirmed
+                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
                                 }
                             } else { //Case 2.2.2 - Switch to NLJ
                                 LOGGER.fine(
                                         "\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
                                                 + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0);
-                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+                                        .nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -644,8 +629,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     bReader.open();
                     rPartbuff.reset();
                     while (bReader.nextFrame(rPartbuff)) {
-                        ByteBuffer copyBuffer = ctx
-                                .allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
+                        ByteBuffer copyBuffer = ctx.allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
                         FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
                         joiner.build(copyBuffer);
                         rPartbuff.reset();
@@ -665,10 +649,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
                         RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator,
                         boolean reverse) throws HyracksDataException {
-                    NestedLoopJoin nlj = new NestedLoopJoin(ctx,
-                            new FrameTupleAccessor(outerRd),
-                            new FrameTupleAccessor(innerRd), nljComparator, memorySize,
-                            predEvaluator, isLeftOuter, nullWriters1);
+                    NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd),
+                            new FrameTupleAccessor(innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter,
+                            nullWriters1);
                     nlj.setIsReversed(reverse);
 
                     IFrame cacheBuff = new VSizeFrame(ctx);

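The getNumberOfPartitions hunk above drops two casts; assuming the fudge factor is a double, as elsewhere in this file, the expression was already evaluated in floating point and the change is behavior-preserving. A standalone restatement of the rule, with one illustrative worked value (numbers assumed, not from the commit):

    // Hedged sketch: the partition-count rule from the hunk above.
    static int numberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions) {
        if (memorySize > buildSize) {
            return 1; // build side fits in memory: in-memory hash join
        }
        int n = (int) Math.ceil((buildSize * factor / nPartitions - memorySize) / (memorySize - 1));
        return n <= 0 ? 1 : n; // non-positive count also degenerates to in-memory HJ
    }
    // Example: memorySize = 64 frames, buildSize = 1000 frames, factor = 1.3,
    // nPartitions = 4  =>  ceil((325 - 64) / 63) = ceil(4.14...) = 5 partitions.
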
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 2398372..0c647d7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -49,9 +49,9 @@ public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutput
         writer.open();
         try {
             appender.flush(writer, false);
-        } catch (Exception e) {
+        } catch (Throwable th) {
             writer.fail();
-            throw new HyracksDataException(e);
+            throw new HyracksDataException(th);
         } finally {
             writer.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index b9c2fb1..f8a8a67 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -52,8 +52,8 @@ public class MaterializerTaskState extends AbstractStateObject {
     }
 
     public void open(IHyracksTaskContext ctx) throws HyracksDataException {
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                MaterializerTaskState.class.getSimpleName());
+        FileReference file = ctx.getJobletContext()
+                .createManagedWorkspaceFile(MaterializerTaskState.class.getSimpleName());
         out = new RunFileWriter(file, ctx.getIOManager());
         out.open();
     }
@@ -68,16 +68,19 @@ public class MaterializerTaskState extends AbstractStateObject {
 
     public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
         RunFileReader in = out.createDeleteOnCloseReader();
-        writer.open();
         try {
-            in.open();
-            while (in.nextFrame(frame)) {
-                writer.nextFrame(frame.getBuffer());
+            writer.open();
+            try {
+                in.open();
+                while (in.nextFrame(frame)) {
+                    writer.nextFrame(frame.getBuffer());
+                }
+            } finally {
+                in.close();
             }
-            in.close();
-        } catch (Exception e) {
+        } catch (Throwable th) {
             writer.fail();
-            throw new HyracksDataException(e);
+            throw new HyracksDataException(th);
         } finally {
             writer.close();
         }

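Assembled without diff markers, the resulting writeOut (same names as above) shows the nested-cleanup shape: the inner finally pins in.close(), the outer finally pins writer.close(), and fail() fires only when something threw in between:

    // Hedged sketch of the post-change writeOut, assembled from the hunk above.
    public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
        RunFileReader in = out.createDeleteOnCloseReader();
        try {
            writer.open();
            try {
                in.open();
                while (in.nextFrame(frame)) {
                    writer.nextFrame(frame.getBuffer()); // replay each materialized frame
                }
            } finally {
                in.close(); // reader always released, even if the writer failed mid-stream
            }
        } catch (Throwable th) {
            writer.fail();
            throw new HyracksDataException(th);
        } finally {
            writer.close();
        }
    }
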
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 04b893a..feff13c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -71,8 +71,8 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(new ActivityId(odId,
-                SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(
+                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
         builder.addActivity(this, sma);
         builder.addSourceEdge(0, sma, 0);
         int taskOutputIndex = 0;
@@ -88,8 +88,8 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
             int activityId = MATERIALIZE_READER_ACTIVITY_ID;
             for (int i = 0; i < outputArity; i++) {
                 if (outputMaterializationFlags[i]) {
-                    MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(new ActivityId(odId,
-                            activityId));
+                    MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
+                            new ActivityId(odId, activityId));
                     builder.addActivity(this, mra);
                     builder.addTargetEdge(i, mra, 0);
                     builder.addBlockingEdge(sma, mra);
@@ -113,15 +113,17 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
             return new AbstractUnaryInputOperatorNodePushable() {
                 private MaterializerTaskState state;
                 private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+                private final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
 
                 @Override
                 public void open() throws HyracksDataException {
                     if (requiresMaterialization) {
-                        state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(
-                                getActivityId(), partition));
+                        state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
+                                new TaskId(getActivityId(), partition));
                         state.open(ctx);
                     }
                     for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        isOpen[i] = true;
                         writers[i].open();
                     }
                 }
@@ -138,19 +140,50 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
 
                 @Override
                 public void close() throws HyracksDataException {
-                    if (requiresMaterialization) {
-                        state.close();
-                        ctx.setStateObject(state);
+                    HyracksDataException hde = null;
+                    try {
+                        if (requiresMaterialization) {
+                            state.close();
+                            ctx.setStateObject(state);
+                        }
+                    } finally {
+                        for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                            if (isOpen[i]) {
+                                try {
+                                    writers[i].close();
+                                } catch (Throwable th) {
+                                    if (hde == null) {
+                                        hde = new HyracksDataException(th);
+                                    } else {
+                                        hde.addSuppressed(th);
+                                    }
+                                }
+                            }
+                        }
                     }
-                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                        writers[i].close();
+                    if (hde != null) {
+                        throw hde;
                     }
                 }
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    HyracksDataException hde = null;
                     for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                        writers[i].fail();
+                        if (isOpen[i]) {
+                            try {
+                                writers[i].fail();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException(th);
+                                } else {
+                                    hde.addSuppressed(th);
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
                     }
                 }
 
@@ -172,21 +205,21 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
                 @Override
                 public void initialize() throws HyracksDataException {
-                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
-                            getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
                     state.writeOut(writer, new VSizeFrame(ctx));
                 }
 
                 @Override
                 public void deinitialize() throws HyracksDataException {
                     numberOfActiveMaterializeReaders--;
-                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
-                            getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
                     if (numberOfActiveMaterializeReaders == 0) {
                         state.deleteFile();
                     }

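The SplitOperatorDescriptor hunks above introduce per-output isOpen[] bookkeeping and a close-everything-then-rethrow idiom. Generalized as a sketch (class and method names hypothetical):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Hedged sketch: close every writer that was actually opened, keep the
    // first failure as the primary exception, attach later ones as suppressed,
    // and rethrow once at the end so no writer is left unclosed.
    final class WriterCleanup {
        static void closeAll(IFrameWriter[] writers, boolean[] isOpen) throws HyracksDataException {
            HyracksDataException hde = null;
            for (int i = 0; i < writers.length; i++) {
                if (isOpen[i]) {
                    try {
                        writers[i].close();
                    } catch (Throwable th) {
                        if (hde == null) {
                            hde = new HyracksDataException(th);
                        } else {
                            hde.addSuppressed(th);
                        }
                    }
                }
            }
            if (hde != null) {
                throw hde; // first failure wins; the rest ride along as suppressed
            }
        }
    }
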
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 8d54c39..74f1cb4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -87,9 +87,6 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
     private static class SortTaskState extends AbstractStateObject {
         private FrameSorterMergeSort frameSorter;
 
-        public SortTaskState() {
-        }
-
         private SortTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
         }
@@ -165,14 +162,14 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    writer.open();
                     try {
-                        SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
-                                getOperatorId(), SORT_ACTIVITY_ID), partition));
+                        writer.open();
+                        SortTaskState state = (SortTaskState) ctx.getStateObject(
+                                new TaskId(new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
                         state.frameSorter.flush(writer);
-                    } catch (Exception e) {
+                    } catch (Throwable th) {
                         writer.fail();
-                        throw new HyracksDataException(e);
+                        throw new HyracksDataException(th);
                     } finally {
                         writer.close();
                     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index 3457fe8..289b879 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -34,7 +34,8 @@ import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
 
 public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
-    public UnionAllOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs, RecordDescriptor recordDescriptor) {
+    public UnionAllOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs,
+            RecordDescriptor recordDescriptor) {
         super(spec, nInputs, 1);
         recordDescriptors[0] = recordDescriptor;
     }
@@ -61,7 +62,7 @@ public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
         @Override
         public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
             RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             return new UnionOperator(ctx, inRecordDesc);
         }
@@ -69,9 +70,7 @@ public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
 
     private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
         private int nOpened;
-
         private int nClosed;
-
         private boolean failed;
 
         public UnionOperator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc) {
@@ -106,7 +105,7 @@ public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
                 @Override
                 public void fail() throws HyracksDataException {
                     synchronized (UnionOperator.this) {
-                        if (failed) {
+                        if (!failed) {
                             writer.fail();
                         }
                         failed = true;
@@ -117,6 +116,7 @@ public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
                 public void close() throws HyracksDataException {
                     synchronized (UnionOperator.this) {
                         if (++nClosed == inputArity) {
+                            // a single close
                             writer.close();
                         }
                     }

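Worth flagging: the change from `if (failed)` to `if (!failed)` above is a bug fix, not a cleanup. Previously, fail() was forwarded only when failed was already set, i.e. never on the first failing input. A condensed sketch of the fan-in bookkeeping after the fix (class name hypothetical; fields as in the diff):

    // Hedged sketch: N inputs share one downstream writer. fail() is forwarded
    // exactly once (the corrected !failed test), and close() fires only when
    // the last input closes.
    final class UnionFanIn {
        private final IFrameWriter writer;
        private final int inputArity;
        private int nClosed;
        private boolean failed;

        UnionFanIn(IFrameWriter writer, int inputArity) {
            this.writer = writer;
            this.inputArity = inputArity;
        }

        synchronized void onInputFail() throws HyracksDataException {
            if (!failed) {
                writer.fail(); // first failure wins; later ones are no-ops
            }
            failed = true;
        }

        synchronized void onInputClose() throws HyracksDataException {
            if (++nClosed == inputArity) {
                writer.close(); // a single close for the shared writer
            }
        }
    }
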
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
index b0b1d52..431a4f4 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
@@ -79,8 +79,8 @@ public class DataGenOperatorDescriptor extends AbstractSingleActivityOperatorDes
 
             @Override
             public void initialize() throws HyracksDataException {
-                writer.open();
                 try {
+                    writer.open();
                     for (int i = 0; i < numRecords; i++) {
                         tb.reset();
                         for (int j = 0; j < recDesc.getFieldCount(); j++) {
@@ -90,14 +90,15 @@ public class DataGenOperatorDescriptor extends AbstractSingleActivityOperatorDes
                         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                             appender.flush(writer, true);
                             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+                                throw new HyracksDataException("Record size (" + tb.getSize()
+                                        + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
                             }
                         }
                     }
                     appender.flush(writer, true);
-                } catch (Exception e) {
+                } catch (Throwable th) {
                     writer.fail();
-                    throw new HyracksDataException(e);
+                    throw new HyracksDataException(th);
                 } finally {
                     writer.close();
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
index ed7c08d..5d300f4 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -84,9 +84,11 @@ class DummySourceOperatorDescriptor extends AbstractSingleActivityOperatorDescri
             public void initialize() throws HyracksDataException {
                 try {
                     writer.open();
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw new HyracksDataException(th);
+                } finally {
                     writer.close();
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 0d06655..26292f8 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -44,7 +43,7 @@ import org.apache.hyracks.hdfs.api.IKeyValueParserFactory;
  * To use this operator, a user need to provide an IKeyValueParserFactory implementation which convert
  * key-value pairs into tuples.
  */
-@SuppressWarnings({ "deprecation", "rawtypes" })
+@SuppressWarnings({ "rawtypes" })
 public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
@@ -91,7 +90,7 @@ public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDe
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final InputSplit[] inputSplits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -102,46 +101,50 @@ public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDe
             public void initialize() throws HyracksDataException {
                 ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
+                    writer.open();
                     Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
                     JobConf conf = confFactory.getConf();
                     conf.setClassLoader(ctx.getJobletContext().getClassLoader());
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
-                    writer.open();
-                    parser.open(writer);
-                    InputFormat inputFormat = conf.getInputFormat();
-                    for (int i = 0; i < inputSplits.length; i++) {
-                        /**
-                         * read all the partitions scheduled to the current node
-                         */
-                        if (scheduledLocations[i].equals(nodeName)) {
+                    try {
+                        parser.open(writer);
+                        InputFormat inputFormat = conf.getInputFormat();
+                        for (int i = 0; i < inputSplits.length; i++) {
                             /**
-                             * pick an unread split to read
-                             * synchronize among simultaneous partitions in the same machine
+                             * read all the partitions scheduled to the current node
                              */
-                            synchronized (executed) {
-                                if (executed[i] == false) {
-                                    executed[i] = true;
-                                } else {
-                                    continue;
+                            if (scheduledLocations[i].equals(nodeName)) {
+                                /**
+                                 * pick an unread split to read
+                                 * synchronize among simultaneous partitions in the same machine
+                                 */
+                                synchronized (executed) {
+                                    if (executed[i] == false) {
+                                        executed[i] = true;
+                                    } else {
+                                        continue;
+                                    }
                                 }
-                            }
 
-                            /**
-                             * read the split
-                             */
-                            RecordReader reader = inputFormat.getRecordReader(inputSplits[i], conf, Reporter.NULL);
-                            Object key = reader.createKey();
-                            Object value = reader.createValue();
-                            while (reader.next(key, value) == true) {
-                                parser.parse(key, value, writer, inputSplits[i].toString());
+                                /**
+                                 * read the split
+                                 */
+                                RecordReader reader = inputFormat.getRecordReader(inputSplits[i], conf, Reporter.NULL);
+                                Object key = reader.createKey();
+                                Object value = reader.createValue();
+                                while (reader.next(key, value) == true) {
+                                    parser.parse(key, value, writer, inputSplits[i].toString());
+                                }
                             }
                         }
+                    } finally {
+                        parser.close(writer);
                     }
-                    parser.close(writer);
-                    writer.close();
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw new HyracksDataException(th);
                 } finally {
+                    writer.close();
                     Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }

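Both HDFS readers (this mapred one and the mapreduce one below) now share the same initialize() skeleton. A condensed sketch, assuming writer and ctx are in scope as in the operator and with the split-reading loop elided:

    // Hedged sketch: open the writer first so any later failure can be signaled
    // downstream, swap in the joblet classloader for Hadoop reflection, and
    // restore both the writer and the classloader in finally.
    public void initialize() throws HyracksDataException {
        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
        try {
            writer.open();
            Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
            // ... build the job configuration, create the key-value parser, and
            // parse every split scheduled to this node (elided) ...
        } catch (Throwable th) {
            writer.fail();
            throw new HyracksDataException(th);
        } finally {
            writer.close();
            Thread.currentThread().setContextClassLoader(ctxCL);
        }
    }
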
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 28c6cfe..d69191d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -104,7 +103,7 @@ public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDe
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final List<FileSplit> inputSplits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -116,11 +115,11 @@ public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDe
             public void initialize() throws HyracksDataException {
                 ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
+                    writer.open();
                     Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
                     Job job = confFactory.getConf();
                     job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
-                    writer.open();
                     InputFormat inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(),
                             job.getConfiguration());
                     int size = inputSplits.size();
@@ -155,10 +154,11 @@ public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDe
                         }
                     }
                     parser.close(writer);
-                    writer.close();
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw new HyracksDataException(th);
                 } finally {
+                    writer.close();
                     Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-storage-am-btree/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks/hyracks-storage-am-btree/pom.xml
index cf14c3b..fba9d2c 100644
--- a/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks/hyracks-storage-am-btree/pom.xml
@@ -16,57 +16,68 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-storage-am-btree</artifactId>
-  <name>hyracks-storage-am-btree</name>
-
-  <parent>
-    <groupId>org.apache.hyracks</groupId>
-    <artifactId>hyracks</artifactId>
-    <version>0.2.17-SNAPSHOT</version>
-  </parent>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
-
-
-
-  <dependencies>
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-storage-common</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>  	
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>hyracks-storage-am-btree</artifactId>
+    <name>hyracks-storage-am-btree</name>
+    <parent>
+        <groupId>org.apache.hyracks</groupId>
+        <artifactId>hyracks</artifactId>
+        <version>0.2.17-SNAPSHOT</version>
+    </parent>
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+            <comments>A business-friendly OSS license</comments>
+        </license>
+    </licenses>
+    <dependencies>
         <dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-storage-am-common</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>  	
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>  	
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>  	
-  </dependencies>
-</project>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-common</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-am-common</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-common</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-std</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>2.0.2-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
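
Note: besides normalizing the tab-indented dependency block, the pom now pulls
in Mockito and PowerMock so the new FramewriterTest below can mock static
methods and constructor calls. For orientation, a minimal PowerMock test has
this shape (ExampleTest and SomeClassWithStatics are placeholder names, not
part of this commit):

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.api.mockito.PowerMockito;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.PowerMockRunner;

    @RunWith(PowerMockRunner.class)
    @PrepareForTest({ SomeClassWithStatics.class }) // list every class whose statics or constructors get mocked
    public class ExampleTest {
        @Test
        public void test() throws Exception {
            PowerMockito.mockStatic(SomeClassWithStatics.class);
            // stub the static calls here, then exercise the code under test
        }
    }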

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
new file mode 100644
index 0000000..b5c14c1
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexAccessor;
+import org.apache.hyracks.storage.am.common.api.IIndexCursor;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.api.IndexException;
+import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ BTreeUtils.class, FrameTupleAccessor.class, ArrayTupleBuilder.class,
+        IndexSearchOperatorNodePushable.class, FrameUtils.class, FrameTupleAppender.class })
+public class FramewriterTest {
+
+    private CountAnswer openException = new CountAndThrowException("Exception in open()");
+    private CountAnswer nextFrameException = new CountAndThrowException("Exception in nextFrame()");
+    private CountAnswer failException = new CountAndThrowException("Exception in fail()");
+    private CountAnswer closeException = new CountAndThrowException("Exception in close()");
+    private CountAnswer openError = new CountAndThrowError("Exception in open()");
+    private CountAnswer nextFrameError = new CountAndThrowError("Exception in nextFrame()");
+    private CountAnswer failError = new CountAndThrowError("Exception in fail()");
+    private CountAnswer closeError = new CountAndThrowError("Exception in close()");
+    private CountAnswer openNormal = new CountAnswer();
+    private CountAnswer nextFrameNormal = new CountAnswer();
+    private CountAnswer failNormal = new CountAnswer();
+    private CountAnswer closeNormal = new CountAnswer();
+
+    private int successes = 0;
+    private int failures = 0;
+    private static final int BUFFER_SIZE = 32000;
+    private static final int RECORDS_PER_FRAME = 3;
+    public static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(BUFFER_SIZE);
+    private static final int NUMBER_OF_APPENDERS = 2;
+    public int counter = 0;
+
+    public boolean validate(boolean finished) {
+        // aggregate per-method call counts across the normal, exception, and error answers
+        int openCount = openException.getCallCount() + openNormal.getCallCount() + openError.getCallCount();
+        int nextFrameCount = nextFrameException.getCallCount() + nextFrameNormal.getCallCount()
+                + nextFrameError.getCallCount();
+        int failCount = failException.getCallCount() + failNormal.getCallCount() + failError.getCallCount();
+        int closeCount = closeException.getCallCount() + closeNormal.getCallCount() + closeError.getCallCount();
+
+        if (failCount > 1 || closeCount > 1 || openCount > 1) {
+            failures++;
+            return false;
+        }
+        if (openCount == 0 && (nextFrameCount > 0 || failCount > 0 || closeCount > 0)) {
+            failures++;
+            return false;
+        }
+        if (finished) {
+            if (closeCount == 0 && (nextFrameCount > 0 || failCount > 0 || openCount > 0)) {
+                failures++;
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    public MultiComparator mockMultiComparator() {
+        MultiComparator mc = Mockito.mock(MultiComparator.class);
+        return mc;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // Mock static methods
+        PowerMockito.mockStatic(BTreeUtils.class);
+        PowerMockito.when(BTreeUtils.getSearchMultiComparator(Matchers.any(), Matchers.any()))
+                .thenReturn(mockMultiComparator());
+        PowerMockito.mockStatic(FrameUtils.class);
+
+        // Custom implementation for FrameUtils that pushes to the next frame immediately
+        PowerMockito.when(
+                FrameUtils.appendToWriter(Matchers.any(IFrameWriter.class), Matchers.any(IFrameTupleAppender.class),
+                        Matchers.any(IFrameTupleAccessor.class), Matchers.anyInt(), Matchers.anyInt()))
+                .thenAnswer(new Answer<Integer>() {
+                    @Override
+                    public Integer answer(InvocationOnMock invocation) throws Throwable {
+                        Object[] args = invocation.getArguments();
+                        IFrameWriter writer = (IFrameWriter) args[0];
+                        writer.nextFrame(EMPTY_BUFFER);
+                        return BUFFER_SIZE;
+                    }
+                });
+
+        // create global mock for FrameTupleAccessor, ArrayTupleBuilder
+        FrameTupleAccessor frameAccessor = Mockito.mock(FrameTupleAccessor.class);
+        Mockito.when(frameAccessor.getTupleCount()).thenReturn(RECORDS_PER_FRAME);
+
+        // Global custom implementations for FrameTupleAppender
+        // since we have two appenders, each test needs to run twice
+        FrameTupleAppender[] appenders = mockAppenders();
+
+        // Mock all instances of a class (the class calling this constructor must also be listed in @PrepareForTest)
+        PowerMockito.whenNew(FrameTupleAccessor.class).withAnyArguments().thenReturn(frameAccessor);
+        PowerMockito.whenNew(FrameTupleAppender.class).withAnyArguments().thenAnswer(new Answer<FrameTupleAppender>() {
+            @Override
+            public FrameTupleAppender answer(InvocationOnMock invocation) throws Throwable {
+                counter++;
+                if (counter % 2 == 1) {
+                    return appenders[0];
+                }
+                return appenders[1];
+            }
+        });
+    }
+
+    public static FrameTupleAppender[] mockAppenders() throws HyracksDataException {
+        FrameTupleAppender[] appenders = new FrameTupleAppender[2];
+        appenders[0] = Mockito.mock(FrameTupleAppender.class);
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                Object[] args = invocation.getArguments();
+                IFrameWriter writer = (IFrameWriter) args[0];
+                writer.nextFrame(EMPTY_BUFFER);
+                return null;
+            }
+        }).when(appenders[0]).flush(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
+
+        appenders[1] = Mockito.mock(FrameTupleAppender.class);
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                throw new HyracksDataException("couldn't flush frame");
+            }
+        }).when(appenders[1]).flush(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
+
+        return appenders;
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    private void resetAllCounters() {
+        openException.reset();
+        nextFrameException.reset();
+        failException.reset();
+        closeException.reset();
+        openNormal.reset();
+        nextFrameNormal.reset();
+        failNormal.reset();
+        closeNormal.reset();
+        openError.reset();
+        nextFrameError.reset();
+        failError.reset();
+        closeError.reset();
+    }
+
+    @Test
+    public void test() {
+        try {
+            testBTreeSearchOperatorNodePushable();
+        } catch (Throwable th) {
+            th.printStackTrace();
+            failures++;
+        }
+        System.out.println("Number of passed tests: " + successes);
+        System.out.println("Number of failed tests: " + failures);
+        Assert.assertEquals(0, failures);
+    }
+
+    private void testBTreeSearchOperatorNodePushable() throws Exception {
+        /*
+         * coverage
+         * in open(){
+         *  writer.open() succeeds vs. throws exception vs. throws error
+         *   indexHelper.open() succeeds vs. throws exception
+         *    createAccessor() succeeds vs. throws exception
+         * }
+         * in nextFrame(){
+         *  indexAccessor.search succeeds vs. throws exception
+         *   writeSearchResults succeeds vs. throws exception vs. throws error
+         * }
+         * in fail(){
+         *  writer.fail() succeeds, throws exception, or throws error
+         * }
+         * in close(){
+         *  appender.close() succeeds, throws exception, or throws error
+         * }
+         */
+        int i = 0;
+        counter = 0;
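+        // run the full writer matrix once per mocked appender behavior:
+        // appenders[0] flushes normally while appenders[1] throws on flush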
+        while (i < NUMBER_OF_APPENDERS) {
+            i++;
+            ByteBuffer buffer = mockByteBuffer();
+            IFrameWriter[] outputFrameWriters = createOutputWriters();
+            for (IFrameWriter outputWriter : outputFrameWriters) {
+                IFrameWriter[] underTest = createWriters();
+                for (IFrameWriter writer : underTest) {
+                    ((AbstractUnaryOutputOperatorNodePushable) writer).setOutputFrameWriter(0, outputWriter,
+                            mockRecordDescriptor());
+                    testWriter(writer, buffer);
+                }
+            }
+            counter = i;
+        }
+    }
+
+    private ByteBuffer mockByteBuffer() {
+        return ByteBuffer.allocate(BUFFER_SIZE);
+    }
+
+    /**
+     * @return an array of writers to test. These writers can be of the same type but behave differently based on the included mocks.
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
+    public IFrameWriter[] createWriters() throws HyracksDataException, IndexException {
+        ArrayList<BTreeSearchOperatorNodePushable> writers = new ArrayList<BTreeSearchOperatorNodePushable>();
+        AbstractTreeIndexOperatorDescriptor[] opDescs = mockIndexOpDesc();
+        IRecordDescriptorProvider[] recordDescProviders = mockRecDescProviders();
+        int partition = 0;
+        IHyracksTaskContext[] ctxs = mockIHyracksTaskContext();
+        int[] keys = { 0 };
+        boolean lowKeyInclusive = true;
+        boolean highKeyInclusive = true;
+        for (AbstractTreeIndexOperatorDescriptor opDesc : opDescs) {
+            for (IRecordDescriptorProvider recordDescProvider : recordDescProviders) {
+                for (IHyracksTaskContext ctx : ctxs) {
+                    BTreeSearchOperatorNodePushable writer = new BTreeSearchOperatorNodePushable(opDesc, ctx, partition,
+                            recordDescProvider, keys, keys, lowKeyInclusive, highKeyInclusive, keys, keys);
+                    writers.add(writer);
+                }
+            }
+        }
+        // Return the writers built from every combination of mocks
+        return writers.toArray(new IFrameWriter[writers.size()]);
+    }
+
+    private IHyracksTaskContext[] mockIHyracksTaskContext() throws HyracksDataException {
+        IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class);
+        Mockito.when(ctx.allocateFrame()).thenReturn(mockByteBuffer());
+        Mockito.when(ctx.allocateFrame(Mockito.anyInt())).thenReturn(mockByteBuffer());
+        Mockito.when(ctx.getInitialFrameSize()).thenReturn(BUFFER_SIZE);
+        Mockito.when(ctx.reallocateFrame(Mockito.any(), Mockito.anyInt(), Mockito.anyBoolean()))
+                .thenReturn(mockByteBuffer());
+        return new IHyracksTaskContext[] { ctx };
+    }
+
+    private IRecordDescriptorProvider[] mockRecDescProviders() {
+        RecordDescriptor rDesc = mockRecordDescriptor();
+        IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
+        Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
+        Mockito.when(rDescProvider.getOutputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
+        return new IRecordDescriptorProvider[] { rDescProvider };
+    }
+
+    @SuppressWarnings("rawtypes")
+    private RecordDescriptor mockRecordDescriptor() {
+        ISerializerDeserializer serde = Mockito.mock(ISerializerDeserializer.class);
+        RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[] { serde });
+        return rDesc;
+    }
+
+    public ITreeIndex[] mockIndexes() throws HyracksDataException, IndexException {
+        IIndexAccessor[] indexAccessors = mockIndexAccessors();
+        ITreeIndex[] indexes = new ITreeIndex[indexAccessors.length * 2];
+        int j = 0;
+        for (int i = 0; i < indexAccessors.length; i++) {
+            indexes[j] = Mockito.mock(ITreeIndex.class);
+            Mockito.when(indexes[j].createAccessor(Mockito.any(), Mockito.any())).thenReturn(indexAccessors[i]);
+            j++;
+            indexes[j] = Mockito.mock(ITreeIndex.class);
+            Mockito.when(indexes[j].createAccessor(Mockito.any(), Mockito.any()))
+                    .thenThrow(new HyracksDataException("failed to create accessor"));
+            j++;
+        }
+        return indexes;
+    }
+
+    private IIndexAccessor[] mockIndexAccessors() throws HyracksDataException, IndexException {
+        IIndexCursor[] cursors = mockIndexCursors();
+        IIndexAccessor[] accessors = new IIndexAccessor[cursors.length * 2];
+        int j = 0;
+        for (int i = 0; i < cursors.length; i++) {
+            IIndexCursor cursor = cursors[i];
+            IIndexAccessor accessor = Mockito.mock(IIndexAccessor.class);
+            Mockito.when(accessor.createSearchCursor(Matchers.anyBoolean())).thenReturn(cursor);
+            accessors[j] = accessor;
+            j++;
+            accessor = Mockito.mock(IIndexAccessor.class);
+            Mockito.when(accessor.createSearchCursor(Matchers.anyBoolean())).thenReturn(cursor);
+            Mockito.doAnswer(new Answer<Object>() {
+                private int k = 0;
+
+                @Override
+                public Object answer(InvocationOnMock invocation) throws Throwable {
+                    k++;
+                    if (k % 2 == 0) {
+                        throw new HyracksDataException("Couldn't search index");
+                    }
+                    return null;
+                }
+            }).when(accessor).search(Matchers.any(), Matchers.any());
+            accessors[j] = accessor;
+            j++;
+        }
+
+        return accessors;
+    }
+
+    private IIndexCursor[] mockIndexCursors() throws HyracksDataException, IndexException {
+        ITupleReference[] tuples = mockTuples();
+        IIndexCursor[] cursors = new IIndexCursor[tuples.length * 2];
+        int j = 0;
+        for (int i = 0; i < tuples.length; i++) {
+            IIndexCursor cursor = Mockito.mock(IIndexCursor.class);
+            Mockito.when(cursor.hasNext()).thenReturn(true, true, false);
+            Mockito.when(cursor.getTuple()).thenReturn(tuples[i]);
+            cursors[j] = cursor;
+            j++;
+            cursor = Mockito.mock(IIndexCursor.class);
+            Mockito.when(cursor.hasNext()).thenReturn(true, true, false);
+            Mockito.when(cursor.getTuple()).thenReturn(tuples[i]);
+            Mockito.doThrow(new HyracksDataException("Failed to close cursor")).when(cursor).close();
+            cursors[j] = cursor;
+            j++;
+        }
+        return cursors;
+    }
+
+    private ITupleReference[] mockTuples() {
+        ITupleReference tuple = Mockito.mock(ITupleReference.class);
+        return new ITupleReference[] { tuple };
+    }
+
+    public IIndexDataflowHelper[] mockIndexHelpers() throws HyracksDataException, IndexException {
+        ITreeIndex[] indexes = mockIndexes();
+        IIndexDataflowHelper[] indexHelpers = new IIndexDataflowHelper[indexes.length * 2];
+        int j = 0;
+        for (int i = 0; i < indexes.length; i++) {
+            // normal
+            indexHelpers[j] = Mockito.mock(IIndexDataflowHelper.class);
+            Mockito.when(indexHelpers[j].getIndexInstance()).thenReturn(indexes[i]);
+
+            // throws exception when opened
+            j++;
+            indexHelpers[j] = Mockito.mock(IIndexDataflowHelper.class);
+            Mockito.doThrow(new HyracksDataException("couldn't open index")).when(indexHelpers[j]).open();
+            Mockito.when(indexHelpers[j].getIndexInstance()).thenReturn(null);
+
+            j++;
+        }
+        return indexHelpers;
+    }
+
+    public IIndexDataflowHelperFactory[] mockIndexHelperFactories() throws HyracksDataException, IndexException {
+        IIndexDataflowHelper[] helpers = mockIndexHelpers();
+        IIndexDataflowHelperFactory[] indexHelperFactories = new IIndexDataflowHelperFactory[helpers.length];
+        for (int i = 0; i < helpers.length; i++) {
+            indexHelperFactories[i] = Mockito.mock(IIndexDataflowHelperFactory.class);
+            Mockito.when(
+                    indexHelperFactories[i].createIndexDataflowHelper(Mockito.any(), Mockito.any(), Mockito.anyInt()))
+                    .thenReturn(helpers[i]);
+        }
+        return indexHelperFactories;
+    }
+
+    public AbstractTreeIndexOperatorDescriptor[] mockIndexOpDesc() throws HyracksDataException, IndexException {
+        IIndexDataflowHelperFactory[] indexDataflowHelperFactories = mockIndexHelperFactories();
+        ISearchOperationCallbackFactory[] searchOpCallbackFactories = mockSearchOpCallbackFactories();
+        AbstractTreeIndexOperatorDescriptor[] opDescs = new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
+                * searchOpCallbackFactories.length];
+        int k = 0;
+        for (int i = 0; i < indexDataflowHelperFactories.length; i++) {
+            for (int j = 0; j < searchOpCallbackFactories.length; j++) {
+                AbstractTreeIndexOperatorDescriptor opDesc = Mockito.mock(AbstractTreeIndexOperatorDescriptor.class);
+                Mockito.when(opDesc.getIndexDataflowHelperFactory()).thenReturn(indexDataflowHelperFactories[i]);
+                Mockito.when(opDesc.getRetainInput()).thenReturn(false);
+                Mockito.when(opDesc.getRetainNull()).thenReturn(false);
+                Mockito.when(opDesc.getSearchOpCallbackFactory()).thenReturn(searchOpCallbackFactories[j]);
+                opDescs[k] = opDesc;
+                k++;
+            }
+        }
+        return opDescs;
+    }
+
+    private ISearchOperationCallbackFactory[] mockSearchOpCallbackFactories() throws HyracksDataException {
+        ISearchOperationCallback searchOpCallback = mockSearchOpCallback();
+        ISearchOperationCallbackFactory searchOpCallbackFactory = Mockito.mock(ISearchOperationCallbackFactory.class);
+        Mockito.when(searchOpCallbackFactory.createSearchOperationCallback(Mockito.anyLong(), Mockito.any()))
+                .thenReturn(searchOpCallback);
+        return new ISearchOperationCallbackFactory[] { searchOpCallbackFactory };
+    }
+
+    private ISearchOperationCallback mockSearchOpCallback() {
+        ISearchOperationCallback opCallback = Mockito.mock(ISearchOperationCallback.class);
+        return opCallback;
+    }
+
+    public class CountAnswer implements Answer<Object> {
+        protected int count = 0;
+
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            count++;
+            return null;
+        }
+
+        public int getCallCount() {
+            return count;
+        }
+
+        public void reset() {
+            count = 0;
+        }
+    }
+
+    public class CountAndThrowException extends CountAnswer {
+        private String errorMessage;
+
+        public CountAndThrowException(String errorMessage) {
+            this.errorMessage = errorMessage;
+        }
+
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            count++;
+            throw new HyracksDataException(errorMessage);
+        }
+    }
+
+    public class CountAndThrowError extends CountAnswer {
+        private String errorMessage;
+
+        public CountAndThrowError(String errorMessage) {
+            this.errorMessage = errorMessage;
+        }
+
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            count++;
+            throw new UnknownError(errorMessage);
+        }
+    }
+
+    public IFrameWriter[] createOutputWriters() throws Exception {
+        CountAnswer[] opens = new CountAnswer[] { openNormal, openException, openError };
+        CountAnswer[] nextFrames = new CountAnswer[] { nextFrameNormal, nextFrameException, nextFrameError };
+        CountAnswer[] fails = new CountAnswer[] { failNormal, failException, failError };
+        CountAnswer[] closes = new CountAnswer[] { closeNormal, closeException, closeError };
+        List<IFrameWriter> outputWriters = new ArrayList<IFrameWriter>();
+        for (CountAnswer openAnswer : opens) {
+            for (CountAnswer nextFrameAnswer : nextFrames) {
+                for (CountAnswer failAnswer : fails) {
+                    for (CountAnswer closeAnswer : closes) {
+                        IFrameWriter writer = Mockito.mock(IFrameWriter.class);
+                        Mockito.doAnswer(openAnswer).when(writer).open();
+                        Mockito.doAnswer(nextFrameAnswer).when(writer).nextFrame(Mockito.any());
+                        Mockito.doAnswer(failAnswer).when(writer).fail();
+                        Mockito.doAnswer(closeAnswer).when(writer).close();
+                        outputWriters.add(writer);
+                    }
+                }
+            }
+        }
+        return outputWriters.toArray(new IFrameWriter[outputWriters.size()]);
+    }
+
+    public void testWriter(IFrameWriter writer, ByteBuffer buffer) throws Exception {
+        resetAllCounters();
+        boolean failed = !validate(false);
+        if (failed) {
+            return;
+        }
+        try {
+            writer.open();
+            failed = !validate(false);
+            if (failed) {
+                return;
+            }
+            for (int i = 0; i < 10; i++) {
+                writer.nextFrame(buffer);
+                failed = !validate(false);
+                if (failed) {
+                    return;
+                }
+            }
+        } catch (Throwable th1) {
+            try {
+                failed = !validate(false);
+                if (failed) {
+                    return;
+                }
+                writer.fail();
+                failed = !validate(false);
+                if (failed) {
+                    return;
+                }
+            } catch (Throwable th2) {
+                failed = !validate(false);
+                if (failed) {
+                    return;
+                }
+            }
+        } finally {
+            if (!failed) {
+                try {
+                    failed = !validate(false);
+                    if (failed) {
+                        return;
+                    }
+                    writer.close();
+                    failed = !validate(true);
+                    if (failed) {
+                        return;
+                    }
+                } catch (Throwable th3) {
+                    failed = !validate(true);
+                    if (failed) {
+                        return;
+                    }
+                }
+            }
+        }
+        successes++;
+    }
+}
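
The call-sequence rules that validate() enforces are the revised IFrameWriter
contract itself: open() at most once, nextFrame() only after open(), fail() at
most once on error, and close() exactly once whenever open() was called. One
conforming shape, mirroring testWriter() above (a sketch, not code from this
commit; pushOneFrame is an illustrative name):

    void pushOneFrame(IFrameWriter writer, ByteBuffer buffer) throws HyracksDataException {
        try {
            writer.open();
            writer.nextFrame(buffer);       // zero or more nextFrame() calls
        } catch (Throwable th) {
            writer.fail();                  // at most once, only after open() was called
            throw new HyracksDataException(th);
        } finally {
            writer.close();                 // exactly once, even when open() or nextFrame() threw
        }
    }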

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
index 81d7834..9ee7969 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
@@ -25,8 +25,14 @@ import org.apache.hyracks.api.io.FileReference;
 public interface IIndexDataflowHelper {
     public void create() throws HyracksDataException;
 
+    /**
+     * If close() throws an exception, the index was not closed successfully.
+     */
     public void close() throws HyracksDataException;
 
+    /**
+     * If open() throws an exception, the index was not opened successfully.
+     */
     public void open() throws HyracksDataException;
 
     public void destroy() throws HyracksDataException;
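
The new comments pin down the failure semantics of the dataflow helper: an
open() that throws means the index was never opened, and a close() that throws
means it was not released cleanly. A caller honoring that contract looks
roughly like this (a sketch, not code from this commit):

    indexHelper.open();              // throws => index never opened, nothing to close
    try {
        // ... work against indexHelper.getIndexInstance() ...
    } finally {
        indexHelper.close();         // throws => index not closed successfully
    }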

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 82f9fd6..631b2f1 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -33,8 +33,7 @@ import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
-public class IndexBulkLoadOperatorNodePushable extends
-        AbstractUnaryInputUnaryOutputOperatorNodePushable {
+public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
     protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
     protected final float fillFactor;
@@ -48,15 +47,12 @@ public class IndexBulkLoadOperatorNodePushable extends
     protected IRecordDescriptorProvider recDescProvider;
     protected PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
 
-    public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc,
-            IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
-            float fillFactor, boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex,
-            IRecordDescriptorProvider recordDescProvider) {
+    public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, IRecordDescriptorProvider recordDescProvider) {
         this.opDesc = opDesc;
         this.ctx = ctx;
-        this.indexHelper = opDesc.getIndexDataflowHelperFactory()
-                .createIndexDataflowHelper(opDesc, ctx, partition);
+        this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
         this.fillFactor = fillFactor;
         this.verifyInput = verifyInput;
         this.numElementsHint = numElementsHint;
@@ -68,19 +64,16 @@ public class IndexBulkLoadOperatorNodePushable extends
 
     @Override
     public void open() throws HyracksDataException {
-        RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(
-                opDesc.getActivityId(), 0);
+        RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(recDesc);
         indexHelper.open();
         index = indexHelper.getIndexInstance();
         try {
-            bulkLoader = index.createBulkLoader(fillFactor, verifyInput,
-                    numElementsHint, checkIfEmptyIndex);
+            writer.open();
+            bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
         } catch (Exception e) {
-            indexHelper.close();
             throw new HyracksDataException(e);
         }
-        writer.open();
     }
 
     @Override
@@ -105,15 +98,24 @@ public class IndexBulkLoadOperatorNodePushable extends
     public void close() throws HyracksDataException {
         try {
             bulkLoader.end();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
         } finally {
-            indexHelper.close();
+            if (index != null) {
+                // If index was opened!
+                try {
+                    indexHelper.close();
+                } finally {
+                    writer.close();
+                }
+            }
         }
-        writer.close();
     }
 
     @Override
     public void fail() throws HyracksDataException {
+        if (index != null) {
+            writer.fail();
+        }
     }
 }
\ No newline at end of file
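
The reworked close() above is the key change: bulkLoader.end() and the two
releases now sit in nested try/finally blocks, so a throwing end() or
indexHelper.close() can no longer skip writer.close(), and cleanup only runs
when open() got far enough to obtain the index. Condensed, the pattern is
(a sketch of the committed shape, minus the HyracksDataException re-wrapping):

    try {
        bulkLoader.end();
    } finally {
        if (index != null) {             // open() succeeded in getting the index
            try {
                indexHelper.close();     // release the index first
            } finally {
                writer.close();          // always propagate close downstream
            }
        }
    }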


