asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [3/3] incubator-asterixdb-hyracks git commit: Changed the IFrameWriter Contract
Date Wed, 16 Dec 2015 06:35:24 GMT
Changed the IFrameWriter Contract

Updated existing operators and added a test case for BTreeSearchOperatorNodePushable.
With this change, calling the open method itself moves the writer to the OPENED state —
even if open fails — and hence close must always be called afterwards.

Change-Id: I03da090002f79f4db7b5b31454ce3ac2b9e40c7f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/551
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/0ad2cceb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/0ad2cceb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/0ad2cceb

Branch: refs/heads/master
Commit: 0ad2cceb0826800f6dd94b6389da683eaa1abe55
Parents: 3a88250
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Wed Dec 16 09:04:10 2015 +0300
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Tue Dec 15 22:25:48 2015 -0800

----------------------------------------------------------------------
 .../aggreg/AggregateRuntimeFactory.java         |  18 +-
 ...actOneInputOneOutputOneFramePushRuntime.java |   7 +-
 .../operators/meta/SubplanRuntimeFactory.java   |   4 +-
 .../sort/InMemorySortRuntimeFactory.java        |  16 +-
 .../operators/std/AssignRuntimeFactory.java     |  13 +-
 .../PartitioningSplitOperatorDescriptor.java    |  55 +-
 .../std/RunningAggregateRuntimeFactory.java     |  16 +
 .../std/StreamLimitRuntimeFactory.java          |   8 +-
 .../std/StreamProjectRuntimeFactory.java        |   2 +-
 .../std/StreamSelectRuntimeFactory.java         |  23 +-
 .../operators/std/UnnestRuntimeFactory.java     |   7 +-
 .../apache/hyracks/api/comm/IFrameWriter.java   |  21 +-
 .../ConnectorSenderProfilingFrameWriter.java    |  23 +-
 .../mapreduce/MapperOperatorDescriptor.java     |  21 +-
 .../LocalityAwarePartitionDataWriter.java       |  47 +-
 .../MToNReplicatingConnectorDescriptor.java     |  40 +-
 .../std/connectors/PartitionDataWriter.java     |  47 +-
 .../std/file/FileScanOperatorDescriptor.java    |   6 +-
 .../ExternalGroupMergeOperatorNodePushable.java |   2 +-
 .../preclustered/PreclusteredGroupWriter.java   |  29 +-
 .../join/GraceHashJoinOperatorNodePushable.java |   9 +-
 .../join/HybridHashJoinOperatorDescriptor.java  | 138 ++---
 .../InMemoryHashJoinOperatorDescriptor.java     |  30 +-
 .../join/NestedLoopJoinOperatorDescriptor.java  |  25 +-
 ...timizedHybridHashJoinOperatorDescriptor.java | 129 ++--
 ...ConstantTupleSourceOperatorNodePushable.java |   4 +-
 .../std/misc/MaterializerTaskState.java         |  21 +-
 .../std/misc/SplitOperatorDescriptor.java       |  67 ++-
 .../sort/InMemorySortOperatorDescriptor.java    |  13 +-
 .../std/union/UnionAllOperatorDescriptor.java   |  10 +-
 .../btree/helper/DataGenOperatorDescriptor.java |   9 +-
 .../rewriting/SuperActivityRewritingTest.java   |   6 +-
 .../dataflow/HDFSReadOperatorDescriptor.java    |  65 ++-
 .../dataflow/HDFSReadOperatorDescriptor.java    |  12 +-
 hyracks/hyracks-storage-am-btree/pom.xml        | 115 ++--
 .../storage/am/btree/test/FramewriterTest.java  | 582 +++++++++++++++++++
 .../am/common/api/IIndexDataflowHelper.java     |   6 +
 .../IndexBulkLoadOperatorNodePushable.java      |  40 +-
 ...xInsertUpdateDeleteOperatorNodePushable.java |  28 +-
 .../IndexSearchOperatorNodePushable.java        |  53 +-
 ...eIndexDiskOrderScanOperatorNodePushable.java |  27 +-
 .../BinaryTokenizerOperatorNodePushable.java    |  16 +-
 42 files changed, 1337 insertions(+), 473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index 8759f1b..bafe8a7 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -69,6 +69,7 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
             private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
 
             private boolean first = true;
+            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -86,7 +87,7 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
                 } catch (AlgebricksException e) {
                     throw new HyracksDataException(e);
                 }
-
+                isOpen = true;
                 writer.open();
             }
 
