asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject asterixdb git commit: [ASTERIXDB-1943][API][STO] Make rebalance idempotent.
Date Fri, 23 Jun 2017 16:23:58 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master fe00b1af9 -> a32a852be


[ASTERIXDB-1943][API][STO] Make rebalance idempotent.

- user model changes:
  added rebalance cancellation HTTP API.

- storage format changes: no

- interface changes: no

Details:
- add a HTTP API for cancelling a rebalance request;
- clean up leftover states at the beginning of a
  rebalance request;
- add tests for rebalance cancellation.

Change-Id: I0d14a07978e106cd497cc35538fafef318b2fcf7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1821
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a32a852b
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a32a852b
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a32a852b

Branch: refs/heads/master
Commit: a32a852bef6d15f0d99b85d548a99d8ab88b4bba
Parents: fe00b1a
Author: Yingyi Bu <yingyi@couchbase.com>
Authored: Thu Jun 22 16:03:32 2017 -0700
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Fri Jun 23 09:23:13 2017 -0700

----------------------------------------------------------------------
 .../api/http/server/RebalanceApiServlet.java    | 115 +++++++++++++++---
 .../apache/asterix/app/nc/RecoveryManager.java  |  11 +-
 .../org/apache/asterix/utils/RebalanceUtil.java |  84 +++++++++++--
 .../test/common/CancellationTestExecutor.java   |  20 +---
 .../RebalanceCancellationTestExecutor.java      | 119 +++++++++++++++++++
 .../asterix/test/common/TestExecutor.java       |   6 +-
 .../runtime/RebalanceWithCancellationIT.java    |  70 +++++++++++
 .../SqlppExecutionWithCancellationTest.java     |   3 +-
 .../common/context/DatasetLifecycleManager.java |  36 ++++--
 .../common/exceptions/ExceptionUtils.java       |  19 ++-
 .../asterix/metadata/utils/IndexUtil.java       |   9 +-
 .../utils/SecondaryIndexOperationsHelper.java   |   2 +-
 .../SecondaryTreeIndexOperationsHelper.java     |   5 +-
 .../PersistentLocalResourceRepository.java      |  19 +--
 .../hyracks/api/exceptions/ErrorCode.java       |   1 +
 .../src/main/resources/errormsg/en.properties   |   1 +
 .../dataflow/IndexDropOperatorDescriptor.java   |   9 +-
 .../dataflow/IndexDropOperatorNodePushable.java |  14 ++-
 18 files changed, 464 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 53d8f6b..3bd1be5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -21,12 +21,18 @@ package org.apache.asterix.api.http.server;
 import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.PrintWriter;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -61,20 +67,39 @@ public class RebalanceApiServlet extends AbstractServlet {
     private static final String METADATA = "Metadata";
     private final ICcApplicationContext appCtx;
 
+    // One-at-a-time thread executor, for rebalance tasks.
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+    // A queue that maintains submitted rebalance requests.
+    private final Queue<Future> rebalanceTasks = new ArrayDeque<>();
+
+    // A queue that tracks the termination of rebalance threads.
+    private final Queue<CountDownLatch> rebalanceFutureTerminated = new ArrayDeque<>();
+
     public RebalanceApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
         super(ctx, paths);
         this.appCtx = appCtx;
     }
 
     @Override
