asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [2/2] asterixdb git commit: ASTERIXDB-1791, ASTERIXDB-1796: fix failure handling in runtime operators.
Date Fri, 24 Feb 2017 05:33:38 GMT
ASTERIXDB-1791, ASTERIXDB-1796: fix failure handling in runtime operators.

This change includes the following parts:
- Fix the implementation of fail() and close() in several runtime operators
  to avoid file handle leak and job hang;
- Add an erase method to RunFileWriter which closes files before deleting
  them in order release the holding disk space;
- Call RunFileWriter.close() and RunFileReader.close() in "finally" blocks.
- Fix RunFileReader to not truncate files to be deleted - it is not the root
  cause of un-released disk space - open deleted files are the root cuase;
- Check file handle leaks in LangExecutionUtil.tearDown().

Change-Id: I203168171e6dac16b57d2eda960823e3810e22a3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1513
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/34f2384e
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/34f2384e
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/34f2384e

Branch: refs/heads/master
Commit: 34f2384ea66a4b62627cb5a3ff3544b6d4558469
Parents: 452ec9f
Author: Yingyi Bu <yingyi@couchbase.com>
Authored: Thu Feb 23 18:22:32 2017 -0800
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Thu Feb 23 21:32:21 2017 -0800

----------------------------------------------------------------------
 .../ConnectorDescriptorWithMessagingTest.java   |  82 ++---
 .../asterix/test/runtime/LangExecutionUtil.java |  28 ++
 ...1_pricing_summary_report_failure.3.query.aql |   8 +-
 .../group_by_failure.1.ddl.sqlpp                |  46 +++
 .../group_by_failure.2.update.sqlpp             |  24 ++
 .../group_by_failure.3.query.sqlpp              |  26 ++
 .../group_by_hash_failure.1.ddl.sqlpp           |  46 +++
 .../group_by_hash_failure.2.update.sqlpp        |  24 ++
 .../group_by_hash_failure.3.query.sqlpp         |  27 ++
 .../order_by_failure.1.ddl.sqlpp                |  46 +++
 .../order_by_failure.2.update.sqlpp             |  24 ++
 .../order_by_failure.3.query.sqlpp              |  26 ++
 ...1_pricing_summary_report_failure.1.ddl.sqlpp |   4 +-
 ...ricing_summary_report_failure.2.update.sqlpp |   3 +-
 ...pricing_summary_report_failure.3.query.sqlpp |  43 +--
 ...18_large_volume_customer_failure.1.ddl.sqlpp |  73 ++++
 ...large_volume_customer_failure.2.update.sqlpp |  29 ++
 ..._large_volume_customer_failure.3.query.sqlpp |  37 ++
 .../resources/runtimets/testsuite_sqlpp.xml     |  37 +-
 .../external/dataset/adapter/LookupAdapter.java |   3 +
 .../feed/dataflow/FeedRuntimeInputHandler.java  |   3 +-
 .../FeedIntakeOperatorNodePushable.java         |  14 +-
 .../LSMPrimaryUpsertOperatorNodePushable.java   |   9 +-
 .../std/NoTupleSourceRuntimeFactory.java        |   7 +-
 .../management/runtime/CommitRuntime.java       |  10 +-
 ...actOneInputOneOutputOneFramePushRuntime.java |   3 +
 .../meta/AlgebricksMetaOperatorDescriptor.java  |   4 +-
 .../operators/std/AssignRuntimeFactory.java     |   2 +-
 .../hyracks/api/exceptions/ErrorCode.java       |   2 +-
 .../src/main/resources/errormsg/en.properties   |   2 +-
 .../nc/dataset/DatasetPartitionWriter.java      |  12 +-
 .../hyracks/control/nc/dataset/ResultState.java |  12 +-
 .../hyracks/control/nc/io/FileHandle.java       |   5 -
 .../ProfilingPartitionWriterFactory.java        |  11 +-
 .../common/comm/io/SerializingDataWriter.java   |  10 +-
 .../dataflow/common/io/RunFileReader.java       |   7 +-
 .../dataflow/common/io/RunFileWriter.java       |  11 +-
 .../ExternalGroupBuildOperatorNodePushable.java |   5 +-
 .../ExternalGroupWriteOperatorNodePushable.java |  11 +-
 .../std/group/external/ExternalHashGroupBy.java |  15 +-
 .../join/HybridHashJoinOperatorDescriptor.java  |  44 ++-
 .../dataflow/std/join/InMemoryHashJoin.java     |   6 +-
 .../InMemoryHashJoinOperatorDescriptor.java     |   8 +-
 .../dataflow/std/join/NestedLoopJoin.java       |  39 ++-
 .../join/NestedLoopJoinOperatorDescriptor.java  |  20 +-
 .../std/join/OptimizedHybridHashJoin.java       |  68 ++--
 ...timizedHybridHashJoinOperatorDescriptor.java | 334 +++++++++++--------
 .../std/misc/MaterializerTaskState.java         |   6 +-
 .../result/ResultWriterOperatorDescriptor.java  |  16 +-
 .../std/sort/AbstractExternalSortRunMerger.java |  22 +-
 .../tests/rewriting/ErrorReportingTest.java     |   2 +-
 .../rewriting/SuperActivityRewritingTest.java   |   4 +-
 .../tests/unit/AbstractExternalGroupbyTest.java |  12 +-
 .../tests/unit/AbstractRunGeneratorTest.java    |  15 +-
 54 files changed, 1027 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index a3d6102..d04217c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -83,47 +83,53 @@ public class ConnectorDescriptorWithMessagingTest {
             TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
             IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
                     CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
-            partitioner.open();
-            FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
             List<TestFrameWriter> recipients = new ArrayList<>();
-            for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
-                recipients.add((TestFrameWriter) writer);
-            }
-            partitioner.flush();
-            for (TestFrameWriter writer : recipients) {
-                Assert.assertEquals(writer.nextFrameCount(), 1);
-                fta.reset(writer.getLastFrame());
-                Assert.assertEquals(fta.getTupleCount(), 1);
-                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
-                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
-            }
-            message.getBuffer().clear();
-            message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
-            message.getBuffer().flip();
-            partitioner.flush();
-            for (TestFrameWriter writer : recipients) {
-                Assert.assertEquals(writer.nextFrameCount(), 2);
-                fta.reset(writer.getLastFrame());
-                Assert.assertEquals(fta.getTupleCount(), 1);
-                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
-                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
-            }
+            try {
+                partitioner.open();
+                FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+                for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
+                    recipients.add((TestFrameWriter) writer);
+                }
+                partitioner.flush();
+                for (TestFrameWriter writer : recipients) {
+                    Assert.assertEquals(writer.nextFrameCount(), 1);
+                    fta.reset(writer.getLastFrame());
+                    Assert.assertEquals(fta.getTupleCount(), 1);
+                    FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                    Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+                            MessagingFrameTupleAppender.getMessageType(tempBuffer));
+                }
+                message.getBuffer().clear();
+                message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+                message.getBuffer().flip();
+                partitioner.flush();
+                for (TestFrameWriter writer : recipients) {
+                    Assert.assertEquals(writer.nextFrameCount(), 2);
+                    fta.reset(writer.getLastFrame());
+                    Assert.assertEquals(fta.getTupleCount(), 1);
+                    FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                    Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
+                            MessagingFrameTupleAppender.getMessageType(tempBuffer));
+                }
 
