asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [9/9] asterixdb git commit: [ASTERIXDB-1992][ING] Suspend/Resume active entities
Date Thu, 27 Jul 2017 06:36:03 GMT
[ASTERIXDB-1992][ING] Suspend/Resume active entities

- user model changes: wait for completion only returns
  when the entity becomes inactive.
- storage format changes: no
- interface changes:
  01) Introduce:
      IMetadataLockManager for entity locking.
      IActiveNotificationHandler for handling active events.
      IRetryPolicy for recovery of failed active jobs.
      IActiveEntityController for controlling active entities.
  02) IJobLifecycleListener.notifyJobFinish now passes the
      JobStatus and Exceptions.
  03) IActiveEntityEventsListener.isActive() returns true,
      if entity is active, false otherwise.
  04) IActiveEntityEventsListener.unregister() removes the
      listener upon entity deletion.
  05) IActiveEntityEventsListener.getJobFailure() returns
      the job failure if the entity is in a failed state.
  06) IStatementExecutor.getComponentProvider() returns
      the storage component provider.
  07) IStatementExecutor.getApplicationContext() returns
      the application context.
  08) IMetadataManager.upsertEntity to perform metadata
      entities' upsert operations.
  09) IMetadataNode.upsertEntity to perform metadata
      entities' upsert operations
  10) ICcApplicationContext.getMetadataLockManager() returns
      the lock manager.
  11) ICcApplicationContext.getClusterStateManager() returns
      the cluster state manager.

details:
- Starting and stopping of active entities now go through
  their listeners, rather than treating the presence of a
  listener as indicating an active entity and its absence as
  indicating an inactive entity.
  This facilitates suspend/resume operations of long running
  active jobs for the sake of DDL operations or topology
  changes.
- Unit tests for the vast majority of code paths and
  different possible scenarios have been added.

Change-Id: Ifeac8c73e6bad39a13663b84a52121356e3c6b40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1875
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a85f4121
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a85f4121
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a85f4121

Branch: refs/heads/master
Commit: a85f4121fd5ed6ec7f985d84da1181ec74201d9e
Parents: d57d81f
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Wed Jul 26 21:35:47 2017 -0700
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Wed Jul 26 23:35:10 2017 -0700

