asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [2/2] asterixdb git commit: [NO ISSUE][*DB] Explicitly create ClusterStateManager
Date Wed, 06 Sep 2017 16:52:33 GMT
[NO ISSUE][*DB] Explicitly create ClusterStateManager

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Previously, we create the cluster state manager using the
  singleton pattern.
- After this change, cluster state manager is created
  as part of the cc application start.
- To access the cluster state manager after this change,
  the cc application context is used.

Change-Id: Id6532245033ac4c6f6aa9f193539944eecb832f7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1944
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/42441374
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/42441374
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/42441374

Branch: refs/heads/master
Commit: 424413743d6211f19c7f9b0ecc30b5dcafe823d9
Parents: 9b9dc22
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Fri Sep 1 10:44:12 2017 -0700
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Wed Sep 6 09:52:17 2017 -0700

----------------------------------------------------------------------
 .../optimizer/rules/UnnestToDataScanRule.java   |   8 +-
 .../translator/AbstractLangTranslator.java      |   3 +-
 .../common/AsterixHyracksIntegrationUtil.java   |   4 +-
 .../api/http/server/ClusterApiServlet.java      |  10 +-
 .../ClusterControllerDetailsApiServlet.java     |   6 +-
 .../api/http/server/DiagnosticsApiServlet.java  |  12 +-
 .../server/NodeControllerDetailsApiServlet.java |  14 ++-
 .../api/http/server/QueryServiceServlet.java    |   4 +-
 .../api/http/server/ShutdownApiServlet.java     |  12 +-
 .../asterix/app/cc/ResourceIdManager.java       |  68 ------------
 .../message/ExecuteStatementRequestMessage.java |   6 +-
 .../asterix/app/nc/NCAppRuntimeContext.java     |   1 -
 .../asterix/app/translator/QueryTranslator.java |   5 +-
 .../hyracks/bootstrap/CCApplication.java        |  25 ++---
 .../bootstrap/ClusterLifecycleListener.java     |  15 ++-
 .../hyracks/bootstrap/ClusterWorkExecutor.java  |   5 +-
 .../apache/asterix/utils/FeedOperations.java    |  15 +--
 .../app/bootstrap/TestNodeController.java       |   5 +-
 .../asterix/common/api/IApplicationContext.java |   1 -
 .../common/cluster/IClusterStateManager.java    | 111 +++++++++++++++++++
 .../common/dataflow/ICcApplicationContext.java  |   6 +
 .../adapter/factory/GenericAdapterFactory.java  |  13 ++-
 .../api/IExternalDataSourceFactory.java         |   9 +-
 .../reader/rss/RSSRecordReaderFactory.java      |   4 +-
 .../twitter/TwitterRecordReaderFactory.java     |  17 +--
 .../factory/SocketClientInputStreamFactory.java |   4 +-
 .../factory/TwitterFirehoseStreamFactory.java   |   8 +-
 .../apache/asterix/external/util/FeedUtils.java |  16 +--
 .../apache/asterix/external/util/HDFSUtils.java |   7 +-
 .../reader/RecordWithPKTestReaderFactory.java   |  10 +-
 .../record/reader/kv/KVTestReaderFactory.java   |  15 ++-
 .../adapter/TestTypedAdapterFactory.java        |   3 +-
 .../metadata/dataset/hints/DatasetHints.java    |   3 +-
 .../metadata/declared/FeedDataSource.java       |  31 +++---
 .../metadata/declared/MetadataManagerUtil.java  |  20 ++--
 .../metadata/declared/MetadataProvider.java     |  18 +--
 .../utils/SplitsAndConstraintsUtil.java         |  23 ++--
 .../message/ResourceIdRequestMessage.java       |  12 +-
 .../runtime/transaction/ResourceIdManager.java  |  73 ++++++++++++
 .../runtime/utils/CcApplicationContext.java     |  18 +--
 .../runtime/utils/ClusterStateManager.java      |  35 +++---
 41 files changed, 420 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 7f35d08..4550ba6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -260,10 +260,10 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
             keyAccessExpression = null;
             keyAccessScalarFunctionCallExpression = null;
         }