-            message.getBuffer().clear();
-            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
-            message.getBuffer().flip();
-            partitioner.flush();
-            for (TestFrameWriter writer : recipients) {
-                Assert.assertEquals(writer.nextFrameCount(), 3);
-                fta.reset(writer.getLastFrame());
-                Assert.assertEquals(fta.getTupleCount(), 1);
-                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
-                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+                message.getBuffer().clear();
+                message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+                message.getBuffer().flip();
+                partitioner.flush();
+                for (TestFrameWriter writer : recipients) {
+                    Assert.assertEquals(writer.nextFrameCount(), 3);
+                    fta.reset(writer.getLastFrame());
+                    Assert.assertEquals(fta.getTupleCount(), 1);
+                    FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                    Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+                            MessagingFrameTupleAppender.getMessageType(tempBuffer));
+                }
+            } catch (Throwable t) {
+                partitioner.fail();
+                throw t;
+            } finally {
+                partitioner.close();
             }
-            partitioner.close();
             for (TestFrameWriter writer : recipients) {
                 Assert.assertEquals(writer.nextFrameCount(), 4);
                 Assert.assertEquals(writer.closeCount(), 1);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 4db00c8..0e6be0f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,7 +19,12 @@
 
 package org.apache.asterix.test.runtime;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -28,7 +33,9 @@ import org.apache.asterix.app.external.TestLibrarian;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -63,6 +70,9 @@ public class LangExecutionUtil {
     }
 
     public static void tearDown() throws Exception {
+        // Check whether there are leaked open run file handles.
+        checkRunFileLeaks();
+
         TestLibrarian.removeLibraryDir();
         ExecutionTestUtil.tearDown(cleanupOnStop);
         ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
@@ -117,4 +127,22 @@ public class LangExecutionUtil {
             System.err.flush();
         }
     }
+
+    private static void checkRunFileLeaks() throws IOException {
+        if (SystemUtils.IS_OS_WINDOWS) {
+            return;
+        }
+        // Only run the check on Linux and MacOS.
+        RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+        String processName = runtimeMXBean.getName();
+        String processId = processName.split("@")[0];
+
+        // Checks whether there are leaked run files from operators.
+        Process process = Runtime.getRuntime()
+                .exec(new String[] { "bash", "-c", "lsof -p " + processId + "|grep waf|wc -l" });
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+            int runFileCount = Integer.parseInt(reader.readLine().trim());
+            Assert.assertTrue(runFileCount == 0);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
index 1757130..9b2e66d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
@@ -18,11 +18,11 @@
  */
 use dataverse tpch;
 
- 
+
 for $l in dataset('LineItem')
 //where inject-failure($l.l_shipdate <= '1998-09-02', $l.l_orderkey=5999)
 /*+ hash*/
-group by $l_returnflag := $l.l_returnflag, $l_linestatus := $l.l_linestatus  
+group by $l_returnflag := $l.l_returnflag, $l_linestatus := $l.l_linestatus
   with $l
 order by $l_returnflag, $l_linestatus
 return {
@@ -32,8 +32,8 @@ return {
   "sum_base_price": sum(for $i in $l return $i.l_extendedprice),
   "sum_disc_price": sum(for $i in $l return $i.l_extendedprice * (1 - $i.l_discount)),
   "sum_charge": sum(for $i in $l return $i.l_extendedprice * (1 - $i.l_discount) * (1 + $i.l_tax)),
-  "ave_qty": avg(for $i in $l return $i.l_quantity),  
+  "ave_qty": avg(for $i in $l return $i.l_quantity),
   "ave_price": avg(for $i in $l return $i.l_extendedprice),
   "ave_disc": avg(for $i in $l return $i.l_discount),
   "count_order": count($l)
-}   
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..6b277db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as
+ closed {
+  l_orderkey : integer,
+  l_partkey : integer,
+  l_suppkey : integer,
+  l_linenumber : integer,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp
new file mode 100644
index 0000000..39205db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp
new file mode 100644
index 0000000..a3b7589
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+SELECT inject_failure(l_orderkey, l_orderkey=5988), SUM(l_quantity) t_sum_quantity
+FROM  LineItem l
+GROUP BY l_orderkey;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..6b277db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as
+ closed {
+  l_orderkey : integer,
+  l_partkey : integer,
+  l_suppkey : integer,
+  l_linenumber : integer,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp
new file mode 100644
index 0000000..39205db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp
new file mode 100644
index 0000000..c11a0ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+SELECT inject_failure(l_orderkey, l_orderkey=5988), SUM(l_quantity) t_sum_quantity
+FROM  LineItem l
+/*+ hash */
+GROUP BY l_orderkey;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..6b277db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as
+ closed {
+  l_orderkey : integer,
+  l_partkey : integer,
+  l_suppkey : integer,
+  l_linenumber : integer,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp
new file mode 100644
index 0000000..39205db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp
new file mode 100644
index 0000000..0314a6e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+SELECT inject_failure(l_orderkey, l_orderkey=1024), l_quantity
+FROM  LineItem l
+ORDER BY l_shipdate, l_orderkey;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
index f91df13..5ebec80 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
@@ -20,10 +20,10 @@
 drop  dataverse tpch if exists;
 create  dataverse tpch;
 
-use test;
+use tpch;
 
 
-create type test.LineItemType as
+create type LineItemType as
  closed {
   l_orderkey : integer,
   l_partkey : integer,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
index 5fe734c..0340837 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
@@ -20,5 +20,6 @@
 use tpch;
 
 
-load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
index cac4a08..e96644b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
@@ -17,33 +17,22 @@
  * under the License.
  */
 
-use tpch;
+USE tpch;
 
+SET `import-private-functions` "true";
 
-select element {'l_returnflag':l_returnflag,'l_linestatus':l_linestatus,'sum_qty':tpch.coll_sum((
-        select element i.l_quantity
-        from  l as i
-    )),'sum_base_price':tpch.coll_sum((
-        select element i.l_extendedprice
-        from  l as i
-    )),'sum_disc_price':tpch.coll_sum((
-        select element (i.l_extendedprice * (1 - i.l_discount))
-        from  l as i
-    )),'sum_charge':tpch.coll_sum((
-        select element (i.l_extendedprice * (1 - i.l_discount) * (1 + i.l_tax))
-        from  l as i
-    )),'ave_qty':tpch.coll_avg((
-        select element i.l_quantity
-        from  l as i
-    )),'ave_price':tpch.coll_avg((
-        select element i.l_extendedprice
-        from  l as i
-    )),'ave_disc':tpch.coll_avg((
-        select element i.l_discount
-        from  l as i
-    )),'count_order':tpch.coll_count(l)}
-from  LineItem as l
-/* +hash */
-group by l.l_returnflag as l_returnflag,l.l_linestatus as l_linestatus
-order by l_returnflag,l_linestatus
+SELECT  l_returnflag,
+        l_linestatus,
+        sum(l_quantity) AS sum_qty,
+        sum(l_extendedprice) AS sum_base_price,
+        sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+        avg(l_quantity) AS ave_qty,
+        avg(l_extendedprice) AS ave_price,
+        avg(l_discount) AS ave_disc,
+        count(1) AS count_order
+FROM  LineItem l
+WHERE l.l_shipdate <= '1998-09-02' AND inject_failure(true, l.l_orderkey=5988)
+GROUP BY l_returnflag, l_linestatus
+ORDER BY l_returnflag, l_linestatus
 ;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..9e64c15
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as {
+  l_orderkey : bigint,
+  l_partkey : bigint,
+  l_suppkey : bigint,
+  l_linenumber : bigint,
+  l_quantity : bigint,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create type OrderType as {
+  o_orderkey : bigint,
+  o_custkey : bigint,
+  o_orderstatus : string,
+  o_totalprice : double,
+  o_orderdate : string,
+  o_orderpriority : string,
+  o_clerk : string,
+  o_shippriority : bigint,
+  o_comment : string
+}
+
+create type CustomerType as {
+  c_custkey : bigint,
+  c_name : string,
+  c_address : string,
+  c_nationkey : bigint,
+  c_phone : string,
+  c_acctbal : double,
+  c_mktsegment : string,
+  c_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
+create  dataset Orders(OrderType) primary key o_orderkey;
+
+create  dataset Customer(CustomerType) primary key c_custkey;
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp
new file mode 100644
index 0000000..83ec6c3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),
+(`delimiter`=`|`));
+
+load  dataset Orders using localfs ((`path`=`asterix_nc1://data/tpch0.001/orders.tbl`),(`format`=`delimited-text`),
+(`delimiter`=`|`));
+
+load  dataset Customer using localfs ((`path`=`asterix_nc1://data/tpch0.001/customer.tbl`),(`format`=`delimited-text`),
+(`delimiter`=`|`));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp
new file mode 100644
index 0000000..35acf08
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+WITH tmp AS
+(
+    SELECT l_orderkey, SUM(l_quantity) t_sum_quantity
+    FROM  LineItem
+    GROUP BY l_orderkey
+)
+
+SELECT c.c_name, c.c_custkey, inject_failure(o.o_orderkey, o.o_orderkey=5988),
+       o.o_orderdate, o.o_totalprice, l.l_quantity
+FROM  LineItem l
+JOIN  tmp t ON t.l_orderkey = l.l_orderkey
+JOIN  Orders o ON o.o_orderkey = t.l_orderkey
+JOIN  Customer c ON c.c_custkey = o.o_custkey
+;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0d8da65..431b215 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -2028,13 +2028,36 @@
     </test-case>
   </test-group>
   <test-group name="failure">
-    <!--
-        <test-case FilePath="failure">
-          <compilation-unit name="q1_pricing_summary_report_failure">
-            <output-dir compare="Text">q1_pricing_summary_report_failure</output-dir>
-          </compilation-unit>
-        </test-case>
-        -->
+    <test-case FilePath="failure">
+      <compilation-unit name="group_by_failure">
+        <output-dir compare="Text">group_by_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failure">
+      <compilation-unit name="group_by_hash_failure">
+        <output-dir compare="Text">group_by_hash_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failure">
+      <compilation-unit name="q01_pricing_summary_report_failure">
+        <output-dir compare="Text">q01_pricing_summary_report_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failure">
+      <compilation-unit name="q18_large_volume_customer_failure">
+        <output-dir compare="Text">q18_large_volume_customer_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failure">
+      <compilation-unit name="order_by_failure">
+        <output-dir compare="Text">order_by_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <!--
     <test-group name="flwor">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index baf5dba..dd713e6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -155,6 +155,9 @@ public final class LookupAdapter<T> implements IFrameWriter {
     public void close() throws HyracksDataException {
         try {
             appender.write(writer, true);
+        } catch (Exception e) {
+            writer.fail();
+            throw e;
         } finally {
             writer.close();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index cd04515..9982477 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -129,8 +129,9 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
             }
         } catch (Throwable th) {
             LOGGER.log(Level.WARNING, th.getMessage(), th);
+        } finally {
+            writer.close();
         }
-        writer.close();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 99fff19..fe2d4ec 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -74,15 +74,17 @@ public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePush
                 throw new RuntimeDataException(
                         ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION);
             }
-        } catch (Throwable ie) {
+        } catch (Exception e) {
             /*
-             * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
-             * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
-             * The surviving intake partitions must continue to live and receive data from the external source.
+             * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another
+             * node involved in the Hyracks job. As the Intake job involves only the intake operator, the exception is
+             * indicative of a failure at the sibling intake operator location. The surviving intake partitions must
+             * continue to live and receive data from the external source.
              */
-            throw new HyracksDataException(ie);
+            writer.fail();
+            throw e;
         } finally {
-                writer.close();
+            writer.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index b2ffd6e..6869523 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -301,10 +301,11 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     @Override
     public void close() throws HyracksDataException {
         try {
-            cursor.close();
-            writer.close();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
+            try {
+                cursor.close();
+            } finally {
+                writer.close();
+            }
         } finally {
             indexHelper.close();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
index c3e2681..377ad60 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
@@ -39,8 +39,11 @@ public class NoTupleSourceRuntimeFactory implements IPushRuntimeFactory {
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
-                writer.close();
+                try {
+                    writer.open();
+                } finally {
+                    writer.close();
+                }
             }
 
         };

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index d38c5b7..33078ff 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -170,8 +170,14 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
         if (isSink) {
             return;
         }
-        flushIfNotFailed();
-        writer.close();
+        try {
+            flushIfNotFailed();
+        } catch (Exception e) {
+            writer.fail();
+            throw e;
+        } finally {
+            writer.close();
+        }
         appender.reset(frame, true);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index e94f4b7..2d8eaed 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -53,6 +53,9 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr
     public void close() throws HyracksDataException {
         try {
             flushIfNotFailed();
+        } catch (Exception e) {
+            writer.fail();
+            throw e;
         } finally {
             writer.close();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 1123c5e..3ac8c40 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -21,8 +21,6 @@ package org.apache.hyracks.algebricks.runtime.operators.meta;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -36,6 +34,8 @@ import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescri
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 60d7eec..aefc99d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -179,7 +179,7 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
             @Override
             public void fail() throws HyracksDataException {
                 if (isOpen) {
-                    writer.fail();
+                    super.fail();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 8d312d5..b5bd1a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -53,7 +53,7 @@ public class ErrorCode {
     public static final int RESULT_FAILURE_EXCEPTION = 16;
     public static final int RESULT_FAILURE_NO_EXCEPTION = 17;
     public static final int INCONSISTENT_RESULT_METADATA = 18;
-    public static final int CANNOT_TRUNCATE_OR_DELETE_FILE = 19;
+    public static final int CANNOT_DELETE_FILE = 19;
     public static final int NOT_A_JOBID = 20;
     public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
     public static final int DUPLICATE_DISTRIBUTED_JOB = 22;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 7f90c35..f536d3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -37,7 +37,7 @@
 16 = Failure producing result set %1$s for job %2$s
 17 = No exception for failed result set %1$s for job %2$s
 18 = Inconsistent metadata for result set %1$s"
-19 = Can't truncate or delete the file: %1$s
+19 = Cannot delete the file: %1$s
 20 = '%1$s' is not a valid job id.
 21 = The distributed job %1$s was not found
 22 = The distributed job %1$s already exists

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index f7aa2e8..952eb75 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -55,6 +55,8 @@ public class DatasetPartitionWriter implements IFrameWriter {
 
     private boolean partitionRegistered;
 
+    private boolean failed = false;
+
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
             ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
             DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory) {
@@ -97,6 +99,7 @@ public class DatasetPartitionWriter implements IFrameWriter {
     @Override
     public void fail() throws HyracksDataException {
         try {
+            failed = true;
             resultState.closeAndDelete();
             resultState.abort();
             registerResultPartitionLocation(false);
@@ -111,8 +114,13 @@ public class DatasetPartitionWriter implements IFrameWriter {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + partition + ")");
         }
-        registerResultPartitionLocation(true);
-        resultState.close();
+        try {
+            if (!failed) {
+                registerResultPartitionLocation(true);
+            }
+        } finally {
+            resultState.close();
+        }
         try {
             manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
         } catch (HyracksException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index be7ed3d..c501b5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -86,6 +86,7 @@ public class ResultState implements IStateObject {
 
     public synchronized void close() {
         eos.set(true);
+        closeWriteFileHandle();
         notifyAll();
     }
 
@@ -93,6 +94,13 @@ public class ResultState implements IStateObject {
         // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
         // to be taken when there are more requests to these result states.
         failed.set(true);
+        closeWriteFileHandle();
+        if (fileRef != null) {
+            fileRef.delete();
+        }
+    }
+
+    private void closeWriteFileHandle() {
         if (writeFileHandle != null) {
             try {
                 ioManager.close(writeFileHandle);
@@ -100,9 +108,6 @@ public class ResultState implements IStateObject {
                 // Since file handle could not be closed, just ignore.
             }
         }
-        if (fileRef != null) {
-            fileRef.delete();
-        }
     }
 
     public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
@@ -114,7 +119,6 @@ public class ResultState implements IStateObject {
         }
 
         size += ioManager.syncWrite(writeFileHandle, size, buffer);
-
         notifyAll();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index f0e7f0e..33b8980 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -72,7 +72,6 @@ public class FileHandle implements IFileHandle {
     }
 
     public void close() throws IOException {
-        channel.close();
         raf.close();
     }
 
@@ -80,10 +79,6 @@ public class FileHandle implements IFileHandle {
         return fileRef;
     }
 
-    public RandomAccessFile getRandomAccessFile() {
-        return raf;
-    }
-
     public FileChannel getFileChannel() {
         return channel;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
index e5e81ab..e51d2bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -80,10 +80,13 @@ public class ProfilingPartitionWriterFactory implements IPartitionWriterFactory
             @Override
             public void close() throws HyracksDataException {
                 closeTime = System.currentTimeMillis();
-                ((Task) ctx)
-                        .setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(),
-                                cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
-                writer.close();
+                try {
+                    ((Task) ctx).setPartitionSendProfile(
+                            new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(), cd.getConnectorId(),
+                                    senderIndex, receiverIndex), openTime, closeTime, mrep));
+                } finally {
+                    writer.close();
+                }
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
index 365b01c..d9a4c7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
@@ -62,8 +62,14 @@ public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
         if (!open) {
             throw new HyracksDataException("Closing SerializingDataWriter that has not been opened");
         }
-        tupleAppender.write(frameWriter, true);
-        frameWriter.close();
+        try {
+            tupleAppender.write(frameWriter, true);
+        } catch (Exception e) {
+            frameWriter.fail();
+            throw e;
+        } finally {
+            frameWriter.close();
+        }
         open = false;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index b69f377..f0bd318 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -29,7 +29,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.control.nc.io.FileHandle;
 
 public class RunFileReader implements IFrameReader {
     private final FileReference file;
@@ -49,7 +48,7 @@ public class RunFileReader implements IFrameReader {
     @Override
     public void open() throws HyracksDataException {
         // Opens RW mode because we need to truncate the given file if required.
-        handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_WRITE,
+        handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_ONLY,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         readPtr = 0;
     }
@@ -87,12 +86,10 @@ public class RunFileReader implements IFrameReader {
     public void close() throws HyracksDataException {
         if (deleteAfterClose) {
             try {
-                // Truncates the file size to zero since OS might be keeping the file for a while.
-                ((FileHandle) handle).getFileChannel().truncate(0);
                 ioManager.close(handle);
                 FileUtils.deleteQuietly(file.getFile());
             } catch (IOException e) {
-                throw HyracksDataException.create(ErrorCode.CANNOT_TRUNCATE_OR_DELETE_FILE, e, file.toString());
+                throw HyracksDataException.create(ErrorCode.CANNOT_DELETE_FILE, e, file.toString());
             }
         } else {
             ioManager.close(handle);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index 8031422..915c63d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -27,8 +27,8 @@ import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
 
 public class RunFileWriter implements IFrameWriter {
-    private final FileReference file;
     private final IIOManager ioManager;
+    private FileReference file;
     private boolean failed;
 
     private IFileHandle handle;
@@ -69,6 +69,15 @@ public class RunFileWriter implements IFrameWriter {
         }
     }
 
+    public void erase() throws HyracksDataException {
+        close();
+        file.delete();
+
+        // Make sure we never access the file if it is deleted.
+        file = null;
+        handle = null;
+    }
+
     public FileReference getFileReference() {
         return file;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 7d10802..c049b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -107,10 +107,9 @@ public class ExternalGroupBuildOperatorNodePushable extends AbstractUnaryInputSi
     @Override
     public void close() throws HyracksDataException {
         if (isFailed && state.getRuns() != null) {
-            for (int i = 0; i < state.getRuns().length; i++) {
-                RunFileWriter run = state.getRuns()[i];
+            for (RunFileWriter run : state.getRuns()) {
                 if (run != null) {
-                    run.getFileReference().delete();
+                    run.erase();
                 }
             }
         } else {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
index 0dbb063..b17215f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
@@ -95,9 +95,14 @@ public class ExternalGroupWriteOperatorNodePushable extends AbstractUnaryOutputS
             writer.open();
             doPass(table, partitionRuns, numberOfTuples, writer, 1); // level 0 use used at build stage.
         } catch (Exception e) {
-            generatedRuns.forEach(run -> run.getFileReference().delete());
-            writer.fail();
-            throw new HyracksDataException(e);
+            try {
+                for (RunFileWriter run : generatedRuns) {
+                    run.erase();
+                }
+            } finally {
+                writer.fail();
+            }
+            throw e;
         } finally {
             writer.close();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
index e0ef2b3..d29e9ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
@@ -75,10 +75,17 @@ public class ExternalHashGroupBy {
     }
 
     public void flushSpilledPartitions() throws HyracksDataException {
-        for (int i = 0; i < runWriters.length; ++i) {
-            if (runWriters[i] != null) {
-                flushPartitionToRun(i, runWriters[i]);
-                runWriters[i].close();
+        try {
+            for (int i = 0; i < runWriters.length; ++i) {
+                if (runWriters[i] != null) {
+                    flushPartitionToRun(i, runWriters[i]);
+                }
+            }
+        } finally {
+            for (int i = 0; i < runWriters.length; ++i) {
+                if (runWriters[i] != null) {
+                    runWriters[i].close();
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index ad14bad..b622c9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -470,8 +470,12 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                 @Override
                 public void close() throws HyracksDataException {
                     try {
-                        state.joiner.join(inBuffer.getBuffer(), writer);
-                        state.joiner.closeJoin(writer);
+                        try {
+                            state.joiner.join(inBuffer.getBuffer(), writer);
+                            state.joiner.completeJoin(writer);
+                        } finally {
+                            state.joiner.releaseMemory();
+                        }
                         ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
                                 .createPartitioner();
                         ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
@@ -508,25 +512,35 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                                 if (buildWriter != null) {
                                     RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
-                                    buildReader.open();
-                                    while (buildReader.nextFrame(inBuffer)) {
-                                        ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
-                                        FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
-                                        joiner.build(copyBuffer);
-                                        inBuffer.reset();
+                                    try {
+                                        buildReader.open();
+                                        while (buildReader.nextFrame(inBuffer)) {
+                                            ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
+                                            FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
+                                            joiner.build(copyBuffer);
+                                            inBuffer.reset();
+                                        }
+                                    } finally {
+                                        buildReader.close();
                                     }
-                                    buildReader.close();
                                 }
 
                                 // probe
                                 RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
-                                probeReader.open();
-                                while (probeReader.nextFrame(inBuffer)) {
-                                    joiner.join(inBuffer.getBuffer(), writer);
-                                    inBuffer.reset();
+                                try {
+                                    probeReader.open();
+                                    try {
+                                        while (probeReader.nextFrame(inBuffer)) {
+                                            joiner.join(inBuffer.getBuffer(), writer);
+                                            inBuffer.reset();
+                                        }
+                                        joiner.completeJoin(writer);
+                                    } finally {
+                                        joiner.releaseMemory();
+                                    }
+                                } finally {
+                                    probeReader.close();
                                 }
-                                probeReader.close();
-                                joiner.closeJoin(writer);
                             }
                         }
                     } finally {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 8e52838..ec1c3a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -196,8 +196,11 @@ public class InMemoryHashJoin {
         accessorProbe.reset(newAccessorProbe.getBuffer());
     }
 
-    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+    public void completeJoin(IFrameWriter writer) throws HyracksDataException {
         appender.write(writer, true);
+    }
+
+    public void releaseMemory() throws HyracksDataException {
         int nFrames = buffers.size();
         // Frames assigned to the data table will be released here.
         if (bufferManager != null) {
@@ -206,7 +209,6 @@ public class InMemoryHashJoin {
             }
         }
         buffers.clear();
-
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
                     + Thread.currentThread().getId() + ".");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34f2384e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index a8d3f7e..cbeadd8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -275,9 +275,13 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
                 @Override
                 public void close() throws HyracksDataException {
                     try {
-                        state.joiner.closeJoin(writer);
+                        state.joiner.completeJoin(writer);
                     } finally {
-                        writer.close();
+                        try {
+                            state.joiner.releaseMemory();
+                        } finally {
+                            writer.close();
+                        }
                     }
                 }
 


Mime
View raw message