-    protected void post(IServletRequest request, IServletResponse response) {
-        PrintWriter out = response.writer();
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode jsonResponse = om.createObjectNode();
+    protected void delete(IServletRequest request, IServletResponse response) {
         try {
             // Sets the content type.
             HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
+            // Cancels all rebalance requests.
+            cancelRebalance();
+            // Sends the response back.
+            sendResponse(response, HttpResponseStatus.OK, "rebalance tasks are cancelled");
+        } catch (Exception e) {
+            // Sends back and logs internal error if any exception happens during cancellation.
+            sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage(), e);
+        }
 
+    }
+
+    @Override
+    protected void post(IServletRequest request, IServletResponse response) {
+        try {
             // Gets dataverse, dataset, and target nodes for rebalance.
             String dataverseName = request.getParameter("dataverseName");
             String datasetName = request.getParameter("datasetName");
@@ -82,31 +107,66 @@ public class RebalanceApiServlet extends AbstractServlet {
 
             // Parses and check target nodes.
             if (nodes == null) {
-                sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST,
-                        "nodes are not given");
+                sendResponse(response, HttpResponseStatus.BAD_REQUEST, "nodes are not given");
                 return;
             }
             String nodesString = StringUtils.strip(nodes, "\"'").trim();
             String[] targetNodes = nodesString.split(",");
             if ("".equals(nodesString)) {
-                sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST,
-                        "target nodes should not be empty");
+                sendResponse(response, HttpResponseStatus.BAD_REQUEST, "target nodes should not be empty");
                 return;
             }
 
             // If a user gives parameter datasetName, she should give dataverseName as well.
             if (dataverseName == null && datasetName != null) {
-                sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST,
+                sendResponse(response, HttpResponseStatus.BAD_REQUEST,
                         "to rebalance a particular dataset, the parameter dataverseName must be given");
                 return;
             }
 
             // Does not allow rebalancing a metadata dataset.
             if (METADATA.equals(dataverseName)) {
-                sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST,
-                        "cannot rebalance a metadata dataset");
+                sendResponse(response, HttpResponseStatus.BAD_REQUEST, "cannot rebalance a metadata dataset");
                 return;
             }
+            // Schedules a rebalance task and wait for its completion.
+            CountDownLatch terminated = scheduleRebalance(dataverseName, datasetName, targetNodes, response);
+            terminated.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, "the rebalance service is interrupted", e);
+        }
+    }
+
+    // Cancels all rebalance tasks.
+    private synchronized void cancelRebalance() throws InterruptedException {
+        for (Future rebalanceTask : rebalanceTasks) {
+            rebalanceTask.cancel(true);
+        }
+    }
+
+    // Removes a terminated task and its termination latch -- the heads.
+    private synchronized void removeTermintedTask() {
+        rebalanceTasks.remove();
+        rebalanceFutureTerminated.remove();
+    }
+
+    // Schedules a rebalance task.
+    private synchronized CountDownLatch scheduleRebalance(String dataverseName, String datasetName,
+            String[] targetNodes, IServletResponse response) {
+        CountDownLatch terminated = new CountDownLatch(1);
+        Future task = executor.submit(() -> doRebalance(dataverseName, datasetName, targetNodes, response, terminated));
+        rebalanceTasks.add(task);
+        rebalanceFutureTerminated.add(terminated);
+        return terminated;
+    }
+
+    // Performs the actual rebalance.
+    private void doRebalance(String dataverseName, String datasetName, String[] targetNodes, IServletResponse response,
+            CountDownLatch terminated) {
+        try {
+            // Sets the content type.
+            HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
 
             if (datasetName == null) {
                 // Rebalances datasets in a given dataverse or all non-metadata datasets.
@@ -123,10 +183,19 @@ public class RebalanceApiServlet extends AbstractServlet {
             }
 
             // Sends response.
-            sendResponse(out, jsonResponse, response, HttpResponseStatus.OK, "successful");
+            sendResponse(response, HttpResponseStatus.OK, "successful");
+        } catch (InterruptedException e) {
+            sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                    "the rebalance task is cancelled by a user", e);
         } catch (Exception e) {
-            sendResponse(out, jsonResponse, response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
-            LOGGER.log(Level.WARNING, e.getMessage(), e);
+            sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.toString(), e);
+        } finally {
+            // Removes the heads of the task queue and the latch queue.
+            // Since the ExecutorService is one-at-a-time, the execution order of rebalance tasks is
+            // the same as the request submission order.
+            removeTermintedTask();
+            // Notify that the rebalance task is terminated.
+            terminated.countDown();
         }
     }
 
@@ -177,10 +246,24 @@ public class RebalanceApiServlet extends AbstractServlet {
     }
 
     // Sends HTTP response to the request client.