----------------------------------------------------------------------
 .../org/apache/asterix/active/ActiveEvent.java  |   12 +-
 .../active/ActiveJobNotificationHandler.java    |  169 ---
 .../asterix/active/ActiveLifecycleListener.java |   86 --
 .../asterix/active/ActiveRuntimeManager.java    |   15 +-
 .../apache/asterix/active/ActivityState.java    |   39 +-
 .../apache/asterix/active/CountRetryPolicy.java |   39 +
 .../asterix/active/CountRetryPolicyFactory.java |   34 +
 .../active/IActiveEntityEventSubscriber.java    |   57 +
 .../active/IActiveEntityEventsListener.java     |   19 +-
 .../asterix/active/IActiveEventSubscriber.java  |   62 -
 .../active/IActiveNotificationHandler.java      |   93 ++
 .../org/apache/asterix/active/IRetryPolicy.java |   27 +
 .../asterix/active/IRetryPolicyFactory.java     |   27 +
 .../asterix/active/InfiniteRetryPolicy.java     |   42 +
 .../active/InfiniteRetryPolicyFactory.java      |   28 +
 .../asterix/active/NoRetryPolicyFactory.java    |   32 +
 .../active/SingleThreadEventProcessor.java      |   78 +
 .../active/message/ActivePartitionMessage.java  |    8 +-
 .../physical/InvertedIndexPOperator.java        |    2 +-
 .../translator/AbstractLangTranslator.java      |   20 +-
 .../LangExpressionToPlanTranslator.java         |    2 +-
 .../api/http/server/ActiveStatsApiServlet.java  |   14 +-
 .../api/http/server/ConnectorApiServlet.java    |    3 +-
 .../api/http/server/RebalanceApiServlet.java    |   33 +-
 .../app/active/ActiveEntityEventsListener.java  |  659 ++++++++
 .../app/active/ActiveNotificationHandler.java   |  282 ++++
 .../asterix/app/active/FeedEventsListener.java  |  149 ++
 .../DefaultStatementExecutorFactory.java        |    3 +-
 .../asterix/app/translator/QueryTranslator.java |  359 ++---
 .../hyracks/bootstrap/CCApplication.java        |   26 +-
 .../bootstrap/GlobalRecoveryManager.java        |  136 +-
 .../org/apache/asterix/utils/RebalanceUtil.java |   72 +-
 .../http/servlet/ConnectorApiServletTest.java   |    6 +-
 .../app/bootstrap/TestNodeController.java       |    5 +-
 .../org/apache/asterix/test/active/Action.java  |   59 +
 .../test/active/ActiveEventsListenerTest.java   | 1432 ++++++++++++++++++
 .../asterix/test/active/ActiveStatsTest.java    |   61 +-
 .../org/apache/asterix/test/active/Actor.java   |   37 +
 .../test/active/DummyFeedEventsListener.java    |   74 +
 .../test/active/TestClusterControllerActor.java |   85 ++
 .../asterix/test/active/TestEventsListener.java |  202 +++
 .../test/active/TestNodeControllerActor.java    |   67 +
 .../asterix/test/active/TestUserActor.java      |  287 ++++
 .../asterix/test/common/TestExecutor.java       |    9 +-
 .../src/test/resources/runtimets/testsuite.xml  |    6 +-
 .../common/api/IClusterEventsSubscriber.java    |    2 +-
 .../common/api/IMetadataLockManager.java        |  311 ++++
 .../common/cluster/IGlobalRecoveryManager.java  |   15 +-
 .../common/dataflow/ICcApplicationContext.java  |   16 +-
 .../common/exceptions/AsterixException.java     |   21 +-
 .../common/exceptions/CompilationException.java |    3 +-
 .../asterix/common/exceptions/ErrorCode.java    |   27 +
 .../common/exceptions/MetadataException.java    |   77 +
 .../common/exceptions/RuntimeDataException.java |    3 +-
 .../asterix/common/metadata/IDataset.java       |   11 +-
 .../asterix/common/metadata/IMetadataLock.java  |   93 ++
 .../asterix/common/metadata/LockList.java       |  128 ++
 .../main/resources/asx_errormsg/en.properties   |   33 +-
 .../external/api/IDataSourceAdapter.java        |    2 +
 .../AbstractFeedDataFlowController.java         |    4 +
 .../dataflow/FeedRecordDataFlowController.java  |   49 +-
 .../external/dataflow/FeedTupleForwarder.java   |   11 +-
 .../external/dataset/adapter/FeedAdapter.java   |    4 +
 .../management/ActiveEntityEventsListener.java  |  248 ---
 .../external/feed/runtime/AdapterExecutor.java  |    4 +
 .../external/feed/watch/AbstractSubscriber.java |   37 +-
 .../external/feed/watch/NoOpSubscriber.java     |    9 +-
 .../external/feed/watch/StatsSubscriber.java    |   12 +-
 .../feed/watch/WaitForStateSubscriber.java      |   32 +-
 .../aql/statement/SubscribeFeedStatement.java   |    2 +-
 .../common/statement/ConnectFeedStatement.java  |    5 +-
 .../apache/asterix/metadata/MetadataCache.java  |   15 +-
 .../asterix/metadata/MetadataException.java     |   44 -
 .../asterix/metadata/MetadataManager.java       |  131 +-
 .../apache/asterix/metadata/MetadataNode.java   |  214 ++-
 .../metadata/api/IActiveEntityController.java   |  107 ++
 .../api/IMetadataEntityTupleTranslator.java     |    2 +-
 .../asterix/metadata/api/IMetadataManager.java  |   12 +-
 .../asterix/metadata/api/IMetadataNode.java     |   14 +-
 .../asterix/metadata/api/IValueExtractor.java   |    2 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |   17 +-
 .../metadata/declared/MetadataManagerUtil.java  |    2 +-
 .../metadata/declared/MetadataProvider.java     |   71 +-
 .../metadata/entities/BuiltinTypeMap.java       |    2 +-
 .../asterix/metadata/entities/Dataset.java      |   42 +-
 .../metadata/entities/FeedConnection.java       |    2 -
 .../apache/asterix/metadata/entities/Index.java |    5 +
 .../CompactionPolicyTupleTranslator.java        |    2 +-
 .../DatasetTupleTranslator.java                 |    2 +-
 .../DatasourceAdapterTupleTranslator.java       |    2 +-
 .../DatatypeTupleTranslator.java                |    2 +-
 .../DataverseTupleTranslator.java               |    2 +-
 .../ExternalFileTupleTranslator.java            |    2 +-
 .../FeedConnectionTupleTranslator.java          |    2 +-
 .../FeedPolicyTupleTranslator.java              |    2 +-
 .../FeedTupleTranslator.java                    |    2 +-
 .../FunctionTupleTranslator.java                |    2 +-
 .../IndexTupleTranslator.java                   |    2 +-
 .../LibraryTupleTranslator.java                 |    2 +-
 .../NodeGroupTupleTranslator.java               |    2 +-
 .../NodeTupleTranslator.java                    |    2 +-
 .../metadata/feeds/BuiltinFeedPolicies.java     |   61 +-
 .../metadata/feeds/FeedMetadataUtil.java        |    7 +-
 .../functions/ExternalFunctionCompilerUtil.java |    2 +-
 .../asterix/metadata/lock/DatasetLock.java      |  152 +-
 .../asterix/metadata/lock/IMetadataLock.java    |   57 -
 .../apache/asterix/metadata/lock/LockList.java  |   79 -
 .../asterix/metadata/lock/MetadataLock.java     |   11 +-
 .../metadata/lock/MetadataLockManager.java      |  213 +--
 .../asterix/metadata/utils/DatasetUtil.java     |   48 +-
 .../metadata/utils/MetadataLockUtil.java        |  168 ++
 .../asterix/metadata/utils/MetadataUtil.java    |    5 +
 .../utils/SplitsAndConstraintsUtil.java         |   18 +-
 .../MetadataEntityValueExtractor.java           |    2 +-
 .../NestedDatatypeNameValueExtractor.java       |    2 +-
 .../TupleCopyValueExtractor.java                |    2 +-
 .../DatasetTupleTranslatorTest.java             |    2 +-
 .../IndexTupleTranslatorTest.java               |    2 +-
 .../runtime/utils/CcApplicationContext.java     |   18 +-
 .../runtime/utils/ClusterStateManager.java      |   27 +-
 .../asterix/tools/datagen/AdmDataGen.java       |    2 +-
 .../tools/translator/ADGenDmlTranslator.java    |    2 +-
 .../opcallbacks/UpsertOperationCallback.java    |    2 +-
 .../api/dataset/IDatasetPartitionManager.java   |   18 +-
 .../api/exceptions/HyracksDataException.java    |   11 +-
 .../hyracks/api/job/IJobLifecycleListener.java  |   32 +-
 .../control/cc/ClusterControllerService.java    |   21 +-
 .../cc/application/CCServiceContext.java        |    9 +-
 .../hyracks/control/cc/cluster/NodeManager.java |   37 +-
 .../cc/dataset/DatasetDirectoryService.java     |    7 +-
 .../hyracks/control/cc/job/JobManager.java      |   15 +-
 .../control/cc/work/RegisterNodeWork.java       |    4 +
 .../control/cc/cluster/NodeManagerTest.java     |    6 +-
 .../control/common/utils/ExceptionUtils.java    |    9 +-
 .../control/nc/NodeControllerService.java       |   54 +-
 .../nc/dataset/DatasetPartitionManager.java     |   22 +-
 .../nc/partitions/PipelinedPartition.java       |   19 +-
 .../control/nc/work/AbortAllTasksWork.java      |   59 +
 .../hyracks/control/nc/work/AbortTasksWork.java |    4 +-
 .../dataflow/common/utils/FrameDebugUtils.java  |   56 +-
 .../NonDeterministicChannelReader.java          |    7 +-
 141 files changed, 6211 insertions(+), 1887 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index 1141912..a810576 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -30,7 +30,9 @@ public class ActiveEvent {
         JOB_FINISHED,
         PARTITION_EVENT,
         EXTENSION_EVENT,
-        STATS_UPDATED
+        STATS_UPDATED,
+        STATE_CHANGED,
+        FAILURE
     }
 
     private final JobId jobId;
