asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [5/9] asterixdb git commit: [ASTERIXDB-1992][ING] Suspend/Resume active entities
Date Thu, 27 Jul 2017 06:35:59 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
new file mode 100644
index 0000000..52d4225
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.active;
+
+import java.util.List;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.mockito.Mockito;
+
+public class TestClusterControllerActor extends Actor {
+    // Test stand-in for the cluster controller: each notification is wrapped in an Action and handed to add().
+    private final List<Dataset> allDatasets;
+    private final ActiveNotificationHandler handler;
+
+    public TestClusterControllerActor(String name, ActiveNotificationHandler handler, List<Dataset> allDatasets) {
+        super(name, null);
+        this.allDatasets = allDatasets;
+        this.handler = handler;
+    }
+
+    public Action startActiveJob(JobId jobId, EntityId entityId) {
+        Action jobStart = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
+                // the mocked spec returns entityId for the active-entity property; creation is notified before start
+                JobSpecification spec = Mockito.mock(JobSpecification.class);
+                Mockito.when(spec.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
+                        .thenReturn(entityId);
+                handler.notifyJobCreation(jobId, spec);
+                handler.notifyJobStart(jobId);
+            }
+        };
+        add(jobStart);
+        return jobStart;
+    }
+
+    public Action activeEvent(ActiveEvent event) {
+        Action eventDelivery = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
+                handler.add(event); // hand the event to the notification handler
+            }
+        };
+        add(eventDelivery);
+        return eventDelivery;
+    }
+
+    public Action jobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+        Action finishDelivery = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
+                handler.notifyJobFinish(jobId, jobStatus, exceptions); // forward status and exceptions unchanged
+            }
+        };
+        add(finishDelivery);
+        return finishDelivery;
+    }
+
+    public List<Dataset> getAllDatasets() {
+        return allDatasets; // the same list instance that was passed to the constructor, not a copy
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
new file mode 100644
index 0000000..d896995
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.active;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.app.active.ActiveEntityEventsListener;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.LockList;
+import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobStatus;
+
+public class TestEventsListener extends ActiveEntityEventsListener {
+
+    public enum Behavior {
+        SUCCEED, // no wait and no injected failure
+        RUNNING_JOB_FAIL, // doStop reports the job as FAILURE instead of deregistering the runtimes
+        FAIL_COMPILE, // failCompile(...) throws "Compilation Failure"
+        FAIL_RUNTIME, // doStart reports the freshly started job as FAILURE
+        STEP_SUCCEED, // like SUCCEED, but first waits for allowStep()
+        STEP_FAIL_COMPILE, // like FAIL_COMPILE, but first waits for allowStep()
+        STEP_FAIL_RUNTIME // like FAIL_RUNTIME, but first waits for allowStep()
+    }
+
+    private final Semaphore step = new Semaphore(0);
+    private final TestClusterControllerActor clusterController;
+    private final TestNodeControllerActor[] nodeControllers;
+    private final JobIdFactory jobIdFactory;
+    private Behavior onStart = Behavior.FAIL_COMPILE;
+    private Behavior onStop = Behavior.FAIL_COMPILE;
+
+    public TestEventsListener(TestClusterControllerActor clusterController, TestNodeControllerActor[] nodeControllers,
+            JobIdFactory jobIdFactory, EntityId entityId, List<Dataset> datasets, IStatementExecutor statementExecutor,
+            ICcApplicationContext appCtx, IHyracksClientConnection hcc, AlgebricksAbsolutePartitionConstraint locations,
+            IRetryPolicyFactory retryPolicyFactory) throws HyracksDataException {
+        super(statementExecutor, appCtx, hcc, entityId, datasets, locations, TestEventsListener.class.getSimpleName(),
+                retryPolicyFactory);
+        this.jobIdFactory = jobIdFactory; // source of the job ids created in doStart
+        this.nodeControllers = nodeControllers; // actors that (de)register runtimes in doStart/doStop
+        this.clusterController = clusterController; // actor that receives job start/finish notifications
+    }
+
+    public void allowStep() {
+        step.release(); // add one permit: lets a single pending or future takeStep() call through
+    }
+
+    private void step(Behavior behavior) throws HyracksDataException {
+        if (behavior == Behavior.STEP_SUCCEED || behavior == Behavior.STEP_FAIL_COMPILE
+                || behavior == Behavior.STEP_FAIL_RUNTIME) { // only the STEP_* behaviors wait for allowStep()
+            takeStep();
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void failCompile(Behavior behavior) throws HyracksDataException {
+        if (behavior == Behavior.STEP_FAIL_COMPILE || behavior == Behavior.FAIL_COMPILE) {
+            throw new HyracksDataException("Compilation Failure"); // simulated compile-time failure
+        }
+    }
+
+    private synchronized void takeStep() throws HyracksDataException {
+        try {
+            while (!step.tryAcquire()) { // poll until allowStep() has released a permit
+                notifyAll(); // wake threads waiting on this listener's monitor (presumably state waiters - confirm)
+                wait(10); // gives up the monitor for up to 10 ms between polls
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping the exception
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    protected void doStart(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException {
+        step(onStart); // STEP_* behaviors wait here for allowStep()
+        failCompile(onStart); // FAIL_COMPILE / STEP_FAIL_COMPILE abort before any job is created
+        JobId jobId = jobIdFactory.create();
+        Action startJob = clusterController.startActiveJob(jobId, entityId);
+        try {
+            startJob.sync();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to callers, as takeStep() does
+            throw HyracksDataException.create(e);
+        }
+        WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this,
+                EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED));
+        if (onStart == Behavior.FAIL_RUNTIME || onStart == Behavior.STEP_FAIL_RUNTIME) {
+            clusterController.jobFinish(jobId, JobStatus.FAILURE,
+                    Collections.singletonList(new HyracksDataException("RuntimeFailure")));
+        } else {
+            for (int i = 0; i < nodeControllers.length; i++) {
+                nodeControllers[i].registerRuntime(jobId, entityId, i); // partition i goes to node controller i
+            }
+        }
+        try {
+            subscriber.sync(); // returns once one of the three states above is reached
+            if (subscriber.getFailure() != null) {
+                throw HyracksDataException.create(subscriber.getFailure());
+            }
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
+        step(onStop); // STEP_* behaviors wait here for allowStep()
+        failCompile(onStop); // FAIL_COMPILE / STEP_FAIL_COMPILE abort before anything is signalled
+        try {
+            Set<ActivityState> waitFor; // states that end this call; they differ for stop and suspend
+            if (state == ActivityState.STOPPING) {
+                waitFor = EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED);
+            } else if (state == ActivityState.SUSPENDING) {
+                waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED);
+            } else {
+                throw new IllegalStateException("stop with what intention??");
+            }
+            WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, waitFor);
+            if (onStop == Behavior.RUNNING_JOB_FAIL) {
+                clusterController.jobFinish(jobId, JobStatus.FAILURE,
+                        Collections.singletonList(new HyracksDataException("RuntimeFailure")));
+            } else {
+                for (int i = 0; i < nodeControllers.length; i++) {
+                    // each partition is deregistered through its own node controller, one at a time
+                    nodeControllers[i].deRegisterRuntime(jobId, entityId, i).sync();
+                }
+                clusterController.jobFinish(jobId, JobStatus.TERMINATED, Collections.emptyList());
+            }
+            subscriber.sync();
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+        return null;
+    }
+
+    public void onStart(Behavior behavior) {
+        this.onStart = behavior; // consulted by doStart, and so by doResume, which delegates to it
+    }
+
+    public void onStop(Behavior behavior) {
+        this.onStop = behavior; // consulted by doStop, and so by doSuspend, which delegates to it
+    }
+
+    @Override
+    protected void setRunning(MetadataProvider metadataProvider, boolean running)
+            throws HyracksDataException, AlgebricksException {
+        try {
+            // test double: only the locks are taken; nothing is persisted and 'running' is not read
+            IMetadataLockManager lockMgr = metadataProvider.getApplicationContext().getMetadataLockManager();
+            LockList mdLocks = metadataProvider.getLocks();
+            lockMgr.acquireDataverseReadLock(mdLocks, entityId.getDataverse());
+            lockMgr.acquireActiveEntityWriteLock(mdLocks, entityId.getDataverse() + '.' + entityId.getEntityName());
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    protected Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException {
+        return doStop(metadataProvider); // same path as stop; doStop picks the awaited states from 'state'
+    }
+
+    @Override
+    protected void doResume(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException {
+        doStart(metadataProvider); // resume is simulated as a fresh start, governed by the onStart behavior
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
new file mode 100644
index 0000000..e7e21b6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.active;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveEvent.Kind;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.api.job.JobId;
+
+public class TestNodeControllerActor extends Actor {
+    // Simulated node controller: sends runtime (de)registration events to the cluster controller actor.
+    private final String id;
+    private final TestClusterControllerActor clusterController;
+
+    public TestNodeControllerActor(String name, TestClusterControllerActor clusterController) {
+        super("NC: " + name, null);
+        this.clusterController = clusterController;
+        this.id = name; // unprefixed name; passed as the second argument of every ActiveRuntimeId built below
+    }
+
+    public Action registerRuntime(JobId jobId, EntityId entityId, int partition) {
+        Action registration = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
+                ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId, id, partition);
+                ActivePartitionMessage registered = new ActivePartitionMessage(runtimeId, jobId,
+                        ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
+                clusterController.activeEvent(new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, registered));
+            }
+        };
+        add(registration);
+        return registration;
+    }
+
+    public Action deRegisterRuntime(JobId jobId, EntityId entityId, int partition) {
+        Action deregistration = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
+                ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId, id, partition);
+                ActivePartitionMessage deregistered = new ActivePartitionMessage(runtimeId, jobId,
+                        ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null);
+                clusterController.activeEvent(new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, deregistered));
+            }
+        };
+        add(deregistration);
+        return deregistration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
new file mode 100644
index 0000000..5f715af
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.active;
+
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.metadata.api.IActiveEntityController;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TestUserActor extends Actor {
+
+    private final TestClusterControllerActor clusterController; // supplies the shared dataset list
+    private final IMetadataLockManager lockManager; // taken from the metadata provider's application context
+
+    public TestUserActor(String name, MetadataProvider metadataProvider, TestClusterControllerActor clusterController) {
+        super(name, metadataProvider);
+        this.clusterController = clusterController;
+        this.lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+    }
+
+    public Action startActivity(IActiveEntityController actionListener) {
+        Action startAction = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                String dataverse = actionListener.getEntityId().getDataverse();
+                String entity = actionListener.getEntityId().getEntityName();
+                try {
+                    // entity write lock, then modifyDatasetBegin for each of the listener's datasets, then start
+                    lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverse + '.' + entity);
+                    for (Dataset target : actionListener.getDatasets()) {
+                        MetadataLockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataverse,
+                                DatasetUtil.getFullyQualifiedName(target));
+                    }
+                    actionListener.start(mdProvider);
+                } finally {
+                    mdProvider.getLocks().reset(); // always reset the lock list, on success or failure
+                }
+            }
+        };
+        add(startAction);
+        return startAction;
+    }
+
+    public Action stopActivity(IActiveEntityController actionListener) {
+        Action stopAction = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                String dataverse = actionListener.getEntityId().getDataverse();
+                String entity = actionListener.getEntityId().getEntityName();
+                try {
+                    // entity write lock, then modifyDatasetBegin for each of the listener's datasets, then stop
+                    lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverse + '.' + entity);
+                    for (Dataset target : actionListener.getDatasets()) {
+                        MetadataLockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataverse,
+                                DatasetUtil.getFullyQualifiedName(target));
+                    }
+                    actionListener.stop(mdProvider);
+                } finally {
+                    mdProvider.getLocks().reset(); // always reset the lock list, on success or failure
+                }
+            }
+        };
+        add(stopAction);
+        return stopAction;
+    }
+
+    public Action suspendActivity(IActiveEntityController actionListener) {
+        Action suspendAction = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                String dataverse = actionListener.getEntityId().getDataverse();
+                String entity = actionListener.getEntityId().getEntityName();
+                List<Dataset> targets = actionListener.getDatasets();
+                try {
+                    lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverse + '.' + entity);
+                    for (Dataset target : targets) {
+                        lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
+                                DatasetUtil.getFullyQualifiedName(target));
+                    }
+                    actionListener.suspend(mdProvider);
+                } catch (Exception failure) {
+                    // on success the lock list is left as is (resumeActivity resets it); reset only on failure
+                    mdProvider.getLocks().reset();
+                    throw failure;
+                }
+            }
+        };
+        add(suspendAction);
+        return suspendAction;
+    }
+
+    public Action resumeActivity(IActiveEntityController actionListener) {
+        Action resumeAction = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                String dataverse = actionListener.getEntityId().getDataverse();
+                String entity = actionListener.getEntityId().getEntityName();
+                try {
+                    lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverse + '.' + entity);
+                    for (Dataset target : actionListener.getDatasets()) {
+                        // upgrade to write, then straight back down to exclusive-modify, for each dataset
+                        lockManager.upgradeDatasetLockToWrite(mdProvider.getLocks(),
+                                DatasetUtil.getFullyQualifiedName(target));
+                        lockManager.downgradeDatasetLockToExclusiveModify(mdProvider.getLocks(),
+                                DatasetUtil.getFullyQualifiedName(target));
+                    }
+                    actionListener.resume(mdProvider);
+                } finally {
+                    mdProvider.getLocks().reset(); // always reset the lock list, on success or failure
+                }
+            }
+        };
+        add(resumeAction);
+        return resumeAction;
+    }
+
+    public Action addDataset(Dataset dataset, IActiveEntityController actionListener) {
+        Action addAction = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                String entityDataverse = actionListener.getEntityId().getDataverse();
+                String entity = actionListener.getEntityId().getEntityName();
+                try {
+                    // read-lock the entity, write-lock the dataset, then add it unless it is already known
+                    lockManager.acquireActiveEntityReadLock(mdProvider.getLocks(), entityDataverse + '.' + entity);
+                    lockManager.acquireDatasetWriteLock(mdProvider.getLocks(),
+                            DatasetUtil.getFullyQualifiedName(dataset));
+                    List<Dataset> known = clusterController.getAllDatasets();
+                    if (known.contains(dataset)) {
+                        throw new HyracksDataException("Dataset " + dataset + " already exists");
+                    }
+                    actionListener.add(dataset);
+                    known.add(dataset);
+                } finally {
+                    mdProvider.getLocks().reset();
+                }
+            }
+        };
+        add(addAction);
+        return addAction;
+    }
+
+    public Action dropDataset(Dataset dataset, IActiveEntityController actionListener) {
+        Action dropAction = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                String entityDataverse = actionListener.getEntityId().getDataverse();
+                String entity = actionListener.getEntityId().getEntityName();
+                try {
+                    // we have to first read lock all active entities before deleting a dataset
+                    lockManager.acquireActiveEntityReadLock(mdProvider.getLocks(), entityDataverse + '.' + entity);
+                    lockManager.acquireDatasetWriteLock(mdProvider.getLocks(),
+                            DatasetUtil.getFullyQualifiedName(dataset));
+                    List<Dataset> known = clusterController.getAllDatasets();
+                    if (!known.contains(dataset)) {
+                        throw new HyracksDataException("Dataset " + dataset + " does not exist");
+                    }
+                    actionListener.remove(dataset);
+                    known.remove(dataset);
+                } finally {
+                    mdProvider.getLocks().reset();
+                }
+            }
+        };
+        add(dropAction);
+        return dropAction;
+    }
+
+    public Action addIndex(Dataset dataset, IActiveEntityController actionListener) {
+        Action addIndexAction = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                String dataverse = dataset.getDataverseName();
+                String qualifiedDataset = dataverse + '.' + dataset.getDatasetName();
+                try {
+                    // no index is built: the action only locks and then rejects the request if the entity is active
+                    MetadataLockUtil.createIndexBegin(lockManager, mdProvider.getLocks(), dataverse,
+                            qualifiedDataset);
+                    if (actionListener.isActive()) {
+                        throw new RuntimeDataException(ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY,
+                                qualifiedDataset + ".index", actionListener.getEntityId(), actionListener.getState());
+                    }
+                } finally {
+                    mdProvider.getLocks().reset();
+                }
+            }
+        };
+        add(addIndexAction);
+        return addIndexAction;
+    }
+
+    public Action dropIndex(Dataset dataset, IActiveEntityController actionListener) {
+        Action dropIndexAction = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                String dataverse = dataset.getDataverseName();
+                String qualifiedDataset = dataverse + '.' + dataset.getDatasetName();
+                String indexName = qualifiedDataset + ".index"; // only used in the error raised below
+                try {
+                    MetadataLockUtil.dropIndexBegin(lockManager, mdProvider.getLocks(), dataverse,
+                            qualifiedDataset);
+                    if (actionListener.isActive()) {
+                        throw new RuntimeDataException(
+                                ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY, indexName,
+                                actionListener.getEntityId(), actionListener.getState());
+                    }
+                } finally {
+                    mdProvider.getLocks().reset();
+                }
+            }
+        };
+        add(dropIndexAction);
+        return dropIndexAction;
+    }
+
+    public Action query(Dataset dataset, Semaphore semaphore) {
+        Action queryAction = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                String dataverse = dataset.getDataverseName();
+                String qualifiedDataset = dataverse + '.' + dataset.getDatasetName();
+                try {
+                    lockManager.acquireDataverseReadLock(mdProvider.getLocks(), dataverse);
+                    lockManager.acquireDatasetReadLock(mdProvider.getLocks(), qualifiedDataset);
+                    if (!semaphore.tryAcquire()) { // no permit yet: block below while the read locks stay acquired
+                        semaphore.acquire();
+                    }
+                } finally {
+                    mdProvider.getLocks().reset();
+                }
+            }
+        };
+        add(queryAction);
+        return queryAction;
+    }
+
+    public Action suspendAllActivities(ActiveNotificationHandler handler) {
+        Action action = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                handler.suspend(mdProvider); // no lock reset here, unlike resumeAllActivities
+            }
+        };
+        add(action); // queue on this actor; the caller gets the Action back
+        return action;
+    }
+
+    public Action resumeAllActivities(ActiveNotificationHandler handler) {
+        Action resumeAll = new Action() {
+            @Override
+            protected void doExecute(MetadataProvider mdProvider) throws Exception {
+                try {
+                    handler.resume(mdProvider);
+                } finally {
+                    mdProvider.getLocks().reset(); // reset the lock list whether or not resume succeeded
+                }
+            }
+        };
+        add(resumeAll);
+        return resumeAll;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 8442162..10b528f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -472,8 +472,10 @@ public class TestExecutor {
             } catch (Exception e) {
                 // whoops, not JSON (e.g. 404) - just include the body
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errorBody);
-                throw new Exception("HTTP operation failed:" + "\nSTATUS LINE: " + httpResponse.getStatusLine()
-                        + "\nERROR_BODY: " + errorBody, e);
+                Exception failure = new Exception("HTTP operation failed:" + "\nSTATUS LINE: "
+                        + httpResponse.getStatusLine() + "\nERROR_BODY: " + errorBody);
+                failure.addSuppressed(e);
+                throw failure;
             }
             throw new ParsedException("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: "
                     + httpResponse.getStatusLine() + "\nSUMMARY: " + errors[2].split("\n")[0], errors[2]);