-    private void sendResponse(PrintWriter out, ObjectNode jsonResponse, IServletResponse response,
-            HttpResponseStatus status, String message) {
+    private void sendResponse(IServletResponse response, HttpResponseStatus status, String message, Exception e) {
+        if (status != HttpResponseStatus.OK) {
+            if (e != null) {
+                LOGGER.log(Level.WARNING, message, e);
+            } else {
+                LOGGER.log(Level.WARNING, message);
+            }
+        }
+        PrintWriter out = response.writer();
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode jsonResponse = om.createObjectNode();
         jsonResponse.put("results", message);
         response.setStatus(status);
         out.write(jsonResponse.toString());
     }
+
+    // Sends HTTP response to the request client.
+    private void sendResponse(IServletResponse response, HttpResponseStatus status, String message) {
+        sendResponse(response, status, message, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 691be50..275b055 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -65,6 +65,7 @@ import org.apache.asterix.transaction.management.service.recovery.TxnId;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -691,7 +692,15 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) {
                 // undo, upsert the old value if found, otherwise, physical delete
                 if (logRecord.getOldValue() == null) {
-                    indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
+                    try {
+                        indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
+                    } catch (HyracksDataException hde) {
+                        // Since we're undoing according the write-ahead log, the actual upserting tuple
+                        // might not have been written to memory yet.
+                        if (hde.getErrorCode() != ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY) {
+                            throw hde;
+                        }
+                    }
                 } else {
                     indexAccessor.forceUpsert(logRecord.getOldValue());
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index dae88c2..3b17a94 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.stream.IntStream;
 
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -61,6 +63,7 @@ import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
  * A utility class for the rebalance operation.
  */
 public class RebalanceUtil {
+    private static final Logger LOGGER = Logger.getLogger(RebalanceUtil.class.getName());
 
     private RebalanceUtil() {
 
@@ -83,12 +86,13 @@ public class RebalanceUtil {
      */
     public static void rebalance(String dataverseName, String datasetName, Set<String> targetNcNames,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception {
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
         Dataset sourceDataset;
         Dataset targetDataset;
+        // Executes the first Metadata transaction.
         // Generates the rebalance target files. While doing that, hold read locks on the dataset so
         // that no one can drop the rebalance source dataset.
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
             // The source dataset.
             sourceDataset = metadataProvider.findDataset(dataverseName, datasetName);
@@ -125,13 +129,57 @@ public class RebalanceUtil {
             metadataProvider.getLocks().reset();
         }
 
-        // Starts another transaction for switching the metadata entity.
-        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        // Up to this point, since the bulk part of a rebalance operation is done,
+        // the following two operations will retry after interrupt and finally rethrow InterruptedException,
+        // which means that they will always succeed and could possibly throw InterruptedException as the last step.
+        // TODO(yingyi): ASTERIXDB-1948, in case a crash happens, currently the system will either:
+        // 1. (crash before metadata switch) think the rebalance is not done, and the target data files are leaked until
+        // the next rebalance request.
+        // 2. (crash after metadata switch) think the rebalance is done, and the source data files are leaked;
+        runWithRetryAfterInterrupt(() -> {
+            // Executes the 2nd Metadata transaction for switching the metadata entity.
+            // It detaches the source dataset and attaches the target dataset to metadata's point of view.
+            runMetadataTransaction(metadataProvider,
+                () -> rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, hcc));
+            // Executes the 3rd Metadata transaction to drop the source dataset files and the node group for
+            // the source dataset.
+            runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc));
+        });
+    }
+
+    @FunctionalInterface
+    private interface Work {
+        void run() throws Exception;
+    }
+
+    // Runs works.run() and lets it sustain interrupts.
+    private static void runWithRetryAfterInterrupt(Work work) throws Exception {
+        int retryCount = 0;
+        InterruptedException interruptedException = null;
+        boolean done = false;
+        do {
+            try {
+                work.run();
+                done = true;
+            } catch (InterruptedException e) {
+                LOGGER.log(Level.WARNING, "Retry with attempt " + (++retryCount), e);
+                interruptedException = e;
+            }
+        } while (!done);
+
+        // Rethrows the interrupted exception.
+        if (interruptedException != null) {
+            throw interruptedException;
+        }
+    }
+
+    // Executes a metadata transaction.
+    private static void runMetadataTransaction(MetadataProvider metadataProvider, Work work) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            // Atomically switches the rebalance target to become the source dataset.
-            rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, hcc);
-
+            // Performs the actual work.
+            work.run();
             // Complete the metadata transaction.
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
@@ -145,6 +193,9 @@ public class RebalanceUtil {
     // Rebalances from the source to the target.
     private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
+        // Drops the target dataset files (if any) to make rebalance idempotent.
+        dropDatasetFiles(target, metadataProvider, hcc);
+
         // Creates the rebalance target.
         createRebalanceTarget(target, metadataProvider, hcc);
 
@@ -155,6 +206,7 @@ public class RebalanceUtil {
         createAndLoadSecondaryIndexesForTarget(source, target, metadataProvider, hcc);
     }
 
+    // Switches the metadata entity from the source dataset to the target dataset.
     private static void rebalanceSwitch(Dataset source, Dataset target, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
@@ -164,6 +216,7 @@ public class RebalanceUtil {
 
         Dataset sourceDataset = MetadataManagerUtil.findDataset(mdTxnCtx, source.getDataverseName(),
                 source.getDatasetName());
+
         if (sourceDataset == null) {
             // The dataset has already been dropped.
             // In this case, we should drop the generated target dataset files.
@@ -171,18 +224,24 @@ public class RebalanceUtil {
             return;
         }
 
-        // Drops the source dataset files.
-        dropDatasetFiles(source, metadataProvider, hcc);
-
         // Updates the dataset entry in the metadata storage
         MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target);
+    }
+
+    // Drops the source dataset.
+    private static void dropSourceDataset(Dataset source, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        // Drops the source dataset files. No need to lock the dataset entity here because the source dataset has
+        // been detached at this point.
+        dropDatasetFiles(source, metadataProvider, hcc);
 
         // Drops the metadata entry of source dataset's node group.
         String sourceNodeGroup = source.getNodeGroupName();
         MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), sourceNodeGroup);
-        MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, sourceNodeGroup, true);
+        MetadataManager.INSTANCE.dropNodegroup(metadataProvider.getMetadataTxnContext(), sourceNodeGroup, true);
     }
 
+
     // Creates the files for the rebalance target dataset.
     private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
@@ -254,12 +313,13 @@ public class RebalanceUtil {
                 new RecordDescriptor[] { target.getPrimaryRecordDescriptor(metadataProvider) });
     }
 
+    // Drops dataset files of a given dataset.
     private static void dropDatasetFiles(Dataset dataset, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
         List<JobSpecification> jobs = new ArrayList<>();
         List<Index> indexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
         for (Index index : indexes) {
-            jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset));
+            jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, true));
         }
         for (JobSpecification jobSpec : jobs) {
             JobUtils.runJob(hcc, jobSpec, true);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index 50c2986..b4f9ded 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.function.Predicate;
 
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.utils.Servlets;
 import org.apache.asterix.test.runtime.SqlppExecutionWithCancellationTest;
 import org.apache.asterix.testframework.context.TestCaseContext;
@@ -104,7 +105,7 @@ public class CancellationTestExecutor extends TestExecutor {
                 return false;
             }
         }