@@ -45,10 +47,6 @@ public class ActiveEvent {
         this.eventObject = eventObject;
     }
 
-    public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId) {
-        this(jobId, eventKind, entityId, null);
-    }
-
     public JobId getJobId() {
         return jobId;
     }
@@ -79,8 +77,8 @@ public class ActiveEvent {
             return true;
         }
         ActiveEvent other = (ActiveEvent) o;
-        return Objects.equals(entityId, other.entityId) && Objects.equals(eventKind, other.eventKind) && Objects
-                .equals(eventObject, other.eventObject);
+        return Objects.equals(entityId, other.entityId) && Objects.equals(eventKind, other.eventKind)
+                && Objects.equals(eventObject, other.eventObject);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
deleted file mode 100644
index d2b8a89..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveEvent.Kind;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class ActiveJobNotificationHandler implements Runnable {
-    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
-    private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName());
-    private static final boolean DEBUG = false;
-    private final LinkedBlockingQueue<ActiveEvent> eventInbox;
-    private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners;
-    private final Map<JobId, EntityId> jobId2ActiveJobInfos;
-
-    public ActiveJobNotificationHandler() {
-        this.eventInbox = new LinkedBlockingQueue<>();
-        this.jobId2ActiveJobInfos = new HashMap<>();
-        this.entityEventListeners = new HashMap<>();
-    }
-
-    @Override
-    public void run() {
-        Thread.currentThread().setName(ActiveJobNotificationHandler.class.getSimpleName());
-        LOGGER.log(Level.INFO, "Started " + ActiveJobNotificationHandler.class.getSimpleName());
-        while (!Thread.interrupted()) {
-            try {
-                ActiveEvent event = getEventInbox().take();
-                EntityId entityId = jobId2ActiveJobInfos.get(event.getJobId());
-                if (entityId != null) {
-                    IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
-                    LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind());
-                    if (event.getEventKind() == Kind.JOB_FINISHED) {
-                        LOGGER.log(Level.FINER, "Removing the job");
-                        jobId2ActiveJobInfos.remove(event.getJobId());
-                    }
-                    if (listener != null) {
-                        LOGGER.log(Level.FINER, "Notifying the listener");
-                        listener.notify(event);
-                    }
-
-                } else {
-                    LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId());
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                LOGGER.log(Level.SEVERE, "Error handling an active job event", e);
-            }
-        }
-        LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
-    }
-
-    public synchronized void removeListener(IActiveEntityEventsListener listener) throws HyracksDataException {
-        LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore");
-        unregisterListener(listener);
-    }
-
-    public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
-            IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
-            LOGGER.log(Level.WARNING, "Listener found: " + listener);
-        }
-        return entityEventListeners.get(entityId);
-    }
-
-    public EntityId getEntity(JobId jobId) {
-        return jobId2ActiveJobInfos.get(jobId);
-    }
-
-    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) {
-        LOGGER.log(Level.FINER,
-                "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId);
-        Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
-        if (property == null || !(property instanceof EntityId)) {
-            LOGGER.log(Level.FINER, "Job was is not active. property found to be: " + property);
-            return;
-        }
-        EntityId entityId = (EntityId) property;
-        monitorJob(jobId, entityId);
-        boolean found = jobId2ActiveJobInfos.get(jobId) != null;
-        LOGGER.log(Level.FINER, "Job was found to be: " + (found ? "Active" : "Inactive"));
-        IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
-        if (listener != null) {
-            // It is okay to bypass the event inbox in this case because we know this is the first event for this entity
-            listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
-        }
-        LOGGER.log(Level.FINER, "Listener was notified" + jobId);
-    }
-
-    public LinkedBlockingQueue<ActiveEvent> getEventInbox() {
-        return eventInbox;
-    }
-
-    public synchronized IActiveEntityEventsListener[] getEventListeners() {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getEventListeners() was called");
-            LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners");
-        }
-        return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]);
-    }
-
-    public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
-        if (DEBUG) {
-            LOGGER.log(Level.FINER, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
-                    + listener.getEntityId());
-        }
-        if (entityEventListeners.containsKey(listener.getEntityId())) {
-            throw new HyracksDataException(
-                    "Active Entity Listener " + listener.getEntityId() + " is already registered");
-        }
-        entityEventListeners.put(listener.getEntityId(), listener);
-    }
-
-    public synchronized void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException {
-        LOGGER.log(Level.FINER, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
-                + listener.getEntityId());
-        IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
-        if (registeredListener == null) {
-            throw new HyracksDataException(
-                    "Active Entity Listener " + listener.getEntityId() + " hasn't been registered");
-        }
-    }
-
-    public synchronized void monitorJob(JobId jobId, EntityId activeJob) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
-            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
-            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
-        }
-        if (entityEventListeners.containsKey(activeJob)) {
-            if (jobId2ActiveJobInfos.containsKey(jobId)) {
-                LOGGER.severe("Job is already being monitored for job: " + jobId);
-                return;
-            }
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId);
-            }
-        } else {
-            LOGGER.severe("No listener was found for the entity: " + activeJob);
-        }
-        jobId2ActiveJobInfos.put(jobId, activeJob);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
deleted file mode 100644
index 86c3e7d..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveEvent.Kind;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IJobLifecycleListener;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class ActiveLifecycleListener implements IJobLifecycleListener {
-
-    private static final Logger LOGGER = Logger.getLogger(ActiveLifecycleListener.class.getName());
-
-    private final ActiveJobNotificationHandler notificationHandler;
-    private final LinkedBlockingQueue<ActiveEvent> jobEventInbox;
-    private final ExecutorService executorService;
-
-    public ActiveLifecycleListener() {
-        notificationHandler = new ActiveJobNotificationHandler();
-        jobEventInbox = notificationHandler.getEventInbox();
-        executorService = Executors.newSingleThreadExecutor();
-        executorService.execute(notificationHandler);
-    }
-
-    @Override
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
-        EntityId entityId = notificationHandler.getEntity(jobId);
-        if (entityId != null) {
-            jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId));
-        }
-    }
-
-    @Override
-    public synchronized void notifyJobFinish(JobId jobId) throws HyracksException {
-        EntityId entityId = notificationHandler.getEntity(jobId);
-        if (entityId != null) {
-            jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId));
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
-            }
-        }
-    }
-
-    @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
-        notificationHandler.notifyJobCreation(jobId, spec);
-    }
-
-    public void receive(ActivePartitionMessage message) {
-        jobEventInbox.add(new ActiveEvent(message.getJobId(), Kind.PARTITION_EVENT,
-                message.getActiveRuntimeId().getEntityId(), message));
-    }
-
-    public void stop() {
-        executorService.shutdown();
-    }
-
-    public ActiveJobNotificationHandler getNotificationHandler() {
-        return notificationHandler;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
index e71367a..18368ae 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
@@ -28,6 +28,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 public class ActiveRuntimeManager {
 
     private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName());
@@ -65,11 +69,18 @@ public class ActiveRuntimeManager {
         return activeRuntimes.get(runtimeId);
     }
 
-    public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable feedRuntime) {
+    public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable feedRuntime)
+            throws HyracksDataException {
+        if (activeRuntimes.containsKey(runtimeId)) {
+            throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_ALREADY_REGISTERED, runtimeId);
+        }
         activeRuntimes.put(runtimeId, feedRuntime);
     }
 