@@ -103,9 +104,14 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
 
             @Override
             public void close() throws HyracksDataException {
-                computeAggregate();
-                appendToFrameFromTupleBuilder(tupleBuilder);
-                super.close();
+                if (isOpen) {
+                    try {
+                        computeAggregate();
+                        appendToFrameFromTupleBuilder(tupleBuilder);
+                    } finally {
+                        super.close();
+                    }
+                }
             }
 
             private void computeAggregate() throws HyracksDataException {
@@ -132,7 +138,9 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
 
             @Override
             public void fail() throws HyracksDataException {
-                writer.fail();
+                if (isOpen) {
+                    writer.fail();
+                }
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 7fd1d05..e57a4ba 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,8 +51,11 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr
 
     @Override
     public void close() throws HyracksDataException {
-        flushIfNotFailed();
-        writer.close();
+        try {
+            flushIfNotFailed();
+        } finally {
+            writer.close();
+        }
     }
 
     protected void flushAndReset() throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index f10151e..25eb229 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -116,6 +116,7 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
                         appendNullsToTuple();
                         appendToFrameFromTupleBuilder(tb);
                     }
+
                 }
 
                 @Override
@@ -146,11 +147,11 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
 
             @Override
             public void open() throws HyracksDataException {
+                writer.open();
                 if (first) {
                     first = false;
                     initAccessAppendRef(ctx);
                 }
-                writer.open();
             }
 
             @Override
@@ -164,6 +165,7 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
                     startOfPipeline.close();
                 }
             }
+
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 291b92d..bca301f 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -31,8 +31,8 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.sort.FrameSorterMergeSort;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
+import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
 
 public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -64,15 +64,14 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
 
             @Override
             public void open() throws HyracksDataException {
+                writer.open();
                 if (frameSorter == null) {
                     IFrameBufferManager manager = new VariableFrameMemoryManager(
-                            new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
-                            new FrameFreeSlotLastFit());
+                            new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY), new FrameFreeSlotLastFit());
                     frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory,
                             comparatorFactories, outputRecordDesc);
                 }
                 frameSorter.reset();
-                writer.open();
             }
 
             @Override
@@ -87,9 +86,12 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
 
             @Override
             public void close() throws HyracksDataException {
-                frameSorter.sort();
-                frameSorter.flush(writer);
-                writer.close();
+                try {
+                    frameSorter.sort();
+                    frameSorter.flush(writer);
+                } finally {
+                    writer.close();
+                }
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 9c8a35c..79d9b7d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -96,6 +96,7 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
             private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
             private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
             private boolean first = true;
+            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -111,10 +112,18 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                         }
                     }
                 }
+                isOpen = true;
                 writer.open();
             }
 
             @Override
+            public void close() throws HyracksDataException {
+                if (isOpen) {
+                    super.close();
+                }
+            }
+
+            @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();
@@ -158,7 +167,9 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
 
             @Override
             public void fail() throws HyracksDataException {
-                writer.fail();
+                if (isOpen) {
+                    writer.fail();
+                }
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index 83d0c61..3d1eb06 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -66,9 +66,10 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return new AbstractUnaryInputOperatorNodePushable() {
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+            private final boolean[] isOpen = new boolean[outputArity];
             private final IFrame[] writeBuffers = new IFrame[outputArity];
             private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
             private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
@@ -83,19 +84,52 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
 
             @Override
             public void close() throws HyracksDataException {
-                // Flush (possibly not full) buffers that have data, and close writers.
+                HyracksDataException hde = null;
                 for (int i = 0; i < outputArity; i++) {
-                    tupleAppender.reset(writeBuffers[i], false);
-                    // ? by JF why didn't clear the buffer ?
-                    tupleAppender.flush(writers[i], false);
-                    writers[i].close();
+                    if (isOpen[i]) {
+                        try {
+                            tupleAppender.reset(writeBuffers[i], false);
+                            // ? by JF why didn't clear the buffer ?
+                            tupleAppender.flush(writers[i], false);
+                        } catch (Throwable th) {
+                            if (hde == null) {
+                                hde = new HyracksDataException();
+                            }
+                            hde.addSuppressed(th);
+                        } finally {
+                            try {
+                                writers[i].close();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException();
+                                }
+                                hde.addSuppressed(th);
+                            }
+                        }
+                    }
+                }
+                if (hde != null) {
+                    throw hde;
                 }
             }
 
             @Override
             public void fail() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.fail();
+                HyracksDataException hde = null;
+                for (int i = 0; i < outputArity; i++) {
+                    if (isOpen[i]) {
+                        try {
+                            writers[i].fail();
+                        } catch (Throwable th) {
+                            if (hde == null) {
+                                hde = new HyracksDataException();
+                            }
+                            hde.addSuppressed(th);
+                        }
+                    }
+                }
+                if (hde != null) {
+                    throw hde;
                 }
             }
 
@@ -144,8 +178,9 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
 
             @Override
             public void open() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.open();