-        String errorMsg = getErrorMessage(e);
+        String errorMsg = ExceptionUtils.getErrorMessage(e);
         // Expected, "HYR0025" means a user cancelled the query.)
         if (errorMsg.startsWith("HYR0025")) {
             SqlppExecutionWithCancellationTest.numCancelledQueries++;
@@ -115,21 +116,4 @@ public class CancellationTestExecutor extends TestExecutor {
             return true;
         }
     }
-
-    public static String getErrorMessage(Throwable th) {
-        Throwable cause = getRootCause(th);
-        return cause.getMessage();
-    }
-
-    // Finds the root cause of Throwable.
-    private static Throwable getRootCause(Throwable e) {
-        Throwable current = e;
-        Throwable cause = e.getCause();
-        while (cause != null && cause != current) {
-            Throwable nextCause = current.getCause();
-            current = cause;
-            cause = nextCause;
-        }
-        return current;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java
new file mode 100644
index 0000000..a63cb76
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.common;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.logging.Level;
+
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.ComparisonEnum;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.junit.Assert;
+
+public class RebalanceCancellationTestExecutor extends TestExecutor {
+
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+    private long waitTime = 100;
+
+    public void setWaitTime(long waitTime) {
+        this.waitTime = waitTime;
+    }
+
+    @Override
+    protected void executeHttpRequest(TestCaseContext.OutputFormat fmt, String statement,
+            Map<String, Object> variableCtx, String reqType, File testFile, File expectedResultFile,
+            File actualResultFile, MutableInt queryCount, int numResultFiles, String extension, ComparisonEnum compare)
+            throws Exception {
+        // Executes regular tests as usual.
+        if (!(testFile.getAbsolutePath().endsWith("post.http") && statement.contains("rebalance"))) {
+            super.executeHttpRequest(fmt, statement, variableCtx, reqType, testFile, expectedResultFile,
+                    actualResultFile, queryCount, numResultFiles, extension, compare);
+            return;
+        }
+
+        // Executes rebalance tests with cancellation.
+        Future<Exception> future = executor.submit(() -> {
+            //boolean failed = false;
+            try {
+                super.executeHttpRequest(fmt, statement, variableCtx, reqType, testFile, expectedResultFile,
+                        actualResultFile, queryCount, numResultFiles, extension, compare);
+            } catch (Exception e) {
+                // Since Hyracks job cancellation is not synchronous, re-executing rebalance could
+                // fail, but we keep retrying until it completes.
+                boolean done = false;
+                do {
+                    try {
+                        // Re-executes rebalance.
+                        super.executeHttpRequest(fmt, statement, variableCtx, reqType, testFile, expectedResultFile,
+                                actualResultFile, queryCount, numResultFiles, extension, compare);
+                        done = true;
+                    } catch (Exception e2) {
+                        String errorMsg = ExceptionUtils.getErrorMessage(e2);
+                        // not expected, but is a false alarm.
+                        if (errorMsg == null || !errorMsg.contains("reference count = 1")) {
+                            return e2;
+                        }
+                        LOGGER.log(Level.WARNING, e2.toString(), e2);
+                    }
+                } while (!done);
+            }
+            return null;
+        });
+        Thread.sleep(waitTime);
+        // Cancels the query request while the query is executing.
+        int rc = cancelQuery(getEndpoint(Servlets.REBALANCE), Collections.emptyList());
+        Assert.assertTrue(rc == 200 || rc == 404);
+        Exception e = future.get();
+        if (e != null) {
+            throw e;
+        }
+    }
+
+    // Cancels a submitted query through the cancellation REST API.
+    private int cancelQuery(URI uri, List<TestCase.CompilationUnit.Parameter> params) throws Exception {
+        HttpUriRequest method = constructDeleteMethodUrl(uri, params);
+        HttpResponse response = executeHttpRequest(method);
+        return response.getStatusLine().getStatusCode();
+    }
+
+    // Constructs a HTTP DELETE request.
+    private HttpUriRequest constructDeleteMethodUrl(URI uri, List<TestCase.CompilationUnit.Parameter> otherParams) {
+        RequestBuilder builder = RequestBuilder.delete(uri);
+        for (TestCase.CompilationUnit.Parameter param : otherParams) {
+            builder.addParameter(param.getName(), param.getValue());
+        }
+        builder.setCharset(StandardCharsets.UTF_8);
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index ed6a77a..8791756 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1058,9 +1058,9 @@ public class TestExecutor {
         }
     }
 
-    private void executeHttpRequest(OutputFormat fmt, String statement, Map<String, Object> variableCtx, String reqType,
-            File testFile, File expectedResultFile, File actualResultFile, MutableInt queryCount, int numResultFiles,
-            String extension, ComparisonEnum compare) throws Exception {
+    protected void executeHttpRequest(OutputFormat fmt, String statement, Map<String, Object> variableCtx,
+            String reqType, File testFile, File expectedResultFile, File actualResultFile, MutableInt queryCount,
+            int numResultFiles, String extension, ComparisonEnum compare) throws Exception {
         String handleVar = getHandleVariable(statement);
         final String trimmedPathAndQuery = stripLineComments(stripJavaComments(statement)).trim();
         final String variablesReplaced = replaceVarRef(trimmedPathAndQuery, variableCtx);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java
new file mode 100644
index 0000000..1d7bdc3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.RebalanceCancellationTestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs rebalance tests with cancellation.
+ */
+@RunWith(Parameterized.class)
+public class RebalanceWithCancellationIT {
+    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    private static RebalanceCancellationTestExecutor executor = new RebalanceCancellationTestExecutor();
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, executor);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "RebalanceWithCancellationIT {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("only_sqlpp.xml", "rebalance.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public RebalanceWithCancellationIT(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        // Runs each cancellation test multiple times, triggering cancellation at various points in time.
+        for (int waitTime = 100; waitTime <= 1000; waitTime += 50) {
+            executor.setWaitTime(waitTime);
+            LangExecutionUtil.test(tcCtx);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
index cce069c..fff0775 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
@@ -20,6 +20,7 @@ package org.apache.asterix.test.runtime;
 
 import java.util.Collection;
 
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.test.common.CancellationTestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
@@ -70,7 +71,7 @@ public class SqlppExecutionWithCancellationTest {
         try {
             LangExecutionUtil.test(tcCtx);
         } catch (Exception e) {
-            String errorMsg = CancellationTestExecutor.getErrorMessage(e);
+            String errorMsg = ExceptionUtils.getErrorMessage(e);
             if (!errorMsg.contains("reference count = 1") // not expected, but is a false alarm.
                     && !errorMsg.contains("pinned and file is being closed") // not expected, but maybe a false alarm.
             ) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 51a535a..3da58e9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -35,6 +35,7 @@ import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -263,19 +264,30 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
 
     @Override
     public synchronized void close(String resourcePath) throws HyracksDataException {
-        validateDatasetLifecycleManagerState();
-        int did = getDIDfromResourcePath(resourcePath);
-        long resourceID = getResourceIDfromResourcePath(resourcePath);
-        DatasetResource dsr = datasets.get(did);
-        if (dsr == null) {
-            throw new HyracksDataException("No index found with resourceID " + resourceID);
-        }
-        IndexInfo iInfo = dsr.getIndexInfo(resourceID);
-        if (iInfo == null) {
-            throw new HyracksDataException("No index found with resourceID " + resourceID);
+        DatasetResource dsr = null;
+        IndexInfo iInfo = null;
+        try {
+            validateDatasetLifecycleManagerState();
+            int did = getDIDfromResourcePath(resourcePath);
+            long resourceID = getResourceIDfromResourcePath(resourcePath);
+            dsr = datasets.get(did);
+            if (dsr == null) {
+                throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+            }
+            iInfo = dsr.getIndexInfo(resourceID);
+            if (iInfo == null) {
+                throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+            }
+        } finally {
+            // Regardless of what exception is thrown in the try-block,
+            // we must un-touch any index and dataset that were resolved.
+            if (iInfo != null) {
+                iInfo.untouch();
+            }
+            if (dsr != null) {
+                dsr.untouch();
+            }
         }
-        iInfo.untouch();
-        dsr.untouch();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index 649f1f5..3105b3f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -20,7 +20,6 @@ package org.apache.asterix.common.exceptions;
 
 public class ExceptionUtils {
     public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
-    public static final String MISSING_PARAMETER = "Missing parameter.\n";
     public static final String PARAMETER_NAME = "Parameter name: ";
     public static final String EXPECTED_VALUE = "Expected value: ";
     public static final String PASSED_VALUE = "Passed value: ";
@@ -32,4 +31,22 @@ public class ExceptionUtils {
         return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + System.lineSeparator() + EXPECTED_VALUE
                 + expectedValue + System.lineSeparator() + PASSED_VALUE + passedValue;
     }
+
+    // Gets the error message for the root cause of a given Throwable instance.
+    public static String getErrorMessage(Throwable th) {
+        Throwable cause = getRootCause(th);
+        return cause.getMessage();
+    }
+
+    // Finds the root cause of a given Throwable instance.
+    public static Throwable getRootCause(Throwable e) {
+        Throwable current = e;
+        Throwable cause = e.getCause();
+        while (cause != null && cause != current) {
+            Throwable nextCause = current.getCause();
+            current = cause;
+            cause = nextCause;
+        }
+        return current;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 9e55a97..411f866 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -103,7 +103,14 @@ public class IndexUtil {
             Dataset dataset) throws AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig);
-        return secondaryIndexHelper.buildDropJobSpec();
+        return secondaryIndexHelper.buildDropJobSpec(false);
+    }
+
+    public static JobSpecification buildDropIndexJobSpec(Index index, MetadataProvider metadataProvider,
+            Dataset dataset, boolean failSilently) throws AlgebricksException {
+        SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
+                .createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig);
+        return secondaryIndexHelper.buildDropJobSpec(failSilently);
     }
 
     public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index d11ba21..8c45c11 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -170,7 +170,7 @@ public abstract class SecondaryIndexOperationsHelper {
 
     public abstract JobSpecification buildCompactJobSpec() throws AlgebricksException;
 
-    public abstract JobSpecification buildDropJobSpec() throws AlgebricksException;
+    public abstract JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException;
 
     protected void init() throws AlgebricksException {
         payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 907192c..2dcab4f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -66,14 +66,15 @@ public abstract class SecondaryTreeIndexOperationsHelper extends SecondaryIndexO
     }
 
     @Override
-    public JobSpecification buildDropJobSpec() throws AlgebricksException {
+    public JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                 metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
         IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
                 metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
         // The index drop operation should be persistent regardless of temp datasets or permanent dataset.
-        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory);
+        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory,
+                failSilently);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
                 splitsAndConstraint.second);
         spec.addRoot(btreeDrop);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index cb03ae4..e530bc3 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -213,13 +213,18 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     public synchronized void delete(String relativePath) throws HyracksDataException {
         FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
         if (resourceFile.getFile().exists()) {
-            resourceFile.delete();
-            resourceCache.invalidate(relativePath);
-
-            //if replication enabled, delete resource from remote replicas
-            if (isReplicationEnabled
-                    && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
-                createReplicationJob(ReplicationOperation.DELETE, resourceFile);
+            try {
+                // Invalidate before deleting the file just in case file deletion throws some exception.
+                // Since it's just a cache invalidation, it should not affect correctness.
+                resourceCache.invalidate(relativePath);
+                resourceFile.delete();
+            } finally {
+                // Regardless of whether the deletion succeeded, the operation should be replicated.
+                // If replication is enabled, delete the resource from remote replicas.
+                if (isReplicationEnabled
+                        && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
+                    createReplicationJob(ReplicationOperation.DELETE, resourceFile);
+                }
             }
         } else {
             throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 8a91547..2e6c8a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -116,6 +116,7 @@ public class ErrorCode {
     public static final int CANNOT_CREATE_EXISTING_INDEX = 80;
     public static final int FILE_ALREADY_MAPPED = 81;
     public static final int FILE_ALREADY_EXISTS = 82;
+    public static final int NO_INDEX_FOUND_WITH_RESOURCE_ID = 83;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 9f983a7..6389ffa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -99,5 +99,6 @@
 80 = Cannot create index because it already exists
 81 = File %1$s is already mapped
 82 = Failed to create the file %1$s because it already exists
+83 = No index found with resourceID %1$s
 
 10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
index d162be0..18c7107 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
@@ -30,16 +30,23 @@ public class IndexDropOperatorDescriptor extends AbstractSingleActivityOperatorD
 
     private static final long serialVersionUID = 1L;
     private final IIndexDataflowHelperFactory dataflowHelperFactory;
+    private final boolean failSilently;
 
     public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
             IIndexDataflowHelperFactory dataflowHelperFactory) {
+        this(spec, dataflowHelperFactory, false);
+    }
+
+    public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IIndexDataflowHelperFactory dataflowHelperFactory, boolean failSilently) {
         super(spec, 0, 0);
         this.dataflowHelperFactory = dataflowHelperFactory;
+        this.failSilently = failSilently;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new IndexDropOperatorNodePushable(dataflowHelperFactory, ctx, partition);
+        return new IndexDropOperatorNodePushable(dataflowHelperFactory, failSilently, ctx, partition);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a32a852b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index fce31ca..7c2021b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -28,10 +28,12 @@ import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 
 public class IndexDropOperatorNodePushable extends AbstractOperatorNodePushable {
     private final IIndexDataflowHelper indexHelper;
+    private final boolean failSliently;
 
-    public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx,
-            int partition) throws HyracksDataException {
+    public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, boolean failSilently,
+            IHyracksTaskContext ctx, int partition) throws HyracksDataException {
         this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        this.failSliently = failSilently;
     }
 
     @Override
@@ -50,7 +52,13 @@ public class IndexDropOperatorNodePushable extends AbstractOperatorNodePushable
 
     @Override
     public void initialize() throws HyracksDataException {
-        indexHelper.destroy();
+        try {
+            indexHelper.destroy();
+        } catch (HyracksDataException e) {
+            if (!failSliently) {
+                throw e;
+            }
+        }
     }
 
     @Override


Mime
View raw message