-    public synchronized void deregisterRuntime(ActiveRuntimeId runtimeId) {
+    public void deregisterRuntime(ActiveRuntimeId runtimeId) throws HyracksDataException {
+        if (!activeRuntimes.containsKey(runtimeId)) {
+            throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_NOT_REGISTERED, runtimeId);
+        }
         activeRuntimes.remove(runtimeId);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index af8f5ca..eb43d10 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -20,27 +20,44 @@ package org.apache.asterix.active;
 
 public enum ActivityState {
     /**
-     * The initial state of an activity.
+     * The starting state and a possible terminal state.
      */
-    CREATED,
+    STOPPED,
     /**
-     * The starting state and a possible terminal state. Next state can only be {@code ActivityState.STARTING}
+     * Failure to recover from a temporary failure caused the activity to fail permanently.
+     * No further recovery attempts will be made.
      */
-    STOPPED,
+    PERMANENTLY_FAILED,
     /**
-     * A terminal state
+     * An unexpected failure caused the activity to fail but recovery attempts will start taking place
      */
-    FAILED,
+    TEMPORARILY_FAILED,
     /**
-     * An intermediate state. Next state can only be {@code ActivityState.STARTED} or {@code ActivityState.FAILED}
+     * Attempting to recover from temporary failure.
+     */
+    RECOVERING,
+    /**
+     * During an attempt to start the activity
      */
     STARTING,
     /**
-     * An intermediate state. Next state can only be {@code ActivityState.STOPPING} or {@code ActivityState.FAILED}
+     * The activity has been started successfully and is running
+     */
+    RUNNING,
+    /**
+     * During an attempt to gracefully stop the activity
+     */
+    STOPPING,
+    /**
+     * During an attempt to gracefully suspend the activity
+     */
+    SUSPENDING,
+    /**
+     * The activity has been suspended successfully. Next state must be resuming
      */
-    STARTED,
+    SUSPENDED,
     /**
-     * An intermediate state. Next state can only be {@code ActivityState.STOPPED} or {@code ActivityState.FAILED}
+     * During an attempt to restart the activity after suspension
      */
-    STOPPING
+    RESUMING
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
new file mode 100644
index 0000000..6633810
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+public class CountRetryPolicy implements IRetryPolicy {
+
+    private final int count;
+    private int attempted = 0;
+
+    public CountRetryPolicy(int count) {
+        this.count = count;
+    }
+
+    @Override
+    public boolean retry() {
+        if (attempted < count) {
+            attempted++;
+            return true;
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicyFactory.java
new file mode 100644
index 0000000..5e26ae4
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicyFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+public class CountRetryPolicyFactory implements IRetryPolicyFactory {
+
+    private final int count;
+
+    public CountRetryPolicyFactory(int count) {
+        this.count = count;
+    }
+
+    @Override
+    public IRetryPolicy create(IActiveEntityEventsListener listener) {
+        return new CountRetryPolicy(count);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
new file mode 100644
index 0000000..f9357b4
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An active event subscriber that subscribes to events related to active entity
+ */
+public interface IActiveEntityEventSubscriber {
+
+    /**
+     * Notifies the subscriber of a new event
+     *
+     * @param event
+     * @throws HyracksDataException
+     */
+    void notify(ActiveEvent event) throws HyracksDataException;
+
+    /**
+     * Checks whether the subscriber is done receiving events
+     *
+     * @return
+     */
+    boolean isDone();
+
+    /**
+     * Wait until the terminal event has been received
+     *
+     * @throws Exception
+     */
+    void sync() throws HyracksDataException, InterruptedException;
+
+    /**
+     * callback upon successful subscription
+     *
+     * @param eventsListener
+     * @throws HyracksDataException
+     */
+    void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 4bc02f3..03b0cfc 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -58,7 +58,7 @@ public interface IActiveEntityEventsListener {
      * @param subscriber
      * @throws HyracksDataException
      */
-    void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException;
+    void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException;
 
     /**
      * The most recent acquired stats for the active entity
@@ -79,4 +79,21 @@ public interface IActiveEntityEventsListener {
      * @throws HyracksDataException
      */
     void refreshStats(long timeout) throws HyracksDataException;
+
+    /**
+     * @return true, if entity is active, false otherwise
+     */
+    boolean isActive();
+
+    /**
+     * unregister the listener upon deletion of entity
+     *
+     * @throws HyracksDataException
+     */
+    void unregister() throws HyracksDataException;
+
+    /**
+     * Get the job failure for the last failed run
+     */
+    Exception getJobFailure();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
deleted file mode 100644
index 69f7f1c..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An active event subscriber that subscribe to events related to active entity
- */
-public interface IActiveEventSubscriber {
-
-    /**
-     * Notify the subscriber of a new event
-     *
-     * @param event
-     * @throws HyracksDataException
-     */
-    void notify(ActiveEvent event) throws HyracksDataException;
-
-    /**
-     * Checkcs whether the subscriber is done receiving events
-     *
-     * @return
-     */
-    boolean isDone();
-
-    /**
-     * Wait until the terminal event has been received
-     *
-     * @throws InterruptedException
-     */
-    void sync() throws InterruptedException;
-
-    /**
-     * Stop watching events
-     */
-    void unsubscribe();
-
-    /**
-     * callback upon successful subscription
-     *
-     * @param eventsListener
-     * @throws HyracksDataException
-     */
-    void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException;
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveNotificationHandler.java
new file mode 100644
index 0000000..8b9f232
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveNotificationHandler.java
@@ -0,0 +1,93 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+
+/**
+ * Represents the notification handler for events of active entity jobs
+ */
+public interface IActiveNotificationHandler {
+
+    /**
+     * Recover all active jobs that failed
+     *
+     * @throws HyracksDataException
+     */
+    void recover() throws HyracksDataException;
+
+    /**
+     * Set whether handler initialization has completed or not
+     *
+     * @param initialized
+     * @throws HyracksDataException
+     */
+    void setInitialized(boolean initialized) throws HyracksDataException;
+
+    /**
+     * @return true if initialization has completed, false otherwise
+     */
+    boolean isInitialized();
+
+    /**
+     * Register a listener for events of an active entity
+     *
+     * @param listener
+     *            the listener to register
+     * @throws HyracksDataException
+     *             if the active entity already has a listener associated with it
+     */
+    void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException;
+
+    /**
+     * Unregister a listener for events of an active entity
+     *
+     * @param listener
+     *            the listener to unregister
+     * @throws HyracksDataException
+     *             if the entity is still active or if the listener was not registered
+     */
+    void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException;
+
+    /**
+     * @return all the registered event listeners
+     */
+    IActiveEntityEventsListener[] getEventListeners();
+
+    /**
+     * Lookup an event listener using the entity id
+     *
+     * @param entityId
+     *            the lookup key
+     * @return the registered listener if found, null otherwise
+     */
+    IActiveEntityEventsListener getListener(EntityId entityId);
+
+    /**
+     * Receives an active job message from an nc
+     *
+     * @param message
+     *            the message
+     */
+    void receive(ActivePartitionMessage message);
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
new file mode 100644
index 0000000..a010984
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+@FunctionalInterface
+public interface IRetryPolicy {
+    /**
+     * @return true if one more attempt should be done
+     */
+    boolean retry();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
new file mode 100644
index 0000000..a946337
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+@FunctionalInterface
+public interface IRetryPolicyFactory {
+    /**
+     * @return an instance of retry policy
+     */
+    IRetryPolicy create(IActiveEntityEventsListener listener);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
new file mode 100644
index 0000000..074f8f4
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+public class InfiniteRetryPolicy implements IRetryPolicy {
+
+    private final IActiveEntityEventsListener listener;
+
+    public InfiniteRetryPolicy(IActiveEntityEventsListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public boolean retry() {
+        synchronized (listener) {
+            try {
+                listener.wait(5000); //NOSONAR this method is being called in a while loop
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicyFactory.java
new file mode 100644
index 0000000..b31d245
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicyFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+public class InfiniteRetryPolicyFactory implements IRetryPolicyFactory {
+
+    @Override
+    public IRetryPolicy create(IActiveEntityEventsListener listener) {
+        return new InfiniteRetryPolicy(listener);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
new file mode 100644
index 0000000..1596c17
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+public class NoRetryPolicyFactory implements IRetryPolicyFactory {
+    public static final NoRetryPolicyFactory INSTANCE = new NoRetryPolicyFactory();
+    private static final IRetryPolicy policy = () -> false;
+
+    private NoRetryPolicyFactory() {
+    }
+
+    @Override
+    public IRetryPolicy create(IActiveEntityEventsListener listener) {
+        return policy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
new file mode 100644
index 0000000..0a36216
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class SingleThreadEventProcessor<T> implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(SingleThreadEventProcessor.class.getName());
+    private final String name;
+    private final LinkedBlockingQueue<T> eventInbox;
+    private final ExecutorService executorService;
+    private final Future<?> future;
+
+    public SingleThreadEventProcessor(String threadName) {
+        this.name = threadName;
+        eventInbox = new LinkedBlockingQueue<>();
+        executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
+        future = executorService.submit(this);
+    }
+
+    @Override
+    public final void run() {
+        LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                T event = eventInbox.take();
+                handle(event);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Error handling an event", e);
+            }
+        }
+        LOGGER.log(Level.WARNING, "Stopped " + Thread.currentThread().getName());
+    }
+
+    protected abstract void handle(T event) throws Exception; //NOSONAR
+
+    public void add(T event) {
+        if (!eventInbox.add(event)) {
+            throw new IllegalStateException();
+        }
+    }
+
+    public void stop() throws HyracksDataException, InterruptedException {
+        future.cancel(true);
+        executorService.shutdown();
+        if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+            throw HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 9391044..a47d5a5 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -21,8 +21,8 @@ package org.apache.asterix.active.message;
 import java.io.Serializable;
 import java.util.Objects;
 
-import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.IActiveNotificationHandler;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -64,7 +64,7 @@ public class ActivePartitionMessage implements ICcAddressedMessage {
 
     @Override
     public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        IActiveNotificationHandler activeListener = (IActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         activeListener.receive(this);
     }
 
@@ -87,7 +87,7 @@ public class ActivePartitionMessage implements ICcAddressedMessage {
             return true;
         }
         ActivePartitionMessage other = (ActivePartitionMessage) o;
-        return Objects.equals(other.activeRuntimeId, activeRuntimeId) && Objects.equals(other.jobId, jobId) && Objects
-                .equals(other.payload, payload);
+        return Objects.equals(other.activeRuntimeId, activeRuntimeId) && Objects.equals(other.jobId, jobId)
+                && Objects.equals(other.payload, payload);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 213c60b..cd0a63c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.algebra.operators.physical;
 
-import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.declared.DataSourceId;
 import org.apache.asterix.metadata.declared.MetadataProvider;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index eee6bdc..1d47095 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -25,6 +25,8 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.base.Statement;
@@ -51,11 +53,13 @@ public abstract class AbstractLangTranslator {
     public void validateOperation(ICcApplicationContext appCtx, Dataverse defaultDataverse, Statement stmt)
             throws AsterixException {
 
-        if (!(ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)
-                && ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted())) {
+        final IClusterStateManager clusterStateManager = ClusterStateManager.INSTANCE;
+        final IGlobalRecoveryManager globalRecoveryManager = appCtx.getGlobalRecoveryManager();
+        if (!(clusterStateManager.getState().equals(ClusterState.ACTIVE)
+                && globalRecoveryManager.isRecoveryCompleted())) {
             int maxWaitCycles = appCtx.getExternalProperties().getMaxWaitClusterActive();
             try {
-                ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE, maxWaitCycles, TimeUnit.SECONDS);
+                clusterStateManager.waitForState(ClusterState.ACTIVE, maxWaitCycles, TimeUnit.SECONDS);
             } catch (HyracksDataException e) {
                 throw new AsterixException(e);
             } catch (InterruptedException e) {
@@ -64,7 +68,7 @@ public abstract class AbstractLangTranslator {
                 }
                 Thread.currentThread().interrupt();
             }
-            if (!ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)) {
+            if (!clusterStateManager.getState().equals(ClusterState.ACTIVE)) {
                 throw new AsterixException("Cluster is in " + ClusterState.UNUSABLE + " state."
                         + "\n One or more Node Controllers have left or haven't joined yet.\n");
             } else {
@@ -74,16 +78,16 @@ public abstract class AbstractLangTranslator {
             }
         }
 
-        if (ClusterStateManager.INSTANCE.getState().equals(ClusterState.UNUSABLE)) {
+        if (clusterStateManager.getState().equals(ClusterState.UNUSABLE)) {
             throw new AsterixException("Cluster is in " + ClusterState.UNUSABLE + " state."
                     + "\n One or more Node Controllers have left.\n");
         }
 
-        if (!ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted()) {
+        if (!globalRecoveryManager.isRecoveryCompleted()) {
             int maxWaitCycles = appCtx.getExternalProperties().getMaxWaitClusterActive();
             int waitCycleCount = 0;
             try {
-                while (!ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted() && waitCycleCount < maxWaitCycles) {
+                while (!globalRecoveryManager.isRecoveryCompleted() && waitCycleCount < maxWaitCycles) {
                     Thread.sleep(1000);
                     waitCycleCount++;
                 }
@@ -93,7 +97,7 @@ public abstract class AbstractLangTranslator {
                 }
                 Thread.currentThread().interrupt();
             }
-            if (!ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted()) {
+            if (!globalRecoveryManager.isRecoveryCompleted()) {
                 throw new AsterixException("Cluster Global recovery is not yet complete and the system is in "
                         + ClusterState.ACTIVE + " state");
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index b8a6d8a..a1c5cf4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -36,6 +36,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionConstants;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -72,7 +73,6 @@ import org.apache.asterix.lang.common.struct.OperatorType;
 import org.apache.asterix.lang.common.struct.QuantifiedPair;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.lang.common.visitor.base.AbstractQueryExpressionVisitor;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.declared.DataSource;
 import org.apache.asterix.metadata.declared.DataSourceId;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
index 593faa6..15f0ace 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
@@ -18,15 +18,13 @@
  */
 package org.apache.asterix.api.http.server;
 
-import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.feed.watch.StatsSubscriber;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -38,19 +36,21 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
+
 public class ActiveStatsApiServlet extends AbstractServlet {
 
     private static final Logger LOGGER = Logger.getLogger(ActiveStatsApiServlet.class.getName());
     private static final int DEFAULT_EXPIRE_TIME = 2000;
-    private final ActiveLifecycleListener activeLifecycleListener;
+    private final ActiveNotificationHandler activeNotificationHandler;
 
     public ActiveStatsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
         super(ctx, paths);
-        this.activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        this.activeNotificationHandler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
     }
 
     private JsonNode constructNode(ObjectMapper om, IActiveEntityEventsListener eventListener, long currentTime,
-            long ttl) throws InterruptedException, IOException {
+            long ttl) throws Exception {
         long statsTimeStamp = eventListener.getStatsTimeStamp();
         if (currentTime - statsTimeStamp > ttl) {
             StatsSubscriber subscriber = new StatsSubscriber(eventListener);
@@ -66,7 +66,7 @@ public class ActiveStatsApiServlet extends AbstractServlet {
         // Obtain all feed status
         String localPath = localPath(request);
         int expireTime;
-        IActiveEntityEventsListener[] listeners = activeLifecycleListener.getNotificationHandler().getEventListeners();
+        IActiveEntityEventsListener[] listeners = activeNotificationHandler.getEventListeners();
         ObjectMapper om = new ObjectMapper();
         om.enable(SerializationFeature.INDENT_OUTPUT);
         ObjectNode resNode = om.createObjectNode();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index 03958ed..d9a63f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -29,7 +29,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -93,7 +92,7 @@ public class ConnectorApiServlet extends AbstractServlet {
             MetadataManager.INSTANCE.init();
             MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             // Retrieves file splits of the dataset.
-            MetadataProvider metadataProvider = new MetadataProvider(appCtx, null, new StorageComponentProvider());
+            MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
             try {
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index e9d231a..27e2806 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -36,8 +36,9 @@ import java.util.concurrent.Future;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -72,7 +73,7 @@ public class RebalanceApiServlet extends AbstractServlet {
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
     // A queue that maintains submitted rebalance requests.
-    private final Queue<Future> rebalanceTasks = new ArrayDeque<>();
+    private final Queue<Future<Void>> rebalanceTasks = new ArrayDeque<>();
 
     // A queue that tracks the termination of rebalance threads.
     private final Queue<CountDownLatch> rebalanceFutureTerminated = new ArrayDeque<>();
@@ -141,7 +142,7 @@ public class RebalanceApiServlet extends AbstractServlet {
 
     // Cancels all rebalance tasks.
     private synchronized void cancelRebalance() throws InterruptedException {
-        for (Future rebalanceTask : rebalanceTasks) {
+        for (Future<Void> rebalanceTask : rebalanceTasks) {
             rebalanceTask.cancel(true);
         }
     }
@@ -156,14 +157,15 @@ public class RebalanceApiServlet extends AbstractServlet {
     private synchronized CountDownLatch scheduleRebalance(String dataverseName, String datasetName,
             String[] targetNodes, IServletResponse response) {
         CountDownLatch terminated = new CountDownLatch(1);
-        Future task = executor.submit(() -> doRebalance(dataverseName, datasetName, targetNodes, response, terminated));
+        Future<Void> task =
+                executor.submit(() -> doRebalance(dataverseName, datasetName, targetNodes, response, terminated));
         rebalanceTasks.add(task);
         rebalanceFutureTerminated.add(terminated);
         return terminated;
     }
 
     // Performs the actual rebalance.
-    private void doRebalance(String dataverseName, String datasetName, String[] targetNodes, IServletResponse response,
+    private Void doRebalance(String dataverseName, String datasetName, String[] targetNodes, IServletResponse response,
             CountDownLatch terminated) {
         try {
             // Sets the content type.
@@ -198,6 +200,7 @@ public class RebalanceApiServlet extends AbstractServlet {
             // Notify that the rebalance task is terminated.
             terminated.countDown();
         }
+        return null;
     }
 
     // Lists all datasets that should be rebalanced in a given datavserse.
@@ -241,9 +244,23 @@ public class RebalanceApiServlet extends AbstractServlet {
     // Rebalances a given dataset.
     private void rebalanceDataset(String dataverseName, String datasetName, String[] targetNodes) throws Exception {
         IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
-        MetadataProvider metadataProvider = new MetadataProvider(appCtx, null, new StorageComponentProvider());
-        RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
-                metadataProvider, hcc, NoOpDatasetRebalanceCallback.INSTANCE);
+        MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        try {
+            ActiveNotificationHandler activeNotificationHandler =
+                    (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+            activeNotificationHandler.suspend(metadataProvider);
+            try {
+                IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
+                lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+                        dataverseName + '.' + datasetName);
+                RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
+                        metadataProvider, hcc, NoOpDatasetRebalanceCallback.INSTANCE);
+            } finally {
+                activeNotificationHandler.resume(metadataProvider);
+            }
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
     }
 
     // Sends HTTP response to the request client.


Mime
View raw message