+                for (int i = 0; i < writers.length; i++) {
+                    isOpen[i] = true;
+                    writers[i].open();
                 }
                 // Create write buffers.
                 for (int i = 0; i < outputArity; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 34bd1e0..8a5f38c 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -90,6 +90,7 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun
             private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
             private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
             private boolean first = true;
+            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -112,10 +113,25 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun
                         throw new HyracksDataException(ae);
                     }
                 }
+                isOpen = true;
                 writer.open();
             }
 
             @Override
+            public void close() throws HyracksDataException {
+                if (isOpen) {
+                    super.close();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                if (isOpen) {
+                    super.fail();
+                }
+            }
+
+            @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 5605485..ef172c7 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -73,7 +73,7 @@ public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeF
 
             @Override
             public void open() throws HyracksDataException {
-                // if (first) {
+                writer.open();
                 if (evalMaxObjects == null) {
                     initAccessAppendRef(ctx);
                     try {
@@ -85,14 +85,12 @@ public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeF
                         throw new HyracksDataException(ae);
                     }
                 }
-                writer.open();
                 afterLastTuple = false;
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 if (afterLastTuple) {
-                    // ignore the data
                     return;
                 }
                 tAccess.reset(buffer);
@@ -123,7 +121,6 @@ public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeF
                             appendTupleToFrame(t);
                         }
                     } else {
-                        // close();
                         afterLastTuple = true;
                         break;
                     }
@@ -136,9 +133,7 @@ public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeF
                 toSkip = 0; // how many tuples still to skip
                 firstTuple = true;
                 afterLastTuple = false;
-                // if (!afterLastTuple) {
                 super.close();
-                // }
             }
 
             private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