@@ -1173,7 +1175,8 @@ public class TestExecutor {
                 break;
             } catch (TimeoutException e) {
                 if (responsesReceived == 0) {
-                    throw new Exception("Poll limit (" + timeoutSecs + "s) exceeded without obtaining *any* result from server");
+                    throw new Exception(
+                            "Poll limit (" + timeoutSecs + "s) exceeded without obtaining *any* result from server");
                 } else {
                     throw new Exception("Poll limit (" + timeoutSecs + "s) exceeded without obtaining expected result");
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index a5e6fbb..e99686d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -237,13 +237,13 @@
     <test-case FilePath="feeds">
       <compilation-unit name="start-started-feed">
         <output-dir compare="Text">start-started-feed</output-dir>
-        <expected-error>Feed TweetFeed is started already.</expected-error>
+        <expected-error>experiments.TweetFeed(Feed) is already started</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
       <compilation-unit name="stop-stopped-feed">
         <output-dir compare="Text">stop-stopped-feed</output-dir>
-        <expected-error>Feed TweetFeed is not started.</expected-error>
+        <expected-error>new_experiments.TweetFeed(Feed) cannot be stopped because its state is STOPPED</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
@@ -6421,7 +6421,7 @@
     <test-case FilePath="cross-dataverse">
       <compilation-unit name="drop-dataverse">
         <output-dir compare="Text">drop-dataverse</output-dir>
-        <expected-error>org.apache.asterix.metadata.MetadataException: Cannot drop dataverse. Type a.a used by dataset b.b1</expected-error>
+        <expected-error>Cannot drop dataverse. Type a.a used by dataset b.b1</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="cross-dataverse">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
index c3cf86b..61ded27 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
@@ -55,7 +55,7 @@ public interface IClusterEventsSubscriber {
      * @param previousState
      * @param newState
      */
-    default void notifyStateChange(ClusterState previousState, ClusterState newState) {
+    default void notifyStateChange(ClusterState newState) {
         // default is no-op
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IMetadataLockManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IMetadataLockManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IMetadataLockManager.java
new file mode 100644
index 0000000..4408d84
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IMetadataLockManager.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.metadata.LockList;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public interface IMetadataLockManager {
+
+    /**
+     * Acquire read lock on the dataverse
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param dataverseName
+     *            the dataverse name
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireDataverseReadLock(LockList locks, String dataverseName) throws AsterixException;
+
+    /**
+     * Acquire write lock on the dataverse
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param dataverseName
+     *            the dataverse name
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireDataverseWriteLock(LockList locks, String dataverseName) throws AsterixException;
+
+    /**
+     * Acquire read lock on the dataset (for queries)
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param datasetFullyQualifiedName
+     *            the fully qualified name of the dataset
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireDatasetReadLock(LockList locks, String datasetFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire write lock on the dataset (for dataset create, dataset drop, and index drop)
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param datasetFullyQualifiedName
+     *            the fully qualified name of the dataset
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireDatasetWriteLock(LockList locks, String datasetFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire modify lock on the dataset (for inserts, upserts, deletes). Mutually exclusive with create index lock
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param datasetFullyQualifiedName
+     *            the fully qualified name of the dataset
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireDatasetModifyLock(LockList locks, String datasetFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire create index lock on the dataset (for index creation). Mutually exclusive with modify lock
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param datasetFullyQualifiedName
+     *            the fully qualified name of the dataset
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireDatasetCreateIndexLock(LockList locks, String datasetFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire exclusive modify lock on the dataset. Only a single thread can acquire this lock and it is mutually
+     * exclusive with modify locks and index build lock
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param datasetFullyQualifiedName
+     *            the fully qualified name of the dataset
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireDatasetExclusiveModificationLock(LockList locks, String datasetFullyQualifiedName)
+            throws AsterixException;
+
+    /**
+     * Acquire read lock on the function
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param functionFullyQualifiedName
+     *            the fully qualified name of the function
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireFunctionReadLock(LockList locks, String functionFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire write lock on the function
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param functionFullyQualifiedName
+     *            the fully qualified name of the function
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireFunctionWriteLock(LockList locks, String functionFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire read lock on the node group
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param nodeGroupName
+     *            the name of the node group
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireNodeGroupReadLock(LockList locks, String nodeGroupName) throws AsterixException;
+
+    /**
+     * Acquire write lock on the node group
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param nodeGroupName
+     *            the name of the node group
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireNodeGroupWriteLock(LockList locks, String nodeGroupName) throws AsterixException;
+
+    /**
+     * Acquire read lock on the active entity
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param entityFullyQualifiedName
+     *            the fully qualified name of the active entity
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireActiveEntityReadLock(LockList locks, String entityFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire write lock on the active entity
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param entityFullyQualifiedName
+     *            the fully qualified name of the active entity
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireActiveEntityWriteLock(LockList locks, String entityFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire write lock on the feed policy
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param feedPolicyFullyQualifiedName
+     *            the fully qualified name of the feed policy
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireFeedPolicyWriteLock(LockList locks, String feedPolicyFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire read lock on the feed policy
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param feedPolicyFullyQualifiedName
+     *            the fully qualified name of the feed policy
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireFeedPolicyReadLock(LockList locks, String feedPolicyFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire read lock on the merge policy
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param mergePolicyFullyQualifiedName
+     *            the fully qualified name of the merge policy
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireMergePolicyReadLock(LockList locks, String mergePolicyFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire write lock on the merge policy
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param mergePolicyFullyQualifiedName
+     *            the fully qualified name of the merge policy
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireMergePolicyWriteLock(LockList locks, String mergePolicyFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire read lock on the data type
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param datatypeFullyQualifiedName
+     *            the fully qualified name of the data type
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireDataTypeReadLock(LockList locks, String datatypeFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire write lock on the data type
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param datatypeFullyQualifiedName
+     *            the fully qualified name of the data type
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireDataTypeWriteLock(LockList locks, String datatypeFullyQualifiedName) throws AsterixException;
+
+    /**
+     * Acquire read lock on the extension entity
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param extension
+     *            the extension key
+     * @param extensionEntityFullyQualifiedName
+     *            the fully qualified name of the extension entity
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireExtensionReadLock(LockList locks, String extension, String extensionEntityFullyQualifiedName)
+            throws AsterixException;
+
+    /**
+     * Acquire write lock on the extension entity
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param extension
+     *            the extension key
+     * @param extensionEntityFullyQualifiedName
+     *            the fully qualified name of the extension entity
+     * @throws AsterixException
+     *             if lock couldn't be acquired
+     */
+    void acquireExtensionWriteLock(LockList locks, String extension, String extensionEntityFullyQualifiedName)
+            throws AsterixException;
+
+    /**
+     * Upgrade a previously acquired exclusive modification lock on the dataset to a write lock
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param datasetFullyQualifiedName
+     *            the fully qualified name of the dataset
+     * @throws AlgebricksException
+     *             if lock couldn't be upgraded
+     */
+    void upgradeDatasetLockToWrite(LockList locks, String datasetFullyQualifiedName) throws AlgebricksException;
+
+    /**
+     * Downgrade an upgraded dataset write lock to an exclusive modification lock
+     *
+     * @param locks
+     *            the lock list to add the new lock to
+     * @param datasetFullyQualifiedName
+     *            the fully qualified name of the dataset
+     * @throws AlgebricksException
+     *             if lock couldn't be downgraded
+     */
+    void downgradeDatasetLockToExclusiveModify(LockList locks, String datasetFullyQualifiedName)
+            throws AlgebricksException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java
index 6211af4..b4559c8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java
@@ -20,11 +20,22 @@ package org.apache.asterix.common.cluster;
 
 import org.apache.asterix.common.api.IClusterEventsSubscriber;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IGlobalRecoveryManager extends IClusterEventsSubscriber {
 
     /**
-     * Starts the global recovery process if the cluster state changed to ACTIVE.
+     * Starts the global recovery process after the cluster state has changed to ACTIVE.
+     *
+     * @param appCtx
+     *            the application context
+     * @throws HyracksDataException
+     *             if the global recovery fails
      */
-    public void startGlobalRecovery(ICcApplicationContext appCtx);
+    void startGlobalRecovery(ICcApplicationContext appCtx) throws HyracksDataException;
+
+    /**
+     * @return true, if global recovery has been completed successfully
+     */
+    boolean isRecoveryCompleted();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 249bd56..3eff214 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -19,6 +19,8 @@
 package org.apache.asterix.common.dataflow;
 
 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -63,9 +65,9 @@ public interface ICcApplicationContext extends IApplicationContext {
     IFaultToleranceStrategy getFaultToleranceStrategy();
 
     /**
-     * @return the active lifecycle listener at Cluster controller
+     * @return the active notification handler at Cluster controller
      */
-    IJobLifecycleListener getActiveLifecycleListener();
+    IJobLifecycleListener getActiveNotificationHandler();
 
     /**
      * @return a new instance of {@link IHyracksClientConnection}
@@ -90,4 +92,14 @@ public interface ICcApplicationContext extends IApplicationContext {
      * @return the extension manager instance
      */
     Object getExtensionManager();
+
+    /**
+     * @return the metadata lock manager
+     */
+    IMetadataLockManager getMetadataLockManager();
+
+    /**
+     * @return the cluster state manager
+     */
+    IClusterStateManager getClusterStateManager();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java
index 7cb7d8a..dc884f1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java
@@ -25,6 +25,10 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 public class AsterixException extends AlgebricksException {
     private static final long serialVersionUID = 1L;
 
+    /**
+     * @deprecated Instead, use a constructor with error code
+     */
+    @Deprecated
     public AsterixException(String message) {
         super(message);
     }
@@ -34,10 +38,11 @@ public class AsterixException extends AlgebricksException {
 
     }
 
-    public static AsterixException create(int errorCode, Serializable... params) {
-        return new AsterixException(errorCode, params);
-    }
-
+    /**
+     * @deprecated When wrapping a cause, create the exception using
+     *             AlgebricksException.create(Throwable th) instead
+     */
+    @Deprecated
     public AsterixException(Throwable cause) {
         super(cause);
     }
@@ -47,7 +52,15 @@ public class AsterixException extends AlgebricksException {
         addSuppressed(cause);
     }
 
+    /**
+     * @deprecated Instead, use a constructor with error code
+     */
+    @Deprecated
     public AsterixException(String message, Throwable cause) {
         super(message, cause);
     }
+
+    public static AsterixException create(int errorCode, Serializable... params) {
+        return new AsterixException(errorCode, params);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java
index b53eec4..693b1c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java
@@ -31,8 +31,7 @@ public class CompilationException extends AlgebricksException {
     }
 
     public CompilationException(int errorCode, Throwable cause, Serializable... params) {
-        super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), params);
-        addSuppressed(cause);
+        super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), cause, params);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 9d889ea..a00ed99 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -67,6 +67,7 @@ public class ErrorCode {
     public static final int POLYGON_INVALID_COORDINATE = 24;
     public static final int POLYGON_3_POINTS = 25;
     public static final int POLYGON_INVALID = 26;
+    public static final int OPERATION_NOT_SUPPORTED = 27;
 
     public static final int INSTANTIATION_ERROR = 100;
 
@@ -114,6 +115,12 @@ public class ErrorCode {
     public static final int INDEX_ILLEGAL_ENFORCED_NON_OPTIONAL = 1041;
     public static final int INDEX_ILLEGAL_NON_ENFORCED_TYPED = 1042;
     public static final int INDEX_RTREE_MULTIPLE_FIELDS_NOT_ALLOWED = 1043;
+    public static final int REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE = 1044;
+    public static final int ILLEGAL_LOCK_UPGRADE_OPERATION = 1045;
+    public static final int ILLEGAL_LOCK_DOWNGRADE_OPERATION = 1046;
+    public static final int UPGRADE_FAILED_LOCK_WAS_NOT_ACQUIRED = 1047;
+    public static final int DOWNGRADE_FAILED_LOCK_WAS_NOT_ACQUIRED = 1048;
+    public static final int LOCK_WAS_ACQUIRED_DIFFERENT_OPERATION = 1049;
 
     // Feed errors
     public static final int DATAFLOW_ILLEGAL_STATE = 3001;
@@ -200,6 +207,26 @@ public class ErrorCode {
     public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086;
     public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087;
     public static final int ACTIVE_MANAGER_INVALID_RUNTIME = 3088;
+    public static final int ACTIVE_ENTITY_ALREADY_STARTED = 3089;
+    public static final int ACTIVE_ENTITY_CANNOT_BE_STOPPED = 3090;
+    public static final int CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY = 3091;
+    public static final int CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY = 3092;
+    public static final int ACTIVE_ENTITY_IS_ALREADY_REGISTERED = 3093;
+    public static final int CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY = 3094;
+    public static final int CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY = 3095;
+    public static final int ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED = 3096;
+    public static final int ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED = 3097;
+    public static final int CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER = 3098;
+    public static final int DOUBLE_INITIALIZATION_OF_ACTIVE_NOTIFICATION_HANDLER = 3099;
+    public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 3100;
+    public static final int DOUBLE_RECOVERY_ATTEMPTS = 3101;
+    public static final int UNREPORTED_TASK_FAILURE_EXCEPTION = 3102;
+    public static final int ACTIVE_ENTITY_ALREADY_SUSPENDED = 3103;
+    public static final int ACTIVE_ENTITY_CANNOT_RESUME_FROM_STATE = 3104;
+    public static final int ACTIVE_RUNTIME_IS_ALREADY_REGISTERED = 3105;
+    public static final int ACTIVE_RUNTIME_IS_NOT_REGISTERED = 3106;
+    public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107;
+    public static final int FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD = 3108;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/MetadataException.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/MetadataException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/MetadataException.java
new file mode 100644
index 0000000..f04d19d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/MetadataException.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.exceptions;
+
+import java.io.Serializable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class MetadataException extends CompilationException {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(MetadataException.class.getName());
+
+    /**
+     * @deprecated Instead, use a constructor with error code
+     * @param message
+     */
+    @Deprecated
+    public MetadataException(String message) {
+        super(message);
+    }
+
+    /**
+     * @deprecated When wrapping a cause, create the exception using
+     *             AlgebricksException.create(Throwable th) instead
+     * @param cause
+     */
+    @Deprecated
+    public MetadataException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @deprecated When wrapping a cause, create the exception using AlgebricksException.create(Throwable th)
+     * @param message
+     * @param cause
+     */
+    @Deprecated
+    public MetadataException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MetadataException(int errorCode, Serializable... params) {
+        super(errorCode, params);
+    }
+
+    public MetadataException(int errorCode, Throwable cause, Serializable... params) {
+        super(errorCode, cause, params);
+    }
+
+    public static MetadataException create(Throwable cause) {
+        if (cause instanceof MetadataException || cause == null) {
+            return (MetadataException) cause;
+        }
+        if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
+            LOGGER.log(Level.WARNING, "Wrapping an InterruptedException in " + MetadataException.class.getSimpleName()
+                    + " and current thread is not interrupted", cause);
+        }
+        return new MetadataException(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java
index 0c099fb..85bfaa5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java
@@ -31,7 +31,6 @@ public class RuntimeDataException extends HyracksDataException {
     }
 
     public RuntimeDataException(int errorCode, Throwable cause, Serializable... params) {
-        super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), params);
-        addSuppressed(cause);
+        super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), cause, params);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
index a0e6e71..276e294 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
@@ -23,6 +23,16 @@ import java.util.List;
 public interface IDataset {
 
     /**
+     * @return the dataverse name
+     */
+    String getDataverseName();
+
+    /**
+     * @return the dataset name
+     */
+    String getDatasetName();
+
+    /**
      * @return the list of primary keys for the dataset
      */
     List<List<String>> getPrimaryKeys();
@@ -31,5 +41,4 @@ public interface IDataset {
      * @return the bloom filter fields indexes for the primary index of the dataset
      */
     int[] getPrimaryBloomFilterFields();
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
new file mode 100644
index 0000000..ba17b0c
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.metadata;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.MetadataException;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+/**
+ * A metadata lock that is local to the compilation node
+ */
+public interface IMetadataLock {
+
+    enum Mode {
+        READ,
+        MODIFY,
+        INDEX_BUILD,
+        EXCLUSIVE_MODIFY,
+        UPGRADED_WRITE,
+        WRITE;
+
+        public boolean contains(Mode mode) {
+            if (mode == this) { // every mode contains itself
+                return true;
+            }
+            if (this == Mode.WRITE) { // WRITE contains every other mode
+                return true;
+            }
+            return mode == Mode.READ; // any remaining mode contains only READ (besides itself)
+        }
+    }
+
+    /**
+     * Acquire this lock in the given mode
+     *
+     * @param mode
+     *            the lock mode to acquire
+     */
+    void lock(IMetadataLock.Mode mode);
+
+    /**
+     * Release this lock for the given mode
+     *
+     * @param mode
+     *            the lock mode to release
+     */
+    void unlock(IMetadataLock.Mode mode);
+
+    /**
+     * Get the lock's key
+     *
+     * @return the key identifying the lock
+     */
+    String getKey();
+
+    /**
+     * Upgrade the lock; this default implementation rejects every upgrade request
+     *
+     * @param from the mode to upgrade from
+     * @param to the mode to upgrade to
+     * @throws AlgebricksException always (ILLEGAL_LOCK_UPGRADE_OPERATION) unless an implementation overrides this
+     */
+    default void upgrade(Mode from, Mode to) throws AlgebricksException {
+        throw new MetadataException(ErrorCode.ILLEGAL_LOCK_UPGRADE_OPERATION, from, to);
+    }
+
+    /**
+     * Downgrade the lock; this default implementation rejects every downgrade request
+     *
+     * @param from the mode to downgrade from
+     * @param to the mode to downgrade to
+     * @throws AlgebricksException always (ILLEGAL_LOCK_DOWNGRADE_OPERATION) unless an implementation overrides this
+     */
+    default void downgrade(Mode from, Mode to) throws AlgebricksException {
+        throw new MetadataException(ErrorCode.ILLEGAL_LOCK_DOWNGRADE_OPERATION, from, to);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
new file mode 100644
index 0000000..6f2bc39
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.metadata;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.metadata.IMetadataLock.Mode;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+/**
+ * The LockList is used for two phase locking: locks are added during the lock phase and released together.
+ */
+public class LockList {
+    private final List<MutablePair<IMetadataLock, IMetadataLock.Mode>> locks = new ArrayList<>();
+    private final HashMap<String, Integer> indexes = new HashMap<>(); // lock key -> position in locks
+    private boolean lockPhase = true; // false after unlock() until reset() is called
+
+    /**
+     * Acquires a lock and records it, unless it is already held in a mode that contains the requested mode.
+     *
+     * @param mode
+     *            the lock mode.
+     * @param lock
+     *            the lock object.
+     */
+    public void add(IMetadataLock.Mode mode, IMetadataLock lock) throws AsterixException {
+        if (isContained(mode, lock)) {
+            return;
+        }
+        lock.lock(mode);
+        indexes.put(lock.getKey(), locks.size()); // size before the add is the index of the new entry
+        locks.add(MutablePair.of(lock, mode));
+    }
+
+    private boolean isContained(Mode mode, IMetadataLock lock) throws AsterixException {
+        if (!lockPhase) { // locks were already released; acquiring again requires reset()
+            throw new AsterixException(ErrorCode.COMPILATION_TWO_PHASE_LOCKING_VIOLATION);
+        }
+        Integer index = indexes.get(lock.getKey());
+        if (index != null) {
+            Mode acquired = locks.get(index).right;
+            if (!acquired.contains(mode)) { // held mode does not cover the requested one
+                throw new AsterixException(ErrorCode.LOCK_WAS_ACQUIRED_DIFFERENT_OPERATION, mode, acquired);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public void upgrade(Mode to, IMetadataLock lock) throws AlgebricksException {
+        if (!lockPhase) {
+            throw new AsterixException(ErrorCode.COMPILATION_TWO_PHASE_LOCKING_VIOLATION);
+        }
+        Integer index = indexes.get(lock.getKey());
+        if (index == null) { // only a lock previously added to this list can be upgraded
+            throw new AsterixException(ErrorCode.UPGRADE_FAILED_LOCK_WAS_NOT_ACQUIRED);
+        }
+        MutablePair<IMetadataLock, Mode> pair = locks.get(index);
+        Mode from = pair.getRight();
+        if (from == to) { // already recorded in the requested mode; nothing to do
+            return;
+        }
+        lock.upgrade(from, to);
+        pair.setRight(to); // record the new mode only after the lock accepted the upgrade
+    }
+
+    public void downgrade(Mode mode, IMetadataLock lock) throws AlgebricksException {
+        if (!lockPhase) {
+            throw new AsterixException(ErrorCode.COMPILATION_TWO_PHASE_LOCKING_VIOLATION);
+        }
+        Integer index = indexes.get(lock.getKey());
+        if (index == null) { // only a lock previously added to this list can be downgraded
+            throw new AsterixException(ErrorCode.DOWNGRADE_FAILED_LOCK_WAS_NOT_ACQUIRED);
+        }
+        MutablePair<IMetadataLock, Mode> pair = locks.get(index);
+        Mode acquired = pair.getRight();
+        lock.downgrade(acquired, mode);
+        pair.setRight(mode); // record the new mode only after the lock accepted the downgrade
+    }
+
+    /**
+     * Releases every held lock in reverse acquisition order. Once unlock() is called, calls to
+     * add(IMetadataLock.Mode mode, IMetadataLock lock) fail until reset() is called.
+     */
+    public void unlock() {
+        for (int i = locks.size() - 1; i >= 0; i--) {
+            MutablePair<IMetadataLock, Mode> pair = locks.get(i);
+            pair.getLeft().unlock(pair.getRight()); // release in the mode currently recorded for the lock
+        }
+        locks.clear();
+        indexes.clear();
+        lockPhase = false;
+    }
+
+    /**
+     * Releases all held locks, clears the state and starts another pass of two phase locking.
+     */
+    public void reset() {
+        unlock();
+        lockPhase = true;
+    }
+
+    @Override
+    public String toString() {
+        return "{\"phase\" : \"" + (lockPhase ? "lock" : "unlock") + "\", \"locks\" : " + locks + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 9d4db56..9ed6aa5 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -49,7 +49,7 @@
 12 = Invalid implicit scalar to collection coercion in %1$s
 14 = Property %1$s not set
 15 = Storage metadata directory of %1$s in %2$s already exists
-16 = Storage metadata directory of %1$s in %2$s couldn't be created
+16 = Storage metadata directory of %1$s in %2$s could not be created
 17 = Unknown external file pending operation %1$s
 18 = Cannot convert the %1$s type to the %2$s type.
 19 = Cannot convert integer types. The source type should be one of %1$s.
@@ -60,6 +60,7 @@
 24 = Invalid coordinate
 25 = Polygon must have at least 3 points
 26 = %1$s can not be an instance of polygon
+27 = Operation not supported
 
 100 = Unable to instantiate class %1$s
 
@@ -83,7 +84,7 @@
 1024 = Identifier %1$s is not found in AQL+ meta-scope
 1025 = There is no such join type in AQL+
 1026 = The given function expression %1$s cannot utilize index
-1027 = Dataset of type %1$s doesn't have a primary index
+1027 = Dataset of type %1$s does not have a primary index
 1028 = Query parameter %1$s is not supported
 1029 = No metadata exists for dataset %1$s
 1030 = The subtree does not have any data source
@@ -100,6 +101,12 @@
 1041 = Cannot create enforced index on \"%1$s\" field with non-optional type
 1042 = Cannot create non-enforced typed index of this kind: %1$s
 1043 = Cannot use %1$s fields as key for the R-tree index. There can be only one field as a key for the R-tree index.
+1044 = Communication-related exception occurred during the execution of a remote method call
+1045 = Illegal attempt to upgrade metadata lock from %1$s to %2$s
+1046 = Illegal attempt to downgrade metadata lock from %1$s to %2$s
+1047 = Metadata lock cannot be upgraded! because it was not acquired before
+1048 = Metadata lock cannot be downgraded! because it was not acquired before
+1049 = Metadata lock cannot be acquired for %1$s since it is already acquired for %2$s
 
 # Feed Errors
 3001 = Illegal state.
@@ -111,7 +118,7 @@
 3007 = Twitter4J library not found!
 3008 = Unable to ingest data
 3009 = Exception in get record type %1$s for feed
-3010 = Doesn't support Hive data with list of non-primitive types
+3010 = Does not support Hive data with list of non-primitive types
 3011 = Cannot get hive type for field of type %1$s
 3012 = Failed to get columns of record
 3013 = Cannot deserialize Hive records with no closed columns
@@ -189,6 +196,26 @@
 3086 = Cannot find record reader %1$s with specified configuration
 3087 = Cannot find function %1$s
 3088 = %1$s is not a valid runtime Id
+3089 = %1$s is already started and has state %2$s
+3090 = %1$s cannot be stopped because its state is %2$s
+3091 = Cannot add dataset to %1$s because its state is %2$s
+3092 = Cannot remove dataset from %1$s because its state is %2$s
+3093 = %1$s is already registered
+3094 = Cannot create index on dataset %1$s because it is connected to %2$s with state %3$s
+3095 = Cannot drop index of dataset %1$s because it is connected to %2$s with state %3$s
+3096 = Active Notification Handler is suspended
+3097 = Active Entity %1$s has not been registered
+3098 = Cannot deregister %1$s because it is active
+3099 = Attempt to initialize an initialized Active Notification Handler
+3100 = Failed to shutdown event processor for %1$s
+3101 = Recovery request while recovery is currently ongoing
+3102 = Unreported exception causing task failure
+3103 = %1$s is already suspended and has state %2$s
+3104 = %1$s cannot be resumed from state %2$s
+3105 = %1$s is already registered
+3106 = %1$s is not registered
+3107 = Active Notification Handler is already suspended
+3108 = Feed stopped while waiting for a new record
 
 # Lifecycle management errors
 4000 = Partition id %1$d for node %2$s already in use by node %3$s

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
index 48df79b..e62672d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
  * to be implemented by each adapter irrespective of the kind of
  * adapter(pull or push).
  */
+@FunctionalInterface
 public interface IDataSourceAdapter extends Serializable {
 
     public enum AdapterType {
@@ -38,6 +39,7 @@ public interface IDataSourceAdapter extends Serializable {
 
     /**
      * Triggers the adapter to begin ingesting data from the external source.
+     *
      * @param partition
      *            The adapter could be running with a degree of parallelism.
      *            partition corresponds to the i'th parallel instance.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index bcf5e25..c87fe2d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -63,4 +63,8 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
     public abstract boolean handleException(Throwable th) throws HyracksDataException;
 
     public abstract String getStats();
+
+    public void fail() throws HyracksDataException {
+        tupleForwarder.fail(); // delegate the failure to the tuple forwarder
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 1e62159..5b9b96f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -21,6 +21,8 @@ package org.apache.asterix.external.dataflow;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
@@ -30,6 +32,7 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
@@ -57,8 +60,8 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
         try {
             failed = false;
             tupleForwarder.initialize(ctx, writer);
-            while (recordReader.hasNext()) {
-                IRawRecord<? extends T> record = recordReader.next();
+            while (hasNext()) {
+                IRawRecord<? extends T> record = next();
                 if (record == null) {
                     flush();
                     Thread.sleep(INTERVAL); // NOSONAR: No one notifies the sleeping thread
@@ -70,25 +73,46 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
                     failedRecordsCount++;
                 }
             }
-        } catch (InterruptedException e) {
-            //TODO: Find out what could cause an interrupted exception beside termination of a job/feed
-            LOGGER.warn("Feed has been interrupted. Closing the feed", e);
-            failed = true;
-            try {
-                finish();
-            } catch (HyracksDataException hde) {
-                e.addSuppressed(hde);
+        } catch (HyracksDataException e) {
+            LOGGER.log(Level.WARN, e);
+            //if interrupted while waiting for a new record, then it is safe to not fail forward
+            if (e.getComponent() == ErrorCode.ASTERIX
+                    && e.getErrorCode() == ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD) {
+                // Do nothing
+            } else {
+                failed = true;
+                throw e;
             }
-            throw e;
         } catch (Exception e) {
             failed = true;
-            tupleForwarder.flush();
             LOGGER.warn("Failure while operating a feed source", e);
             throw HyracksDataException.create(e);
         }
         finish();
     }
 
+    private IRawRecord<? extends T> next() throws HyracksDataException {
+        try {
+            return recordReader.next(); // may be null; the calling loop then flushes and sleeps
+        } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
+            throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+        } catch (Exception e) { // any other reader failure is wrapped via HyracksDataException.create
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private boolean hasNext() throws HyracksDataException {
+        boolean hasNext;
+        try {
+            hasNext = recordReader.hasNext();
+        } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
+            throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+        } catch (Exception e) { // any other reader failure is wrapped via HyracksDataException.create
+            throw HyracksDataException.create(e);
+        }
+        return hasNext;
+    }
+
     private void finish() throws HyracksDataException {
         HyracksDataException hde = null;
         try {
@@ -194,6 +218,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
         return dataParser;
     }
 
+    @Override
     public String getStats() {
         return "{\"incoming-records-count\": " + incomingRecordsCount + ", \"failed-at-parser-records-count\": "
                 + failedRecordsCount + "}";

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 4177ea6..3a8130b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -40,6 +40,7 @@ public class FeedTupleForwarder implements ITupleForwarder {
     private IFrameWriter writer;
     private boolean paused = false;
     private boolean initialized;
+    private boolean failed;
 
     public FeedTupleForwarder(FeedLogManager feedLogManager) {
         this.feedLogManager = feedLogManager;
@@ -67,7 +68,8 @@ public class FeedTupleForwarder implements ITupleForwarder {
                     try {
                         wait();
                     } catch (InterruptedException e) {
-                        throw new HyracksDataException(e);
+                        Thread.currentThread().interrupt();
+                        throw HyracksDataException.create(e);
                     }
                 }
             }
@@ -88,7 +90,7 @@ public class FeedTupleForwarder implements ITupleForwarder {
     public void close() throws HyracksDataException {
         Throwable throwable = null;
         try {
-            if (appender.getTupleCount() > 0) {
+            if (!failed && appender.getTupleCount() > 0) {
                 FrameUtils.flushFrame(frame.getBuffer(), writer);
             }
         } catch (Throwable th) {
@@ -116,4 +118,9 @@ public class FeedTupleForwarder implements ITupleForwarder {
     public void flush() throws HyracksDataException {
         appender.flush(writer);
     }
+
+    public void fail() throws HyracksDataException {
+        failed = true; // close() checks this flag and skips flushing the pending frame
+        writer.fail();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index e6d81d3..9f32a25 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -55,4 +55,8 @@ public class FeedAdapter implements IDataSourceAdapter {
     public String getStats() {
         return controller.getStats();
     }
+
+    public void fail() throws HyracksDataException {
+        controller.fail(); // delegate the failure to the adapter's controller
+    }
 }


Mime
View raw message