-        FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, targetDataset, feedOutputType, metaType,
-                pkTypes, partitioningKeys, keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
-                FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), context.getComputationNodeDomain(),
-                feedConnection);
+        FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed,
+                aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression,
+                sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
+                context.getComputationNodeDomain(), feedConnection);
         feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
         return feedDataSource;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index 1d47095..c3f01e8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -38,7 +38,6 @@ import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.metadata.dataset.hints.DatasetHints;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -53,7 +52,7 @@ public abstract class AbstractLangTranslator {
     public void validateOperation(ICcApplicationContext appCtx, Dataverse defaultDataverse, Statement stmt)
             throws AsterixException {
 
-        final IClusterStateManager clusterStateManager = ClusterStateManager.INSTANCE;
+        final IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
         final IGlobalRecoveryManager globalRecoveryManager = appCtx.getGlobalRecoveryManager();
         if (!(clusterStateManager.getState().equals(ClusterState.ACTIVE)
                 && globalRecoveryManager.isRecoveryCompleted())) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2799765..71c67f4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -32,10 +32,10 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.PropertiesAccessor;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.hyracks.bootstrap.NCApplication;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.application.INCApplication;
@@ -116,7 +116,7 @@ public class AsterixHyracksIntegrationUtil {
             thread.join();
         }
         // Wait until cluster becomes active
-        ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
+        ((ICcApplicationContext) cc.getApplicationContext()).getClusterStateManager().waitForState(ClusterState.ACTIVE);
         hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort());
         this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index 4faab1e..3e1a16d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -27,8 +27,8 @@ import java.util.function.Predicate;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.control.common.config.ConfigUtils;
@@ -58,9 +58,11 @@ public class ClusterApiServlet extends AbstractServlet {
     protected static final String VERSION_URI_KEY = "versionUri";
     protected static final String DIAGNOSTICS_URI_KEY = "diagnosticsUri";
     private final ObjectMapper om = new ObjectMapper();
+    protected final ICcApplicationContext appCtx;
 
-    public ClusterApiServlet(ConcurrentMap<String, Object> ctx, String... paths) {
+    public ClusterApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String... paths) {
         super(ctx, paths);
+        this.appCtx = appCtx;
     }
 
     @Override
@@ -92,11 +94,11 @@ public class ClusterApiServlet extends AbstractServlet {
     }
 
     protected ObjectNode getClusterStateSummaryJSON() {
-        return ClusterStateManager.INSTANCE.getClusterStateSummary();
+        return appCtx.getClusterStateManager().getClusterStateSummary();
     }
 
     protected ObjectNode getClusterStateJSON(IServletRequest request, String pathToNode) {
-        ObjectNode json = ClusterStateManager.INSTANCE.getClusterStateDescription();
+        ObjectNode json = appCtx.getClusterStateManager().getClusterStateDescription();
         CcApplicationContext appConfig = (CcApplicationContext) ctx.get(ASTERIX_APP_CONTEXT_INFO_ATTR);
         json.putPOJO("config", ConfigUtils.getSectionOptionsForJSON(appConfig.getServiceContext().getAppConfig(),
                 Section.COMMON, getConfigSelector()));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
index 6dea30c..15f28a4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -41,8 +42,9 @@ public class ClusterControllerDetailsApiServlet extends ClusterApiServlet {
     private static final Logger LOGGER = Logger.getLogger(ClusterControllerDetailsApiServlet.class.getName());
     private final ObjectMapper om = new ObjectMapper();
 
-    public ClusterControllerDetailsApiServlet(ConcurrentMap<String, Object> ctx, String... paths) {
-        super(ctx, paths);
+    public ClusterControllerDetailsApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx,
+            String... paths) {
+        super(appCtx, ctx, paths);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index b64b6f6..a631db0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -34,7 +34,8 @@ import java.util.concurrent.Future;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -55,8 +56,8 @@ public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet {
     protected final IHyracksClientConnection hcc;
     protected final ExecutorService executor;
 
-    public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
-        super(ctx, paths);
+    public DiagnosticsApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String... paths) {
+        super(appCtx, ctx, paths);
         this.om = new ObjectMapper();
         this.hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
         this.executor = (ExecutorService) ctx.get(ServletConstants.EXECUTOR_SERVICE_ATTR);
@@ -66,7 +67,6 @@ public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet {
     protected void get(IServletRequest request, IServletResponse response) throws IOException {
         HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
         PrintWriter responseWriter = response.writer();
-        ObjectNode json;
         response.setStatus(HttpResponseStatus.OK);
         om.enable(SerializationFeature.INDENT_OUTPUT);
         try {
@@ -89,9 +89,9 @@ public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet {
     protected ObjectNode getClusterDiagnosticsJSON() throws Exception {
         Map<String, Future<JsonNode>> ccFutureData;
         ccFutureData = getCcDiagosticsFutures();
-
+        IClusterStateManager csm = appCtx.getClusterStateManager();
         Map<String, Map<String, Future<JsonNode>>> ncDataMap = new HashMap<>();
-        for (String nc : ClusterStateManager.INSTANCE.getParticipantNodes()) {
+        for (String nc : csm.getParticipantNodes()) {
             ncDataMap.put(nc, getNcDiagnosticFutures(nc));
         }
         ObjectNode result = om.createObjectNode();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index 01c59f3..ee4812a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -29,7 +29,9 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -48,8 +50,9 @@ public class NodeControllerDetailsApiServlet extends ClusterApiServlet {
     private static final Logger LOGGER = Logger.getLogger(NodeControllerDetailsApiServlet.class.getName());
     private final ObjectMapper om = new ObjectMapper();
 
-    public NodeControllerDetailsApiServlet(ConcurrentMap<String, Object> ctx, String... paths) {
-        super(ctx, paths);
+    public NodeControllerDetailsApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx,
+            String... paths) {
+        super(appCtx, ctx, paths);
         om.enable(SerializationFeature.INDENT_OUTPUT);
     }
 
@@ -205,8 +208,9 @@ public class NodeControllerDetailsApiServlet extends ClusterApiServlet {
         String dump = hcc.getThreadDump(node);
         if (dump == null) {
             // check to see if this is a node that is simply down
-            throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null ? new IllegalStateException()
-                    : new IllegalArgumentException();
+            IClusterStateManager csm = appCtx.getClusterStateManager();
+            ClusterPartition[] cp = csm.getNodePartitions(node);
+            throw cp != null ? new IllegalStateException() : new IllegalArgumentException();
         }
         return (ObjectNode) om.readTree(dump);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index fa67190..a35d191 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -39,7 +39,6 @@ import org.apache.asterix.lang.aql.parser.TokenMgrError;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -428,7 +427,8 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
     protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery,
             IStatementExecutor.Stats stats, RequestParameters param, String handleUrl, long[] outExecStartEnd)
             throws Exception {
-        IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+        IClusterManagementWork.ClusterState clusterState =
+                ((ICcApplicationContext) appCtx).getClusterStateManager().getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
             // using a plain IllegalStateException here to get into the right catch clause for a 500
             throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
index 38f6691..0fe6863 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
@@ -28,8 +28,9 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -49,14 +50,17 @@ public class ShutdownApiServlet extends AbstractServlet {
     public static final String NCSERVICE_PID = "ncservice_pid";
     public static final String INI = "ini";
     public static final String PID = "pid";
+    private final ICcApplicationContext appCtx;
 
-    public ShutdownApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+    public ShutdownApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String[] paths) {
         super(ctx, paths);
+        this.appCtx = appCtx;
     }
 
     @Override
     protected void post(IServletRequest request, IServletResponse response) {
         IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+        IClusterStateManager csm = appCtx.getClusterStateManager();
         boolean terminateNCServices = "true".equalsIgnoreCase(request.getParameter("all"));
         Thread t = new Thread(() -> {
             try {
@@ -79,7 +83,7 @@ public class ShutdownApiServlet extends AbstractServlet {
         try {
             jsonObject.put("status", "SHUTTING_DOWN");
             jsonObject.put("date", new Date().toString());
-            ObjectNode clusterState = ClusterStateManager.INSTANCE.getClusterStateDescription();
+            ObjectNode clusterState = csm.getClusterStateDescription();
             ArrayNode ncs = (ArrayNode) clusterState.get("ncs");
             for (int i = 0; i < ncs.size(); i++) {
                 ObjectNode nc = (ObjectNode) ncs.get(i);
@@ -94,7 +98,7 @@ public class ShutdownApiServlet extends AbstractServlet {
             final PrintWriter writer = response.writer();
             writer.print(JSONUtil.convertNode(jsonObject));
             // accept no further queries once this servlet returns
-            ClusterStateManager.INSTANCE.setState(SHUTTING_DOWN);
+            csm.setState(SHUTTING_DOWN);
             writer.close();
         } catch (Exception e) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, "Exception writing response", e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
deleted file mode 100644
index 372404c..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.cc;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
-
-public class ResourceIdManager implements IResourceIdManager {
-
-    private final AtomicLong globalResourceId = new AtomicLong();
-    private volatile Set<String> reportedNodes = new HashSet<>();
-    private volatile boolean allReported = false;
-
-    @Override
-    public long createResourceId() {
-        if (!allReported) {
-            synchronized (this) {
-                if (!allReported) {
-                    if (reportedNodes.size() < ClusterStateManager.INSTANCE.getNumberOfNodes()) {
-                        return -1;
-                    } else {
-                        reportedNodes = null;
-                        allReported = true;
-                    }
-                }
-            }
-        }
-        return globalResourceId.incrementAndGet();
-    }
-
-    @Override
-    public synchronized boolean reported(String nodeId) {
-        return allReported || reportedNodes.contains(nodeId);
-    }
-
-    @Override
-    public synchronized void report(String nodeId, long maxResourceId) {
-        if (!allReported) {
-            globalResourceId.set(Math.max(maxResourceId, globalResourceId.get()));
-            reportedNodes.add(nodeId);
-            if (reportedNodes.size() == ClusterStateManager.INSTANCE.getNumberOfNodes()) {
-                reportedNodes = null;
-                allReported = true;
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 5cee3d9..62dcede 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -30,6 +30,7 @@ import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.server.ResultUtil;
 import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -41,7 +42,6 @@ import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -139,7 +139,9 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
         if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) {
             return "Node is not registerted with the CC";
         }
-        final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+        ICcApplicationContext appCtx = (ICcApplicationContext) ccSrv.getApplicationContext();
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        final IClusterManagementWork.ClusterState clusterState = csm.getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
             return "Cannot execute request, cluster is " + clusterState;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7647881..3591bf0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -131,7 +131,6 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     private IReplicationManager replicationManager;
     private IRemoteRecoveryManager remoteRecoveryManager;
     private IReplicaResourcesManager replicaResourcesManager;
-
     private final ILibraryManager libraryManager;
     private final NCExtensionManager ncExtensionManager;
     private final IStorageComponentProvider componentProvider;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 7ed5171..365e4a3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -56,6 +56,7 @@ import org.apache.asterix.app.active.FeedEventsListener;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -152,7 +153,6 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
@@ -707,7 +707,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
     protected static String configureNodegroupForDataset(ICcApplicationContext appCtx, Map<String, String> hints,
             String dataverseName, String datasetName, MetadataProvider metadataProvider) throws Exception {
-        Set<String> allNodes = ClusterStateManager.INSTANCE.getParticipantNodes(true);
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        Set<String> allNodes = csm.getParticipantNodes(true);
         Set<String> selectedNodes = new LinkedHashSet<>();
         String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME);
         if (hintValue == null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 6d817b8..37e0c58 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -54,7 +54,6 @@ import org.apache.asterix.api.http.server.UpdateApiServlet;
 import org.apache.asterix.api.http.server.VersionApiServlet;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.cc.CCExtensionManager;
-import org.apache.asterix.app.cc.ResourceIdManager;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
 import org.apache.asterix.common.api.AsterixThreadFactory;
@@ -79,7 +78,6 @@ import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.hyracks.api.application.ICCServiceContext;
@@ -130,7 +128,7 @@ public class CCApplication extends BaseCCApplication {
         int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
         hcc = new HyracksConnection(strIP, port);
         ILibraryManager libraryManager = new ExternalLibraryManager();
-        ResourceIdManager resourceIdManager = new ResourceIdManager();
+
         IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();
         IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory
                 .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, ccServiceCtx);
@@ -138,10 +136,9 @@ public class CCApplication extends BaseCCApplication {
         componentProvider = new StorageComponentProvider();
         GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
         statementExecutorCtx = new StatementExecutorContext();
-        appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, resourceIdManager,
-                () -> MetadataManager.INSTANCE, globalRecoveryManager, ftStrategy, new ActiveNotificationHandler(),
-                componentProvider, new MetadataLockManager());
-        ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
+        appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
+                globalRecoveryManager, ftStrategy, new ActiveNotificationHandler(), componentProvider,
+                new MetadataLockManager());
         ccExtensionManager = new CCExtensionManager(getExtensions());
         appCtx.setExtensionManager(ccExtensionManager);
         final CCConfig ccConfig = controllerService.getCCConfig();
@@ -191,7 +188,7 @@ public class CCApplication extends BaseCCApplication {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Stopping Asterix cluster controller");
         }
-        ClusterStateManager.INSTANCE.setState(SHUTTING_DOWN);
+        appCtx.getClusterStateManager().setState(SHUTTING_DOWN);
         if (appCtx != null) {
             ((ActiveNotificationHandler) appCtx.getActiveNotificationHandler()).stop();
         }
@@ -305,17 +302,17 @@ public class CCApplication extends BaseCCApplication {
             case Servlets.REBALANCE:
                 return new RebalanceApiServlet(ctx, paths, appCtx);
             case Servlets.SHUTDOWN:
-                return new ShutdownApiServlet(ctx, paths);
+                return new ShutdownApiServlet(appCtx, ctx, paths);
             case Servlets.VERSION:
                 return new VersionApiServlet(ctx, paths);
             case Servlets.CLUSTER_STATE:
-                return new ClusterApiServlet(ctx, paths);
+                return new ClusterApiServlet(appCtx, ctx, paths);
             case Servlets.CLUSTER_STATE_NODE_DETAIL:
-                return new NodeControllerDetailsApiServlet(ctx, paths);
+                return new NodeControllerDetailsApiServlet(appCtx, ctx, paths);
             case Servlets.CLUSTER_STATE_CC_DETAIL:
-                return new ClusterControllerDetailsApiServlet(ctx, paths);
+                return new ClusterControllerDetailsApiServlet(appCtx, ctx, paths);
             case Servlets.DIAGNOSTICS:
-                return new DiagnosticsApiServlet(ctx, paths);
+                return new DiagnosticsApiServlet(appCtx, ctx, paths);
             case Servlets.ACTIVE_STATS:
                 return new ActiveStatsApiServlet(ctx, paths, appCtx);
             default:
@@ -334,7 +331,7 @@ public class CCApplication extends BaseCCApplication {
     @Override
     public void startupCompleted() throws Exception {
         ccServiceCtx.getControllerService().getExecutor().submit(() -> {
-            ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
+            appCtx.getClusterStateManager().waitForState(ClusterState.ACTIVE);
             ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
             return null;
         });

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 66f76c5..0583508 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -32,6 +32,7 @@ import org.apache.asterix.common.api.IClusterEventsSubscriber;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse.Status;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.event.schema.cluster.Node;
@@ -42,7 +43,6 @@ import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
 import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -70,10 +70,11 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("NC: " + nodeId + " joined");
         }
-        ClusterStateManager.INSTANCE.addNCConfiguration(nodeId, ncConfiguration);
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        csm.addNCConfiguration(nodeId, ncConfiguration);
 
         //if metadata node rejoining, we need to rebind the proxy connection when it is active again.
-        if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) {
+        if (!csm.isMetadataNodeActive()) {
             MetadataManager.INSTANCE.rebindMetadataNode();
         }
 
@@ -99,10 +100,11 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NC: " + deadNode + " left");
             }
-            ClusterStateManager.INSTANCE.removeNCConfiguration(deadNode);
+            IClusterStateManager csm = appCtx.getClusterStateManager();
+            csm.removeNCConfiguration(deadNode);
 
             //if metadata node failed, we need to rebind the proxy connection when it is active again
-            if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) {
+            if (!csm.isMetadataNodeActive()) {
                 MetadataManager.INSTANCE.rebindMetadataNode();
             }
         }
@@ -171,8 +173,9 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
 
         List<String> addedNodes = new ArrayList<>();
         String asterixInstanceName = ClusterProperties.INSTANCE.getCluster().getInstanceName();
+        IClusterStateManager csm = appCtx.getClusterStateManager();
         for (int i = 0; i < nodesToAdd; i++) {
-            Node node = ClusterStateManager.INSTANCE.getAvailableSubstitutionNode();
+            Node node = csm.getAvailableSubstitutionNode();
             if (node != null) {
                 try {
                     ClusterManagerProvider.getClusterManager().addNode(appCtx, node);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 46968b4..2977a58 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -25,13 +25,13 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.cluster.AddNodeWork;
 import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 
 public class ClusterWorkExecutor implements Runnable {
 
@@ -69,9 +69,10 @@ public class ClusterWorkExecutor implements Runnable {
                     }
                 }
 
+                IClusterStateManager csm = appCtx.getClusterStateManager();
                 Set<Node> addedNodes = new HashSet<>();
                 for (int i = 0; i < nodesToAdd; i++) {
-                    Node node = ClusterStateManager.INSTANCE.getAvailableSubstitutionNode();
+                    Node node = csm.getAvailableSubstitutionNode();
                     if (node != null) {
                         try {
                             ClusterManagerProvider.getClusterManager().addNode(appCtx, node);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 9fc9940..cc95770 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -33,6 +33,7 @@ import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
@@ -66,7 +67,6 @@ import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.metadata.feeds.LocationConstraint;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.asterix.translator.CompiledStatements;
 import org.apache.asterix.translator.IStatementExecutor;
@@ -134,8 +134,10 @@ public class FeedOperations {
 
     public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider metadataProvider, Feed feed)
             throws AsterixException {
-        JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations();
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+        JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx);
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations();
         Set<String> nodes = new TreeSet<>();
         for (String node : allCluster.getLocations()) {
             nodes.add(node);
@@ -143,7 +145,7 @@ public class FeedOperations {
         AlgebricksAbsolutePartitionConstraint locations =
                 new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()]));
         FileSplit[] feedLogFileSplits =
-                FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations);
+                FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
         org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
                 StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
         FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, spC.first, true);
@@ -273,9 +275,8 @@ public class FeedOperations {
             }
 
             // make connections between operators
-            for (Entry<ConnectorDescriptorId,
-                       Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry
-                       : subJob.getConnectorOperatorMap().entrySet()) {
+            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>,
+                    Pair<IOperatorDescriptor, Integer>>> entry : subJob.getConnectorOperatorMap().entrySet()) {
                 ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey());
                 IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId);
                 Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index c1421c5..53a4f23 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -405,7 +405,10 @@ public class TestNodeController {
                     IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, false, true,
                     MetadataUtil.PENDING_NO_OP);
             List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
-            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(dataset, index.getIndexName(), nodes);
+            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(
+                    ((ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext())
+                            .getClusterStateManager(),
+                    dataset, index.getIndexName(), nodes);
             fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index 0aea84d..e1840d3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -59,5 +59,4 @@ public interface IApplicationContext {
     public ILibraryManager getLibraryManager();
 
     IServiceContext getServiceContext();
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 30675cd..b368c3b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -19,12 +19,19 @@
 package org.apache.asterix.common.cluster;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public interface IClusterStateManager {
 
@@ -49,6 +56,7 @@ public interface IClusterStateManager {
 
     /**
      * Updates all partitions of {@code nodeId} based on the {@code active} flag.
+     *
      * @param nodeId
      * @param active
      * @throws HyracksDataException
@@ -93,6 +101,7 @@ public interface IClusterStateManager {
 
     /**
      * Blocks until the cluster state becomes {@code state}, or timeout is exhausted.
+     *
      * @return true if the desired state was reached before timeout occurred
      */
     boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
@@ -116,4 +125,106 @@ public interface IClusterStateManager {
      * @throws HyracksDataException
      */
     void deregisterNodePartitions(String nodeId) throws HyracksDataException;
+
+    /**
+     * @return true if cluster is active, false otherwise
+     */
+    boolean isClusterActive();
+
+    /**
+     * @return the set of participant nodes
+     */
+    Set<String> getParticipantNodes();
+
+    /**
+     * Returns the IO devices configured for a Node Controller
+     *
+     * @param nodeId
+     *            unique identifier of the Node Controller
+     * @return a list of IO devices.
+     */
+    String[] getIODevices(String nodeId);
+
+    /**
+     * @return the constraint representing all the partitions of the cluster
+     */
+    AlgebricksAbsolutePartitionConstraint getClusterLocations();
+
+    /**
+     * @param excludePendingRemoval
+     *            true, if the desired set shouldn't have pending removal nodes
+     * @return the set of participant nodes
+     */
+    Set<String> getParticipantNodes(boolean excludePendingRemoval);
+
+    /**
+     * @param node
+     *            the node id
+     * @return the number of partitions on that node
+     */
+    int getNodePartitionsCount(String node);
+
+    /**
+     * @return a json object representing the cluster state summary
+     */
+    ObjectNode getClusterStateSummary();
+
+    /**
+     * @return a json object representing the cluster state description
+     */
+    ObjectNode getClusterStateDescription();
+
+    /**
+     * Set the cc application context
+     *
+     * @param appCtx
+     */
+    void setCcAppCtx(ICcApplicationContext appCtx);
+
+    /**
+     * @return the number of cluster nodes
+     */
+    int getNumberOfNodes();
+
+    /**
+     * Add node configuration
+     *
+     * @param nodeId
+     * @param ncConfiguration
+     * @throws HyracksException
+     */
+    void addNCConfiguration(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException;
+
+    /**
+     * @return true if metadata node is active, false otherwise
+     */
+    boolean isMetadataNodeActive();
+
+    /**
+     * Remove configuration of a dead node
+     *
+     * @param deadNode
+     * @throws HyracksException
+     */
+    void removeNCConfiguration(String deadNode) throws HyracksException;
+
+    /**
+     * @return a substitution node or null
+     */
+    Node getAvailableSubstitutionNode();
+
+    /**
+     * Add node to the list of nodes pending removal
+     *
+     * @param nodeId
+     */
+    void removePending(String nodeId);
+
+    /**
+     * Deregister intention to remove node id
+     *
+     * @param nodeId
+     * @return
+     */
+    boolean cancelRemovePending(String nodeId);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 3eff214..20f685a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -23,6 +23,7 @@ import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
@@ -99,6 +100,11 @@ public interface ICcApplicationContext extends IApplicationContext {
     IMetadataLockManager getMetadataLockManager();
 
     /**
+     * @return the metadata bootstrap
+     */
+    IMetadataBootstrap getMetadataBootstrap();
+
+    /**
      * @return the cluster state manager
      */
     IClusterStateManager getClusterStateManager();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 2eb81d4..0a47788 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -25,6 +25,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataFlowController;
@@ -92,8 +93,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
     public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
         INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
-        INcApplicationContext appCtx =
-                (INcApplicationContext) serviceCtx.getApplicationContext();
+        INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext();
         try {
             restoreExternalObjects(serviceCtx, appCtx.getLibraryManager());
         } catch (Exception e) {
@@ -152,15 +152,16 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
         dataParserFactory.setMetaType(metaType);
         dataParserFactory.configure(configuration);
         ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory);
-        configureFeedLogManager();
+        configureFeedLogManager(appCtx);
         nullifyExternalObjects();
     }
 
-    private void configureFeedLogManager() throws HyracksDataException, AlgebricksException {
+    private void configureFeedLogManager(IApplicationContext appCtx) throws HyracksDataException, AlgebricksException {
         this.isFeed = ExternalDataUtils.isFeed(configuration);
         if (isFeed) {
-            feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
-                    ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
+            feedLogFileSplits = FeedUtils.splitsForAdapter((ICcApplicationContext) appCtx,
+                    ExternalDataUtils.getDataverse(configuration), ExternalDataUtils.getFeedName(configuration),
+                    dataSourceFactory.getPartitionConstraint());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index edda448..4c3b7e6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -84,9 +84,10 @@ public interface IExternalDataSourceFactory extends Serializable {
      * @return
      * @throws AlgebricksException
      */
-    public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx,
+    public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(ICcApplicationContext appCtx,
             AlgebricksAbsolutePartitionConstraint constraints, int count) throws AlgebricksException {
         if (constraints == null) {
+            IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
             ArrayList<String> locs = new ArrayList<>();
             Set<String> stores = appCtx.getMetadataProperties().getStores().keySet();
             if (stores.isEmpty()) {
@@ -97,7 +98,7 @@ public interface IExternalDataSourceFactory extends Serializable {
                 Iterator<String> storeIt = stores.iterator();
                 while (storeIt.hasNext()) {
                     String node = storeIt.next();
-                    int numIODevices = ClusterStateManager.INSTANCE.getIODevices(node).length;
+                    int numIODevices = clusterStateManager.getIODevices(node).length;
                     for (int k = 0; k < numIODevices; k++) {
                         locs.add(node);
                         i++;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index af4be91..6291ba1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -55,7 +55,7 @@ public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntry> {
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
         int count = urls.size();
         clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(
-                (IApplicationContext) serviceContext.getApplicationContext(), clusterLocations, count);
+                (ICcApplicationContext) serviceContext.getApplicationContext(), clusterLocations, count);
         return clusterLocations;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index a0b53fd..3fb1d5f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
@@ -55,20 +55,16 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<String>
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     private transient IServiceContext serviceCtx;
 
-    private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList(
-            ExternalDataConstants.READER_TWITTER_PULL,
-            ExternalDataConstants.READER_TWITTER_PUSH,
-            ExternalDataConstants.READER_PUSH_TWITTER,
-            ExternalDataConstants.READER_PULL_TWITTER,
-            ExternalDataConstants.READER_USER_STREAM_TWITTER));
-
+    private static final List<String> recordReaderNames =
+            Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.READER_TWITTER_PULL,
+                    ExternalDataConstants.READER_TWITTER_PUSH, ExternalDataConstants.READER_PUSH_TWITTER,
+                    ExternalDataConstants.READER_PULL_TWITTER, ExternalDataConstants.READER_USER_STREAM_TWITTER));
 
     @Override
     public DataSourceType getDataSourceType() {
         return DataSourceType.RECORDS;
     }
 
-
     @Override
     public List<String> getRecordReaderNames() {
         return recordReaderNames;
@@ -77,8 +73,7 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<String>
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
         clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(
-                (IApplicationContext) serviceCtx.getApplicationContext(),
-                clusterLocations, INTAKE_CARDINALITY);
+                (ICcApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, INTAKE_CARDINALITY);
         return clusterLocations;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
index caeaa07..bfc2cb7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -25,7 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
@@ -50,7 +50,7 @@ public class SocketClientInputStreamFactory implements IInputStreamFactory {
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
         clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(
-                (IApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, sockets.size());
+                (ICcApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, sockets.size());
         return clusterLocations;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index 12be449..c7b8633 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -23,10 +23,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -54,6 +54,7 @@ public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
     private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
 
     private Map<String, String> configuration;
+    private transient IServiceContext serviceCtx;
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
@@ -67,10 +68,10 @@ public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
         if (ingestionCardinalityParam != null) {
             count = Integer.parseInt(ingestionCardinalityParam);
         }
-
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
         List<String> chosenLocations = new ArrayList<>();
         String[] availableLocations = locations != null ? locations
-                : ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {});
+                : appCtx.getClusterStateManager().getParticipantNodes().toArray(new String[] {});
         for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
             chosenLocations.add(availableLocations[k]);
         }
@@ -84,6 +85,7 @@ public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
 
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration) {
+        this.serviceCtx = serviceCtx;
         this.configuration = configuration;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index ec7de91..dad0d51 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -26,9 +26,9 @@ import java.util.Map;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
@@ -63,9 +63,9 @@ public class FeedUtils {
     }
 
     public enum Mode {
-        PROCESS,            // There is memory
-        SPILL,              // Memory budget has been consumed. Now we're writing to disk
-        DISCARD             // Memory and Disk space budgets have been consumed. Now we're discarding
+        PROCESS, // There is memory
+        SPILL, // Memory budget has been consumed. Now we're writing to disk
+        DISCARD // Memory and Disk space budgets have been consumed. Now we're discarding
     }
 
     private FeedUtils() {
@@ -87,7 +87,7 @@ public class FeedUtils {
         return StoragePathUtil.getFileSplitForClusterPartition(partition, f.getPath());
     }
 
-    public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
+    public static FileSplit[] splitsForAdapter(ICcApplicationContext appCtx, String dataverseName, String feedName,
             AlgebricksPartitionConstraint partitionConstraints) throws AsterixException {
         if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
             throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
@@ -96,7 +96,7 @@ public class FeedUtils {
         List<FileSplit> splits = new ArrayList<>();
         for (String nd : locations) {
             splits.add(splitsForAdapter(dataverseName, feedName, nd,
-                    ClusterStateManager.INSTANCE.getNodePartitions(nd)[0]));
+                    appCtx.getClusterStateManager().getNodePartitions(nd)[0]));
         }
         return splits.toArray(new FileSplit[] {});
     }
@@ -113,8 +113,8 @@ public class FeedUtils {
 
     public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit)
             throws HyracksDataException {
-        return new FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(),
-                0, ctx.getIoManager()).getFile());
+        return new FeedLogManager(
+                FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile());
     }
 
     public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta)

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index b4353e7..bd50352 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -25,7 +25,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.indexing.ExternalFile;
@@ -33,7 +35,6 @@ import org.apache.asterix.external.indexing.IndexingScheduler;
 import org.apache.asterix.external.indexing.RecordId.RecordIdType;
 import org.apache.asterix.external.input.stream.HDFSInputStream;
 import org.apache.asterix.hivecompat.io.RCFileInputFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -207,10 +208,12 @@ public class HDFSUtils {
     public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx,
             AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
+            IClusterStateManager clusterStateManager = ((ICcApplicationContext) appCtx).getClusterStateManager();
             ArrayList<String> locs = new ArrayList<>();
             Map<String, String[]> stores = appCtx.getMetadataProperties().getStores();
             for (String node : stores.keySet()) {
-                int numIODevices = ClusterStateManager.INSTANCE.getIODevices(node).length;
+
+                int numIODevices = clusterStateManager.getIODevices(node).length;
                 for (int k = 0; k < numIODevices; k++) {
                     locs.add(node);
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
index c9e37ee..715ad02 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
@@ -23,7 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -40,11 +40,10 @@ public class RecordWithPKTestReaderFactory implements IRecordReaderFactory<Recor
     private transient IServiceContext serviceCtx;
     private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList());
 
-
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
-        clusterLocations = IExternalDataSourceFactory
-                .getPartitionConstraints((IApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, 1);
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(
+                (ICcApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, 1);
         return clusterLocations;
     }
 
@@ -64,7 +63,8 @@ public class RecordWithPKTestReaderFactory implements IRecordReaderFactory<Recor
         return RecordWithPK.class;
     }
 
-    @Override public List<String> getRecordReaderNames() {
+    @Override
+    public List<String> getRecordReaderNames() {
         return recordReaderNames;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index c45941d..c09b9eb 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -23,9 +23,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -43,17 +43,23 @@ public class KVTestReaderFactory implements IRecordReaderFactory<DCPRequest> {
     private int upsertCycle = 0;
     private int numOfReaders;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private transient IServiceContext serviceCtx;
     private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList());
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        clusterLocations = ClusterStateManager.INSTANCE.getClusterLocations();
-        numOfReaders = clusterLocations.getLocations().length;
+        if (clusterLocations == null) {
+            ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+            clusterLocations = appCtx.getClusterStateManager().getClusterLocations();
+            numOfReaders = clusterLocations.getLocations().length;
+        }
         return clusterLocations;
+
     }
 
     @Override
     public void configure(IServiceContext serviceCtx, final Map<String, String> configuration) {
+        this.serviceCtx = serviceCtx;
         if (configuration.containsKey("num-of-records")) {
             numOfRecords = Integer.parseInt(configuration.get("num-of-records"));
         }
@@ -83,7 +89,8 @@ public class KVTestReaderFactory implements IRecordReaderFactory<DCPRequest> {
         return DCPRequest.class;
     }
 
-    @Override public List<String> getRecordReaderNames() {
+    @Override
+    public List<String> getRecordReaderNames() {
         return recordReaderNames;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 5262e1f..616ed6e 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
@@ -65,7 +66,7 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
         clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(
-                (IApplicationContext) serviceContext.getApplicationContext(), clusterLocations, 1);
+                (ICcApplicationContext) serviceContext.getApplicationContext(), clusterLocations, 1);
         return clusterLocations;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
index 709f655..f5cfbb3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
 /**
@@ -113,7 +112,7 @@ public class DatasetHints {
                 if (intValue < 0) {
                     return new Pair<>(false, "Value must be >= 0");
                 }
-                int numNodesInCluster = ClusterStateManager.INSTANCE.getParticipantNodes(true).size();
+                int numNodesInCluster = appCtx.getClusterStateManager().getParticipantNodes(true).size();
                 if (numNodesInCluster < intValue) {
                     return new Pair<>(false,
                             "Value must be less than or equal to the available number of nodes in cluster ("


Mime
View raw message