@@ -154,5 +149,4 @@ public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeF
 
         };
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 34754b4..2cea90d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -56,11 +56,11 @@ public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntim
 
             @Override
             public void open() throws HyracksDataException {
+                writer.open();
                 if (first) {
                     first = false;
                     initAccessAppend(ctx);
                 }
-                writer.open();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 416c398..75c3d08 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -53,7 +53,8 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
 
     /**
      * @param cond
-     * @param projectionList               if projectionList is null, then no projection is performed
+     * @param projectionList
+     *            if projectionList is null, then no projection is performed
      * @param retainNull
      * @param nullPlaceholderVariableIndex
      * @param nullWriterFactory
@@ -83,6 +84,7 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
             private IScalarEvaluator eval;
             private INullWriter nullWriter = null;
             private ArrayTupleBuilder nullTupleBuilder = null;
+            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -94,6 +96,7 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
                         throw new HyracksDataException(ae);
                     }
                 }
+                isOpen = true;
                 writer.open();
 
                 //prepare nullTupleBuilder
@@ -107,6 +110,24 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
             }
 
             @Override
+            public void fail() throws HyracksDataException {
+                if (isOpen) {
+                    super.fail();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                if (isOpen) {
+                    try {
+                        flushIfNotFailed();
+                    } finally {
+                        writer.close();
+                    }
+                }
+            }
+
+            @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 8850436..6b21cda 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -36,7 +36,6 @@ import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -96,6 +95,7 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
 
             @Override
             public void open() throws HyracksDataException {
+                writer.open();
                 initAccessAppendRef(ctx);
                 try {
                     agg = unnestingFactory.createUnnestingEvaluator(ctx);
@@ -103,7 +103,6 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                     throw new HyracksDataException(ae);
                 }
                 tupleBuilder = new ArrayTupleBuilder(projectionList.length);
-                writer.open();
             }
 
             @Override
@@ -112,16 +111,12 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                 int nTuple = tAccess.getTupleCount();
                 for (int t = 0; t < nTuple; t++) {
                     tRef.reset(tAccess, t);
-
                     try {
                         offsetEval.evaluate(tRef, p);
                     } catch (AlgebricksException e) {
                         throw new HyracksDataException(e);
                     }
-
-                    @SuppressWarnings("static-access")
                     int offset = IntegerPointable.getInteger(p.getByteArray(), p.getStartOffset());
-
                     try {
                         agg.init(tRef);
                         // assume that when unnesting the tuple, each step() call for each element

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
index 260caa1..f6c3ad0 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
@@ -33,22 +33,18 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
  * A producer follows the following protocol when using an {@link IFrameWriter}.
  * Initially, the {@link IFrameWriter} is in the INITIAL state.
  * The first valid call to an {@link IFrameWriter} is always the {@link IFrameWriter#open()}. This call provides the opportunity for the {@link IFrameWriter} implementation to allocate any resources for its
- * processing. Once this call returns, the {@link IFrameWriter} is in the OPENED
- * state. If an error occurs
- * during the {@link IFrameWriter#open()} call, a {@link HyracksDataException} is thrown and it stays in the INITIAL state.
- * While the {@link IFrameWriter} is in the OPENED state, the producer can call
- * one of:
+ * processing. Once open() is called, regardless of whether it succeeds, the {@link IFrameWriter} is in the OPENED
+ * state.
+ * While the {@link IFrameWriter} is in the OPENED state, the producer can call one of:
  * <ul>
- * <li> {@link IFrameWriter#close()} to give up any resources owned by the {@link IFrameWriter} and enter the CLOSED state.</li>
- * <li> {@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the {@link IFrameWriter}. The call returns normally on success and the {@link IFrameWriter} remains in the OPENED state. On failure, the call throws a {@link HyracksDataException}, and the {@link IFrameWriter} enters the ERROR state.</li>
- * <li> {@link IFrameWriter#fail()} to indicate that stream is to be aborted. The {@link IFrameWriter} enters the FAILED state.</li>
+ * <li>{@link IFrameWriter#close()} to give up any resources owned by the {@link IFrameWriter} and enter the CLOSED state.</li>
+ * <li>{@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the {@link IFrameWriter}. The call returns normally on success and the {@link IFrameWriter} remains in the OPENED state. On failure, the call throws a {@link HyracksDataException}, and the {@link IFrameWriter} remains in the OPENED state.</li>
+ * <li>{@link IFrameWriter#fail()} to indicate that stream is to be aborted. The {@link IFrameWriter} enters the FAILED state.</li>
  * </ul>
  * In the FAILED state, the only call allowed is the {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
  * state and give up all resources.
  * No calls are allowed when the {@link IFrameWriter} is in the CLOSED state.
- * Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} is not called by the producer. So an exceptional
- * return from the {@link IFrameWriter#open()} call must clean up all partially
- * allocated resources.
+ * Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} must still be called by the producer.
  *
  * @author vinayakb
  */
@@ -61,7 +57,8 @@ public interface IFrameWriter {
     /**
      * Provide data to the stream of this {@link IFrameWriter}.
      *
-     * @param buffer - Buffer containing data.
+     * @param buffer
+     *            - Buffer containing data.
      * @throws HyracksDataException
      */
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
index 09dd03d..a46fa7b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -32,16 +32,16 @@ public class ConnectorSenderProfilingFrameWriter implements IFrameWriter {
     private final ICounter closeCounter;
     private final ICounter frameCounter;
 
-    public ConnectorSenderProfilingFrameWriter(IHyracksTaskContext ctx, IFrameWriter writer,
-            ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
+    public ConnectorSenderProfilingFrameWriter(IHyracksTaskContext ctx, IFrameWriter writer, ConnectorDescriptorId cdId,
+            int senderIndex, int receiverIndex) {
         this.writer = writer;
         int attempt = ctx.getTaskAttemptId().getAttempt();
-        this.openCounter = ctx.getCounterContext().getCounter(
-                cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".open", true);
-        this.closeCounter = ctx.getCounterContext().getCounter(
-                cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".close", true);
-        this.frameCounter = ctx.getCounterContext().getCounter(
-                cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".nextFrame", true);
+        this.openCounter = ctx.getCounterContext()
+                .getCounter(cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".open", true);
+        this.closeCounter = ctx.getCounterContext()
+                .getCounter(cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".close", true);
+        this.frameCounter = ctx.getCounterContext()
+                .getCounter(cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".nextFrame", true);
     }
 
     @Override
@@ -58,8 +58,11 @@ public class ConnectorSenderProfilingFrameWriter implements IFrameWriter {
 
     @Override
     public void close() throws HyracksDataException {
-        closeCounter.update(1);
-        writer.close();
+        try {
+            closeCounter.update(1);
+        } finally {
+            writer.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
index 391a636..1ebebda 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -70,10 +69,11 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
         recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final HadoopHelper helper = new HadoopHelper(config);
         final Configuration conf = helper.getConfiguration();
         final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
@@ -219,18 +219,19 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
                 for (int i = 0; i < comparatorFactories.length; ++i) {
                     comparators[i] = comparatorFactories[i].createBinaryComparator();
                 }
-                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(),
-                        runGen.getRuns(), new int[] { 0 }, comparators, null,
-                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
+                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(), runGen.getRuns(),
+                        new int[] { 0 }, comparators, null, helper.getMapOutputRecordDescriptorWithoutExtraFields(),
+                        framesLimit, delegatingWriter);
                 merger.process();
             }
         }
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @SuppressWarnings("unchecked")
             @Override
             public void initialize() throws HyracksDataException {
-                writer.open();
                 try {
+                    writer.open();
                     SortingRecordWriter recordWriter = new SortingRecordWriter();
                     InputSplit split = null;
                     int blockId = 0;
@@ -246,9 +247,8 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
                                 Thread.currentThread().setContextClassLoader(ctxCL);
                             }
                             recordWriter.initBlock(blockId);
-                            Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil()
-                                    .createMapContext(conf, taId, recordReader,
-                                            recordWriter, null, null, split);
+                            Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil().createMapContext(conf, taId,
+                                    recordReader, recordWriter, null, null, split);
                             mapper.run(mCtx);
                             recordReader.close();
                             recordWriter.sortAndFlushBlock(writer);
@@ -259,6 +259,9 @@ public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable,
                             throw new HyracksDataException(e);
                         }
                     }
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw th;
                 } finally {
                     writer.close();
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index 3e437e0..ee8c656 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 public class LocalityAwarePartitionDataWriter implements IFrameWriter {
 
     private final IFrameWriter[] pWriters;
+    private final boolean[] isWriterOpen;
     private final IFrameTupleAppender[] appenders;
     private final FrameTupleAccessor tupleAccessor;
     private final ITuplePartitionComputer tpc;
@@ -46,6 +47,7 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
         int[] consumerPartitions = localityMap.getConsumers(senderIndex, nConsumerPartitions);
         pWriters = new IFrameWriter[consumerPartitions.length];
         appenders = new IFrameTupleAppender[consumerPartitions.length];
+        isWriterOpen = new boolean[consumerPartitions.length];
         for (int i = 0; i < consumerPartitions.length; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(consumerPartitions[i]);
@@ -67,6 +69,7 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
     @Override
     public void open() throws HyracksDataException {
         for (int i = 0; i < pWriters.length; ++i) {
+            isWriterOpen[i] = true;
             pWriters[i].open();
         }
     }
@@ -94,8 +97,22 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
      */
     @Override
     public void fail() throws HyracksDataException {
+        HyracksDataException failException = null;
         for (int i = 0; i < appenders.length; ++i) {
-            pWriters[i].fail();
+            if (isWriterOpen[i]) {
+                try {
+                    pWriters[i].fail();
+                } catch (Throwable th) {
+                    if (failException == null) {
+                        failException = new HyracksDataException(th);
+                    } else {
+                        failException.addSuppressed(th);
+                    }
+                }
+            }
+        }
+        if (failException != null) {
+            throw failException;
         }
     }
 
@@ -106,10 +123,32 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
      */
     @Override
     public void close() throws HyracksDataException {
+        HyracksDataException closeException = null;
         for (int i = 0; i < pWriters.length; ++i) {
-            appenders[i].flush(pWriters[i], true);
-            pWriters[i].close();
+            if (isWriterOpen[i]) {
+                try {
+                    appenders[i].flush(pWriters[i], true);
+                } catch (Throwable th) {
+                    if (closeException == null) {
+                        closeException = new HyracksDataException(th);
+                    } else {
+                        closeException.addSuppressed(th);
+                    }
+                } finally {
+                    try {
+                        pWriters[i].close();
+                    } catch (Throwable th) {
+                        if (closeException == null) {
+                            closeException = new HyracksDataException(th);
+                        } else {
+                            closeException.addSuppressed(th);
+                        }
+                    }
+                }
+            }
+        }
+        if (closeException != null) {
+            throw closeException;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 1730b22..7a3a019 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -43,8 +43,9 @@ public class MToNReplicatingConnectorDescriptor extends AbstractMToNConnectorDes
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+        final boolean[] isOpen = new boolean[nConsumerPartitions];
         for (int i = 0; i < nConsumerPartitions; ++i) {
             epWriters[i] = edwFactory.createFrameWriter(i);
         }
@@ -62,21 +63,50 @@ public class MToNReplicatingConnectorDescriptor extends AbstractMToNConnectorDes
 
             @Override
             public void fail() throws HyracksDataException {
+                HyracksDataException failException = null;
                 for (int i = 0; i < epWriters.length; ++i) {
-                    epWriters[i].fail();
+                    if (isOpen[i]) {
+                        try {
+                            epWriters[i].fail();
+                        } catch (Throwable th) {
+                            if (failException == null) {
+                                failException = new HyracksDataException(th);
+                            } else {
+                                failException.addSuppressed(th);
+                            }
+                        }
+                    }
+                }
+                if (failException != null) {
+                    throw failException;
                 }
             }
 
             @Override
             public void close() throws HyracksDataException {
+                HyracksDataException closeException = null;
                 for (int i = 0; i < epWriters.length; ++i) {
-                    epWriters[i].close();
+                    if (isOpen[i]) {
+                        try {
+                            epWriters[i].close();
+                        } catch (Throwable th) {
+                            if (closeException == null) {
+                                closeException = new HyracksDataException(th);
+                            } else {
+                                closeException.addSuppressed(th);
+                            }
+                        }
+                    }
+                }
+                if (closeException != null) {
+                    throw closeException;
                 }
             }
 
             @Override
             public void open() throws HyracksDataException {
                 for (int i = 0; i < epWriters.length; ++i) {
+                    isOpen[i] = true;
                     epWriters[i].open();
                 }
             }
@@ -84,8 +114,8 @@ public class MToNReplicatingConnectorDescriptor extends AbstractMToNConnectorDes
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 08df2c5..336272c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -35,6 +35,7 @@ import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 public class PartitionDataWriter implements IFrameWriter {
     private final int consumerPartitionCount;
     private final IFrameWriter[] pWriters;
+    private final boolean[] isOpen;
     private final FrameTupleAppender[] appenders;
     private final FrameTupleAccessor tupleAccessor;
     private final ITuplePartitionComputer tpc;
@@ -45,6 +46,7 @@ public class PartitionDataWriter implements IFrameWriter {
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
         this.consumerPartitionCount = consumerPartitionCount;
         pWriters = new IFrameWriter[consumerPartitionCount];
+        isOpen = new boolean[consumerPartitionCount];
         appenders = new FrameTupleAppender[consumerPartitionCount];
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
@@ -61,17 +63,40 @@ public class PartitionDataWriter implements IFrameWriter {
 
     @Override
     public void close() throws HyracksDataException {
+        HyracksDataException closeException = null;
         for (int i = 0; i < pWriters.length; ++i) {
-            if (allocatedFrame) {
-                appenders[i].flush(pWriters[i], true);
+            if (isOpen[i]) {
+                if (allocatedFrame) {
+                    try {
+                        appenders[i].flush(pWriters[i], true);
+                    } catch (Throwable th) {
+                        if (closeException == null) {
+                            closeException = new HyracksDataException(th);
+                        } else {
+                            closeException.addSuppressed(th);
+                        }
+                    }
+                }
+                try {
+                    pWriters[i].close();
+                } catch (Throwable th) {
+                    if (closeException == null) {
+                        closeException = new HyracksDataException(th);
+                    } else {
+                        closeException.addSuppressed(th);
+                    }
+                }
             }
-            pWriters[i].close();
+        }
+        if (closeException != null) {
+            throw closeException;
         }
     }
 
     @Override
     public void open() throws HyracksDataException {
         for (int i = 0; i < pWriters.length; ++i) {
+            isOpen[i] = true;
             pWriters[i].open();
         }
     }
@@ -99,8 +124,22 @@ public class PartitionDataWriter implements IFrameWriter {
 
     @Override
     public void fail() throws HyracksDataException {
+        HyracksDataException failException = null;
         for (int i = 0; i < appenders.length; ++i) {
-            pWriters[i].fail();
+            if (isOpen[i]) {
+                try {
+                    pWriters[i].fail();
+                } catch (Throwable th) {
+                    if (failException == null) {
+                        failException = new HyracksDataException(th);
+                    } else {
+                        failException.addSuppressed(th);
+                    }
+                }
+            }
+        }
+        if (failException != null) {
+            throw failException;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index f94d985..0f3687e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -56,8 +56,8 @@ public class FileScanOperatorDescriptor extends AbstractSingleActivityOperatorDe
             @Override
             public void initialize() throws HyracksDataException {
                 File f = split.getLocalFile().getFile();
-                writer.open();
                 try {
+                    writer.open();
                     InputStream in;
                     try {
                         in = new FileInputStream(f);
@@ -66,9 +66,9 @@ public class FileScanOperatorDescriptor extends AbstractSingleActivityOperatorDe
                         throw new HyracksDataException(e);
                     }
                     tp.parse(in, writer);
-                } catch (Exception e) {
+                } catch (Throwable th) {
                     writer.fail();
-                    throw new HyracksDataException(e);
+                    throw new HyracksDataException(th);
                 } finally {
                     writer.close();
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 814e537..779c631 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -122,8 +122,8 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
     public void initialize() throws HyracksDataException {
         aggState = (ExternalGroupState) ctx.getStateObject(stateId);
         runs = aggState.getRuns();
-        writer.open();
         try {
+            writer.open();
             if (runs.size() <= 0) {
                 ISpillableTable gTable = aggState.getSpillableTable();
                 if (gTable != null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7fa9fcc..1c08b53 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -29,8 +29,8 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.group.AggregateState;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -65,8 +65,8 @@ public class PreclusteredGroupWriter implements IFrameWriter {
             RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
         this.groupFields = groupFields;
         this.comparators = comparators;
-        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields,
-                groupFields, writer);
+        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields,
+                writer);
         this.aggregateState = aggregator.createAggregateStates();
         copyFrame = new VSizeFrame(ctx);
         inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
@@ -138,9 +138,9 @@ public class PreclusteredGroupWriter implements IFrameWriter {
         for (int j = 0; j < groupFields.length; j++) {
             tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
         }
-        boolean hasOutput = outputPartial ? aggregator.outputPartialResult(tupleBuilder, lastTupleAccessor,
-                lastTupleIndex, aggregateState) : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor,
-                lastTupleIndex, aggregateState);
+        boolean hasOutput = outputPartial
+                ? aggregator.outputPartialResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState)
+                : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
 
         if (hasOutput) {
             appenderWrapper.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
@@ -172,13 +172,16 @@ public class PreclusteredGroupWriter implements IFrameWriter {
 
     @Override
     public void close() throws HyracksDataException {
-        if (!isFailed && !first) {
-            assert(copyFrameAccessor.getTupleCount() > 0);
-            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
-            appenderWrapper.flush();
+        try {
+            if (!isFailed && !first) {
+                assert (copyFrameAccessor.getTupleCount() > 0);
+                writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+                appenderWrapper.flush();
+            }
+            aggregator.close();
+            aggregateState.close();
+        } finally {
+            appenderWrapper.close();
         }
-        aggregator.close();
-        aggregateState.close();
-        appenderWrapper.close();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index 349cc5a..8ba7626 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -104,11 +104,8 @@ class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperato
                 nullWriters1[i] = nullWriterFactories[i].createNullWriter();
             }
         }
-
-        writer.open();// open for probe
-
         try {
-
+            writer.open();// open for probe
             IFrame buffer = new VSizeFrame(ctx);
             // buffer
             int tableSize = (int) (numPartitions * recordsPerFrame * factor);
@@ -148,9 +145,9 @@ class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperato
                 probeReader.close();
                 joiner.closeJoin(writer);
             }
-        } catch (Exception e) {
+        } catch (Throwable th) {
             writer.fail();
-            throw new HyracksDataException(e);
+            throw new HyracksDataException(th);
         } finally {
             writer.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index f72d528..7badc1e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -188,7 +188,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -201,12 +201,12 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
-                    .createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
-                        .getJobId(), new TaskId(getActivityId(), partition));
+                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
+                        ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
                 private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
                 private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
                         hashFunctionFactories).createPartitioner();
@@ -302,8 +302,8 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                         if (memsize > inputsize0) {
                             state.nPartitions = 0;
                         } else {
-                            state.nPartitions = (int) (Math.ceil((inputsize0 * factor / nPartitions - memsize)
-                                    / (memsize - 1)));
+                            state.nPartitions = (int) (Math
+                                    .ceil((inputsize0 * factor / nPartitions - memsize) / (memsize - 1)));
                         }
                         if (state.nPartitions <= 0) {
                             // becomes in-memory HJ
@@ -352,8 +352,8 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
                     RunFileWriter writer = state.fWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                                BuildAndPartitionActivityNode.class.getSimpleName());
+                        FileReference file = ctx.getJobletContext()
+                                .createManagedWorkspaceFile(BuildAndPartitionActivityNode.class.getSimpleName());
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
                         state.fWriters[i] = writer;
@@ -378,7 +378,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -391,8 +391,8 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
-                    .createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private BuildAndPartitionTaskState state;
@@ -413,9 +413,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            BUILD_AND_PARTITION_ACTIVITY_ID), partition));
                     writer.open();
+                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
                     buildWriters = state.fWriters;
                     probeWriters = new RunFileWriter[state.nPartitions];
                     bufferForPartitions = new IFrame[state.nPartitions];
@@ -483,65 +483,69 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.joiner.join(inBuffer.getBuffer(), writer);
-                    state.joiner.closeJoin(writer);
-                    ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
-                            .createPartitioner();
-                    ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
-                            .createPartitioner();
-                    if (state.memoryForHashtable != memsize - 2) {
-                        for (int i = 0; i < state.nPartitions; i++) {
-                            ByteBuffer buf = bufferForPartitions[i].getBuffer();
-                            accessorProbe.reset(buf);
-                            if (accessorProbe.getTupleCount() > 0) {
-                                write(i, buf);
+                    try {
+                        state.joiner.join(inBuffer.getBuffer(), writer);
+                        state.joiner.closeJoin(writer);
+                        ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
+                                .createPartitioner();
+                        ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
+                                .createPartitioner();
+                        if (state.memoryForHashtable != memsize - 2) {
+                            for (int i = 0; i < state.nPartitions; i++) {
+                                ByteBuffer buf = bufferForPartitions[i].getBuffer();
+                                accessorProbe.reset(buf);
+                                if (accessorProbe.getTupleCount() > 0) {
+                                    write(i, buf);
+                                }
+                                closeWriter(i);
                             }
-                            closeWriter(i);
-                        }
 
-                        inBuffer.reset();
-                        int tableSize = -1;
-                        if (state.memoryForHashtable == 0) {
-                            tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
-                        } else {
-                            tableSize = (int) (memsize * recordsPerFrame * factor);
-                        }
-                        ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-                        for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
-                            RunFileWriter buildWriter = buildWriters[partitionid];
-                            RunFileWriter probeWriter = probeWriters[partitionid];
-                            if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
-                                continue;
+                            inBuffer.reset();
+                            int tableSize = -1;
+                            if (state.memoryForHashtable == 0) {
+                                tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
+                            } else {
+                                tableSize = (int) (memsize * recordsPerFrame * factor);
                             }
-                            table.reset();
-                            InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0),
-                                    hpcRep0, new FrameTupleAccessor(rd1), hpcRep1, new FrameTuplePairComparator(keys0,
-                                            keys1, comparators), isLeftOuter, nullWriters1, table, predEvaluator);
-
-                            if (buildWriter != null) {
-                                RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
-                                buildReader.open();
-                                while (buildReader.nextFrame(inBuffer)) {
-                                    ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
-                                    FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
-                                    joiner.build(copyBuffer);
-                                    inBuffer.reset();
+                            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+                            for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
+                                RunFileWriter buildWriter = buildWriters[partitionid];
+                                RunFileWriter probeWriter = probeWriters[partitionid];
+                                if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
+                                    continue;
+                                }
+                                table.reset();
+                                InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
+                                        new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
+                                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+                                        nullWriters1, table, predEvaluator);
+
+                                if (buildWriter != null) {
+                                    RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
+                                    buildReader.open();
+                                    while (buildReader.nextFrame(inBuffer)) {
+                                        ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
+                                        FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
+                                        joiner.build(copyBuffer);
+                                        inBuffer.reset();
+                                    }
+                                    buildReader.close();
                                 }
-                                buildReader.close();
-                            }
 
-                            // probe
-                            RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
-                            probeReader.open();
-                            while (probeReader.nextFrame(inBuffer)) {
-                                joiner.join(inBuffer.getBuffer(), writer);
-                                inBuffer.reset();
+                                // probe
+                                RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
+                                probeReader.open();
+                                while (probeReader.nextFrame(inBuffer)) {
+                                    joiner.join(inBuffer.getBuffer(), writer);
+                                    inBuffer.reset();
+                                }
+                                probeReader.close();
+                                joiner.closeJoin(writer);
                             }
-                            probeReader.close();
-                            joiner.closeJoin(writer);
                         }
+                    } finally {
+                        writer.close();
                     }
-                    writer.close();
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
@@ -554,8 +558,8 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
                     RunFileWriter writer = probeWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class
-                                .getSimpleName());
+                        FileReference file = ctx
+                                .createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName());
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
                         probeWriters[i] = writer;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index dae441b..87ac9bd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -82,8 +82,7 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
             IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
-            INullWriterFactory[] nullWriterFactories1,
-            int tableSize) {
+            INullWriterFactory[] nullWriterFactories1, int tableSize) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
@@ -174,9 +173,8 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ?
-                    null :
-                    predEvaluatorFactory.createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private HashBuildTaskState state;
@@ -187,13 +185,12 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
                             .createPartitioner();
                     ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
                             .createPartitioner();
-                    state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
+                    state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
                     ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-                    state.joiner = new InMemoryHashJoin(ctx, tableSize,
-                            new FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1), hpc1,
-                            new FrameTuplePairComparator(keys0, keys1,
-                                    comparators), isLeftOuter, nullWriters1, table, predEvaluator);
+                    state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
+                            new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
+                            isLeftOuter, nullWriters1, table, predEvaluator);
                 }
 
                 @Override
@@ -231,9 +228,9 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(), 0),
-                            partition));
                     writer.open();
+                    state = (HashBuildTaskState) ctx
+                            .getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition));
                 }
 
                 @Override
@@ -243,8 +240,11 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.joiner.closeJoin(writer);
-                    writer.close();
+                    try {
+                        state.joiner.closeJoin(writer);
+                    } finally {
+                        writer.close();
+                    }
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0ad2cceb/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 03570c7..a12450e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -132,9 +132,8 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null) ?
-                    predEvaluatorFactory.createPredicateEvaluator() :
-                    null);
+            final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null)
+                    ? predEvaluatorFactory.createPredicateEvaluator() : null);
 
             final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
             if (isLeftOuter) {
@@ -148,12 +147,11 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
+                    state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
 
-                    state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0),
-                            new FrameTupleAccessor(rd1), comparator, memSize, predEvaluator, isLeftOuter,
-                            nullWriters1);
+                    state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0), new FrameTupleAccessor(rd1),
+                            comparator, memSize, predEvaluator, isLeftOuter, nullWriters1);
 
                 }
 
@@ -194,9 +192,9 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (JoinCacheTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            JOIN_CACHE_ACTIVITY_ID), partition));
                     writer.open();
+                    state = (JoinCacheTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID), partition));
                 }
 
                 @Override
@@ -206,8 +204,11 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.joiner.closeJoin(writer);
-                    writer.close();
+                    try {
+                        state.joiner.closeJoin(writer);
+                    } finally {
+                        writer.close();
+                    }
                 }
 
                 @Override


Mime
View raw message