Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0B60D200CFD for ; Wed, 6 Sep 2017 18:52:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 09DFD1609BB; Wed, 6 Sep 2017 16:52:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B781E1609D1 for ; Wed, 6 Sep 2017 18:52:35 +0200 (CEST) Received: (qmail 11649 invoked by uid 500); 6 Sep 2017 16:52:34 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 11585 invoked by uid 99); 6 Sep 2017 16:52:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Sep 2017 16:52:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F014CF552A; Wed, 6 Sep 2017 16:52:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.apache.org Date: Wed, 06 Sep 2017 16:52:33 -0000 Message-Id: <16b48b0a20594d408b5194d5aa43348f@git.apache.org> In-Reply-To: <3a43e738f6034526af5adcc498571f12@git.apache.org> References: <3a43e738f6034526af5adcc498571f12@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] asterixdb git commit: [NO ISSUE][*DB] Explicitly create ClusterStateManager archived-at: Wed, 06 Sep 2017 16:52:38 -0000 [NO ISSUE][*DB] Explicitly create ClusterStateManager - user model changes: no - storage format changes: no - interface changes: no details: - Previously, we create the cluster state manager using the singleton pattern. - After this change, cluster state manager is created as part of the cc application start. - To access the cluster state manager after this change, the cc application context is used. Change-Id: Id6532245033ac4c6f6aa9f193539944eecb832f7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1944 Tested-by: Jenkins Contrib: Jenkins Integration-Tests: Jenkins Reviewed-by: Michael Blow Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/42441374 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/42441374 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/42441374 Branch: refs/heads/master Commit: 424413743d6211f19c7f9b0ecc30b5dcafe823d9 Parents: 9b9dc22 Author: Abdullah Alamoudi Authored: Fri Sep 1 10:44:12 2017 -0700 Committer: abdullah alamoudi Committed: Wed Sep 6 09:52:17 2017 -0700 ---------------------------------------------------------------------- .../optimizer/rules/UnnestToDataScanRule.java | 8 +- .../translator/AbstractLangTranslator.java | 3 +- .../common/AsterixHyracksIntegrationUtil.java | 4 +- .../api/http/server/ClusterApiServlet.java | 10 +- .../ClusterControllerDetailsApiServlet.java | 6 +- .../api/http/server/DiagnosticsApiServlet.java | 12 +- .../server/NodeControllerDetailsApiServlet.java | 14 ++- .../api/http/server/QueryServiceServlet.java | 4 +- .../api/http/server/ShutdownApiServlet.java | 12 +- .../asterix/app/cc/ResourceIdManager.java | 68 ------------ .../message/ExecuteStatementRequestMessage.java | 6 +- .../asterix/app/nc/NCAppRuntimeContext.java | 1 - .../asterix/app/translator/QueryTranslator.java | 5 +- .../hyracks/bootstrap/CCApplication.java | 25 ++--- .../bootstrap/ClusterLifecycleListener.java | 15 ++- .../hyracks/bootstrap/ClusterWorkExecutor.java | 5 +- .../apache/asterix/utils/FeedOperations.java | 15 +-- .../app/bootstrap/TestNodeController.java | 5 +- .../asterix/common/api/IApplicationContext.java | 1 - .../common/cluster/IClusterStateManager.java | 111 +++++++++++++++++++ .../common/dataflow/ICcApplicationContext.java | 6 + .../adapter/factory/GenericAdapterFactory.java | 13 ++- .../api/IExternalDataSourceFactory.java | 9 +- .../reader/rss/RSSRecordReaderFactory.java | 4 +- .../twitter/TwitterRecordReaderFactory.java | 17 +-- .../factory/SocketClientInputStreamFactory.java | 4 +- .../factory/TwitterFirehoseStreamFactory.java | 8 +- .../apache/asterix/external/util/FeedUtils.java | 16 +-- .../apache/asterix/external/util/HDFSUtils.java | 7 +- .../reader/RecordWithPKTestReaderFactory.java | 10 +- .../record/reader/kv/KVTestReaderFactory.java | 15 ++- .../adapter/TestTypedAdapterFactory.java | 3 +- .../metadata/dataset/hints/DatasetHints.java | 3 +- .../metadata/declared/FeedDataSource.java | 31 +++--- .../metadata/declared/MetadataManagerUtil.java | 20 ++-- .../metadata/declared/MetadataProvider.java | 18 +-- .../utils/SplitsAndConstraintsUtil.java | 23 ++-- .../message/ResourceIdRequestMessage.java | 12 +- .../runtime/transaction/ResourceIdManager.java | 73 ++++++++++++ .../runtime/utils/CcApplicationContext.java | 18 +-- .../runtime/utils/ClusterStateManager.java | 35 +++--- 41 files changed, 420 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java index 7f35d08..4550ba6 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java @@ -260,10 +260,10 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule { keyAccessExpression = null; keyAccessScalarFunctionCallExpression = null; } - FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, targetDataset, feedOutputType, metaType, - pkTypes, partitioningKeys, keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(), - FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), context.getComputationNodeDomain(), - feedConnection); + FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed, + aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression, + sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), + context.getComputationNodeDomain(), feedConnection); feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy); return feedDataSource; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java index 1d47095..c3f01e8 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java @@ -38,7 +38,6 @@ import org.apache.asterix.lang.common.statement.InsertStatement; import org.apache.asterix.metadata.dataset.hints.DatasetHints; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.utils.MetadataConstants; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -53,7 +52,7 @@ public abstract class AbstractLangTranslator { public void validateOperation(ICcApplicationContext appCtx, Dataverse defaultDataverse, Statement stmt) throws AsterixException { - final IClusterStateManager clusterStateManager = ClusterStateManager.INSTANCE; + final IClusterStateManager clusterStateManager = appCtx.getClusterStateManager(); final IGlobalRecoveryManager globalRecoveryManager = appCtx.getGlobalRecoveryManager(); if (!(clusterStateManager.getState().equals(ClusterState.ACTIVE) && globalRecoveryManager.isRecoveryCompleted())) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 2799765..71c67f4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -32,10 +32,10 @@ import java.util.logging.Logger; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.PropertiesAccessor; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.hyracks.bootstrap.NCApplication; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.application.ICCApplication; import org.apache.hyracks.api.application.INCApplication; @@ -116,7 +116,7 @@ public class AsterixHyracksIntegrationUtil { thread.join(); } // Wait until cluster becomes active - ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE); + ((ICcApplicationContext) cc.getApplicationContext()).getClusterStateManager().waitForState(ClusterState.ACTIVE); hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort()); this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java index 4faab1e..3e1a16d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java @@ -27,8 +27,8 @@ import java.util.function.Predicate; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.runtime.utils.CcApplicationContext; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.Section; import org.apache.hyracks.control.common.config.ConfigUtils; @@ -58,9 +58,11 @@ public class ClusterApiServlet extends AbstractServlet { protected static final String VERSION_URI_KEY = "versionUri"; protected static final String DIAGNOSTICS_URI_KEY = "diagnosticsUri"; private final ObjectMapper om = new ObjectMapper(); + protected final ICcApplicationContext appCtx; - public ClusterApiServlet(ConcurrentMap ctx, String... paths) { + public ClusterApiServlet(ICcApplicationContext appCtx, ConcurrentMap ctx, String... paths) { super(ctx, paths); + this.appCtx = appCtx; } @Override @@ -92,11 +94,11 @@ public class ClusterApiServlet extends AbstractServlet { } protected ObjectNode getClusterStateSummaryJSON() { - return ClusterStateManager.INSTANCE.getClusterStateSummary(); + return appCtx.getClusterStateManager().getClusterStateSummary(); } protected ObjectNode getClusterStateJSON(IServletRequest request, String pathToNode) { - ObjectNode json = ClusterStateManager.INSTANCE.getClusterStateDescription(); + ObjectNode json = appCtx.getClusterStateManager().getClusterStateDescription(); CcApplicationContext appConfig = (CcApplicationContext) ctx.get(ASTERIX_APP_CONTEXT_INFO_ATTR); json.putPOJO("config", ConfigUtils.getSectionOptionsForJSON(appConfig.getServiceContext().getAppConfig(), Section.COMMON, getConfigSelector())); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java index 6dea30c..15f28a4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; @@ -41,8 +42,9 @@ public class ClusterControllerDetailsApiServlet extends ClusterApiServlet { private static final Logger LOGGER = Logger.getLogger(ClusterControllerDetailsApiServlet.class.getName()); private final ObjectMapper om = new ObjectMapper(); - public ClusterControllerDetailsApiServlet(ConcurrentMap ctx, String... paths) { - super(ctx, paths); + public ClusterControllerDetailsApiServlet(ICcApplicationContext appCtx, ConcurrentMap ctx, + String... paths) { + super(appCtx, ctx, paths); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java index b64b6f6..a631db0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java @@ -34,7 +34,8 @@ import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; @@ -55,8 +56,8 @@ public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet { protected final IHyracksClientConnection hcc; protected final ExecutorService executor; - public DiagnosticsApiServlet(ConcurrentMap ctx, String[] paths) { - super(ctx, paths); + public DiagnosticsApiServlet(ICcApplicationContext appCtx, ConcurrentMap ctx, String... paths) { + super(appCtx, ctx, paths); this.om = new ObjectMapper(); this.hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); this.executor = (ExecutorService) ctx.get(ServletConstants.EXECUTOR_SERVICE_ATTR); @@ -66,7 +67,6 @@ public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet { protected void get(IServletRequest request, IServletResponse response) throws IOException { HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8); PrintWriter responseWriter = response.writer(); - ObjectNode json; response.setStatus(HttpResponseStatus.OK); om.enable(SerializationFeature.INDENT_OUTPUT); try { @@ -89,9 +89,9 @@ public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet { protected ObjectNode getClusterDiagnosticsJSON() throws Exception { Map> ccFutureData; ccFutureData = getCcDiagosticsFutures(); - + IClusterStateManager csm = appCtx.getClusterStateManager(); Map>> ncDataMap = new HashMap<>(); - for (String nc : ClusterStateManager.INSTANCE.getParticipantNodes()) { + for (String nc : csm.getParticipantNodes()) { ncDataMap.put(nc, getNcDiagnosticFutures(nc)); } ObjectNode result = om.createObjectNode(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java index 01c59f3..ee4812a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java @@ -29,7 +29,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; @@ -48,8 +50,9 @@ public class NodeControllerDetailsApiServlet extends ClusterApiServlet { private static final Logger LOGGER = Logger.getLogger(NodeControllerDetailsApiServlet.class.getName()); private final ObjectMapper om = new ObjectMapper(); - public NodeControllerDetailsApiServlet(ConcurrentMap ctx, String... paths) { - super(ctx, paths); + public NodeControllerDetailsApiServlet(ICcApplicationContext appCtx, ConcurrentMap ctx, + String... paths) { + super(appCtx, ctx, paths); om.enable(SerializationFeature.INDENT_OUTPUT); } @@ -205,8 +208,9 @@ public class NodeControllerDetailsApiServlet extends ClusterApiServlet { String dump = hcc.getThreadDump(node); if (dump == null) { // check to see if this is a node that is simply down - throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null ? new IllegalStateException() - : new IllegalArgumentException(); + IClusterStateManager csm = appCtx.getClusterStateManager(); + ClusterPartition[] cp = csm.getNodePartitions(node); + throw cp != null ? new IllegalStateException() : new IllegalArgumentException(); } return (ObjectNode) om.readTree(dump); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index fa67190..a35d191 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -39,7 +39,6 @@ import org.apache.asterix.lang.aql.parser.TokenMgrError; import org.apache.asterix.lang.common.base.IParser; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; @@ -428,7 +427,8 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param, String handleUrl, long[] outExecStartEnd) throws Exception { - IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState(); + IClusterManagementWork.ClusterState clusterState = + ((ICcApplicationContext) appCtx).getClusterStateManager().getState(); if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { // using a plain IllegalStateException here to get into the right catch clause for a 500 throw new IllegalStateException("Cannot execute request, cluster is " + clusterState); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java index 38f6691..0fe6863 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java @@ -28,8 +28,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.GlobalConfig; -import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; @@ -49,14 +50,17 @@ public class ShutdownApiServlet extends AbstractServlet { public static final String NCSERVICE_PID = "ncservice_pid"; public static final String INI = "ini"; public static final String PID = "pid"; + private final ICcApplicationContext appCtx; - public ShutdownApiServlet(ConcurrentMap ctx, String[] paths) { + public ShutdownApiServlet(ICcApplicationContext appCtx, ConcurrentMap ctx, String[] paths) { super(ctx, paths); + this.appCtx = appCtx; } @Override protected void post(IServletRequest request, IServletResponse response) { IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); + IClusterStateManager csm = appCtx.getClusterStateManager(); boolean terminateNCServices = "true".equalsIgnoreCase(request.getParameter("all")); Thread t = new Thread(() -> { try { @@ -79,7 +83,7 @@ public class ShutdownApiServlet extends AbstractServlet { try { jsonObject.put("status", "SHUTTING_DOWN"); jsonObject.put("date", new Date().toString()); - ObjectNode clusterState = ClusterStateManager.INSTANCE.getClusterStateDescription(); + ObjectNode clusterState = csm.getClusterStateDescription(); ArrayNode ncs = (ArrayNode) clusterState.get("ncs"); for (int i = 0; i < ncs.size(); i++) { ObjectNode nc = (ObjectNode) ncs.get(i); @@ -94,7 +98,7 @@ public class ShutdownApiServlet extends AbstractServlet { final PrintWriter writer = response.writer(); writer.print(JSONUtil.convertNode(jsonObject)); // accept no further queries once this servlet returns - ClusterStateManager.INSTANCE.setState(SHUTTING_DOWN); + csm.setState(SHUTTING_DOWN); writer.close(); } catch (Exception e) { GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, "Exception writing response", e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java deleted file mode 100644 index 372404c..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.cc; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.asterix.common.transactions.IResourceIdManager; -import org.apache.asterix.runtime.utils.ClusterStateManager; - -public class ResourceIdManager implements IResourceIdManager { - - private final AtomicLong globalResourceId = new AtomicLong(); - private volatile Set reportedNodes = new HashSet<>(); - private volatile boolean allReported = false; - - @Override - public long createResourceId() { - if (!allReported) { - synchronized (this) { - if (!allReported) { - if (reportedNodes.size() < ClusterStateManager.INSTANCE.getNumberOfNodes()) { - return -1; - } else { - reportedNodes = null; - allReported = true; - } - } - } - } - return globalResourceId.incrementAndGet(); - } - - @Override - public synchronized boolean reported(String nodeId) { - return allReported || reportedNodes.contains(nodeId); - } - - @Override - public synchronized void report(String nodeId, long maxResourceId) { - if (!allReported) { - globalResourceId.set(Math.max(maxResourceId, globalResourceId.get())); - reportedNodes.add(nodeId); - if (reportedNodes.size() == ClusterStateManager.INSTANCE.getNumberOfNodes()) { - reportedNodes = null; - allReported = true; - } - } - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index 5cee3d9..62dcede 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -30,6 +30,7 @@ import org.apache.asterix.algebra.base.ILangExtension; import org.apache.asterix.api.http.server.ResultUtil; import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.common.api.IClusterManagementWork; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -41,7 +42,6 @@ import org.apache.asterix.lang.common.base.IParser; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; @@ -139,7 +139,9 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) { return "Node is not registerted with the CC"; } - final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState(); + ICcApplicationContext appCtx = (ICcApplicationContext) ccSrv.getApplicationContext(); + IClusterStateManager csm = appCtx.getClusterStateManager(); + final IClusterManagementWork.ClusterState clusterState = csm.getState(); if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { return "Cannot execute request, cluster is " + clusterState; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 7647881..3591bf0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -131,7 +131,6 @@ public class NCAppRuntimeContext implements INcApplicationContext { private IReplicationManager replicationManager; private IRemoteRecoveryManager remoteRecoveryManager; private IReplicaResourcesManager replicaResourcesManager; - private final ILibraryManager libraryManager; private final NCExtensionManager ncExtensionManager; private final IStorageComponentProvider componentProvider; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 7ed5171..365e4a3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -56,6 +56,7 @@ import org.apache.asterix.app.active.FeedEventsListener; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; @@ -152,7 +153,6 @@ import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.types.TypeSignature; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; @@ -707,7 +707,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected static String configureNodegroupForDataset(ICcApplicationContext appCtx, Map hints, String dataverseName, String datasetName, MetadataProvider metadataProvider) throws Exception { - Set allNodes = ClusterStateManager.INSTANCE.getParticipantNodes(true); + IClusterStateManager csm = appCtx.getClusterStateManager(); + Set allNodes = csm.getParticipantNodes(true); Set selectedNodes = new LinkedHashSet<>(); String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME); if (hintValue == null) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 6d817b8..37e0c58 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -54,7 +54,6 @@ import org.apache.asterix.api.http.server.UpdateApiServlet; import org.apache.asterix.api.http.server.VersionApiServlet; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; -import org.apache.asterix.app.cc.ResourceIdManager; import org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.app.replication.FaultToleranceStrategyFactory; import org.apache.asterix.common.api.AsterixThreadFactory; @@ -79,7 +78,6 @@ import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.runtime.job.resource.JobCapacityController; import org.apache.asterix.runtime.utils.CcApplicationContext; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.hyracks.api.application.ICCServiceContext; @@ -130,7 +128,7 @@ public class CCApplication extends BaseCCApplication { int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort(); hcc = new HyracksConnection(strIP, port); ILibraryManager libraryManager = new ExternalLibraryManager(); - ResourceIdManager resourceIdManager = new ResourceIdManager(); + IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy(); IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, ccServiceCtx); @@ -138,10 +136,9 @@ public class CCApplication extends BaseCCApplication { componentProvider = new StorageComponentProvider(); GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager(); statementExecutorCtx = new StatementExecutorContext(); - appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, resourceIdManager, - () -> MetadataManager.INSTANCE, globalRecoveryManager, ftStrategy, new ActiveNotificationHandler(), - componentProvider, new MetadataLockManager()); - ClusterStateManager.INSTANCE.setCcAppCtx(appCtx); + appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE, + globalRecoveryManager, ftStrategy, new ActiveNotificationHandler(), componentProvider, + new MetadataLockManager()); ccExtensionManager = new CCExtensionManager(getExtensions()); appCtx.setExtensionManager(ccExtensionManager); final CCConfig ccConfig = controllerService.getCCConfig(); @@ -191,7 +188,7 @@ public class CCApplication extends BaseCCApplication { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Stopping Asterix cluster controller"); } - ClusterStateManager.INSTANCE.setState(SHUTTING_DOWN); + appCtx.getClusterStateManager().setState(SHUTTING_DOWN); if (appCtx != null) { ((ActiveNotificationHandler) appCtx.getActiveNotificationHandler()).stop(); } @@ -305,17 +302,17 @@ public class CCApplication extends BaseCCApplication { case Servlets.REBALANCE: return new RebalanceApiServlet(ctx, paths, appCtx); case Servlets.SHUTDOWN: - return new ShutdownApiServlet(ctx, paths); + return new ShutdownApiServlet(appCtx, ctx, paths); case Servlets.VERSION: return new VersionApiServlet(ctx, paths); case Servlets.CLUSTER_STATE: - return new ClusterApiServlet(ctx, paths); + return new ClusterApiServlet(appCtx, ctx, paths); case Servlets.CLUSTER_STATE_NODE_DETAIL: - return new NodeControllerDetailsApiServlet(ctx, paths); + return new NodeControllerDetailsApiServlet(appCtx, ctx, paths); case Servlets.CLUSTER_STATE_CC_DETAIL: - return new ClusterControllerDetailsApiServlet(ctx, paths); + return new ClusterControllerDetailsApiServlet(appCtx, ctx, paths); case Servlets.DIAGNOSTICS: - return new DiagnosticsApiServlet(ctx, paths); + return new DiagnosticsApiServlet(appCtx, ctx, paths); case Servlets.ACTIVE_STATS: return new ActiveStatsApiServlet(ctx, paths, appCtx); default: @@ -334,7 +331,7 @@ public class CCApplication extends BaseCCApplication { @Override public void startupCompleted() throws Exception { ccServiceCtx.getControllerService().getExecutor().submit(() -> { - ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE); + appCtx.getClusterStateManager().waitForState(ClusterState.ACTIVE); ClusterManagerProvider.getClusterManager().notifyStartupCompleted(); return null; }); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java index 66f76c5..0583508 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.api.IClusterEventsSubscriber; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.IClusterManagementWorkResponse; import org.apache.asterix.common.api.IClusterManagementWorkResponse.Status; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.event.schema.cluster.Node; @@ -42,7 +43,6 @@ import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.metadata.cluster.RemoveNodeWork; import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse; import org.apache.asterix.runtime.utils.CcApplicationContext; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.application.IClusterLifecycleListener; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.exceptions.HyracksException; @@ -70,10 +70,11 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("NC: " + nodeId + " joined"); } - ClusterStateManager.INSTANCE.addNCConfiguration(nodeId, ncConfiguration); + IClusterStateManager csm = appCtx.getClusterStateManager(); + csm.addNCConfiguration(nodeId, ncConfiguration); //if metadata node rejoining, we need to rebind the proxy connection when it is active again. - if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) { + if (!csm.isMetadataNodeActive()) { MetadataManager.INSTANCE.rebindMetadataNode(); } @@ -99,10 +100,11 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("NC: " + deadNode + " left"); } - ClusterStateManager.INSTANCE.removeNCConfiguration(deadNode); + IClusterStateManager csm = appCtx.getClusterStateManager(); + csm.removeNCConfiguration(deadNode); //if metadata node failed, we need to rebind the proxy connection when it is active again - if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) { + if (!csm.isMetadataNodeActive()) { MetadataManager.INSTANCE.rebindMetadataNode(); } } @@ -171,8 +173,9 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { List addedNodes = new ArrayList<>(); String asterixInstanceName = ClusterProperties.INSTANCE.getCluster().getInstanceName(); + IClusterStateManager csm = appCtx.getClusterStateManager(); for (int i = 0; i < nodesToAdd; i++) { - Node node = ClusterStateManager.INSTANCE.getAvailableSubstitutionNode(); + Node node = csm.getAvailableSubstitutionNode(); if (node != null) { try { ClusterManagerProvider.getClusterManager().addNode(appCtx, node); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java index 46968b4..2977a58 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java @@ -25,13 +25,13 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.api.IClusterManagementWork; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.event.schema.cluster.Node; import org.apache.asterix.metadata.cluster.AddNodeWork; import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.metadata.cluster.RemoveNodeWork; -import org.apache.asterix.runtime.utils.ClusterStateManager; public class ClusterWorkExecutor implements Runnable { @@ -69,9 +69,10 @@ public class ClusterWorkExecutor implements Runnable { } } + IClusterStateManager csm = appCtx.getClusterStateManager(); Set addedNodes = new HashSet<>(); for (int i = 0; i < nodesToAdd; i++) { - Node node = ClusterStateManager.INSTANCE.getAvailableSubstitutionNode(); + Node node = csm.getAvailableSubstitutionNode(); if (node != null) { try { ClusterManagerProvider.getClusterManager().addNode(appCtx, node); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index 9fc9940..cc95770 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -33,6 +33,7 @@ import org.apache.asterix.active.EntityId; import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.active.message.ActiveManagerMessage.Kind; import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; @@ -66,7 +67,6 @@ import org.apache.asterix.metadata.feeds.FeedMetadataUtil; import org.apache.asterix.metadata.feeds.LocationConstraint; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.runtime.utils.RuntimeUtils; import org.apache.asterix.translator.CompiledStatements; import org.apache.asterix.translator.IStatementExecutor; @@ -134,8 +134,10 @@ public class FeedOperations { public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider metadataProvider, Feed feed) throws AsterixException { - JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations(); + ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); + JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx); + IClusterStateManager csm = appCtx.getClusterStateManager(); + AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations(); Set nodes = new TreeSet<>(); for (String node : allCluster.getLocations()) { nodes.add(node); @@ -143,7 +145,7 @@ public class FeedOperations { AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()])); FileSplit[] feedLogFileSplits = - FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations); + FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations); org.apache.hyracks.algebricks.common.utils.Pair spC = StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits); FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, spC.first, true); @@ -273,9 +275,8 @@ public class FeedOperations { } // make connections between operators - for (Entry, Pair>> entry - : subJob.getConnectorOperatorMap().entrySet()) { + for (Entry, + Pair>> entry : subJob.getConnectorOperatorMap().entrySet()) { ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey()); IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId); Pair leftOp = entry.getValue().getLeft(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index c1421c5..53a4f23 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -405,7 +405,10 @@ public class TestNodeController { IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, false, true, MetadataUtil.PENDING_NO_OP); List nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId()); - FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(dataset, index.getIndexName(), nodes); + FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits( + ((ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext()) + .getClusterStateManager(), + dataset, index.getIndexName(), nodes); fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1)); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java index 0aea84d..e1840d3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java @@ -59,5 +59,4 @@ public interface IApplicationContext { public ILibraryManager getLibraryManager(); IServiceContext getServiceContext(); - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index 30675cd..b368c3b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -19,12 +19,19 @@ package org.apache.asterix.common.cluster; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.event.schema.cluster.Node; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; + +import com.fasterxml.jackson.databind.node.ObjectNode; public interface IClusterStateManager { @@ -49,6 +56,7 @@ public interface IClusterStateManager { /** * Updates all partitions of {@code nodeId} based on the {@code active} flag. + * * @param nodeId * @param active * @throws HyracksDataException @@ -93,6 +101,7 @@ public interface IClusterStateManager { /** * Blocks until the cluster state becomes {@code state}, or timeout is exhausted. + * * @return true if the desired state was reached before timeout occurred */ boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) @@ -116,4 +125,106 @@ public interface IClusterStateManager { * @throws HyracksDataException */ void deregisterNodePartitions(String nodeId) throws HyracksDataException; + + /** + * @return true if cluster is active, false otherwise + */ + boolean isClusterActive(); + + /** + * @return the set of participant nodes + */ + Set getParticipantNodes(); + + /** + * Returns the IO devices configured for a Node Controller + * + * @param nodeId + * unique identifier of the Node Controller + * @return a list of IO devices. + */ + String[] getIODevices(String nodeId); + + /** + * @return the constraint representing all the partitions of the cluster + */ + AlgebricksAbsolutePartitionConstraint getClusterLocations(); + + /** + * @param excludePendingRemoval + * true, if the desired set shouldn't have pending removal nodes + * @return the set of participant nodes + */ + Set getParticipantNodes(boolean excludePendingRemoval); + + /** + * @param node + * the node id + * @return the number of partitions on that node + */ + int getNodePartitionsCount(String node); + + /** + * @return a json object representing the cluster state summary + */ + ObjectNode getClusterStateSummary(); + + /** + * @return a json object representing the cluster state description + */ + ObjectNode getClusterStateDescription(); + + /** + * Set the cc application context + * + * @param appCtx + */ + void setCcAppCtx(ICcApplicationContext appCtx); + + /** + * @return the number of cluster nodes + */ + int getNumberOfNodes(); + + /** + * Add node configuration + * + * @param nodeId + * @param ncConfiguration + * @throws HyracksException + */ + void addNCConfiguration(String nodeId, Map ncConfiguration) throws HyracksException; + + /** + * @return true if metadata node is active, false otherwise + */ + boolean isMetadataNodeActive(); + + /** + * Remove configuration of a dead node + * + * @param deadNode + * @throws HyracksException + */ + void removeNCConfiguration(String deadNode) throws HyracksException; + + /** + * @return a substitution node or null + */ + Node getAvailableSubstitutionNode(); + + /** + * Add node to the list of nodes pending removal + * + * @param nodeId + */ + void removePending(String nodeId); + + /** + * Deregister intention to remove node id + * + * @param nodeId + * @return + */ + boolean cancelRemovePending(String nodeId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 3eff214..20f685a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -23,6 +23,7 @@ import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.api.application.ICCServiceContext; @@ -99,6 +100,11 @@ public interface ICcApplicationContext extends IApplicationContext { IMetadataLockManager getMetadataLockManager(); /** + * @return the metadata bootstrap + */ + IMetadataBootstrap getMetadataBootstrap(); + + /** * @return the cluster state manager */ IClusterStateManager getClusterStateManager(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index 2eb81d4..0a47788 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -25,6 +25,7 @@ import java.util.logging.Logger; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataFlowController; @@ -92,8 +93,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException { INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); - INcApplicationContext appCtx = - (INcApplicationContext) serviceCtx.getApplicationContext(); + INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext(); try { restoreExternalObjects(serviceCtx, appCtx.getLibraryManager()); } catch (Exception e) { @@ -152,15 +152,16 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF dataParserFactory.setMetaType(metaType); dataParserFactory.configure(configuration); ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory); - configureFeedLogManager(); + configureFeedLogManager(appCtx); nullifyExternalObjects(); } - private void configureFeedLogManager() throws HyracksDataException, AlgebricksException { + private void configureFeedLogManager(IApplicationContext appCtx) throws HyracksDataException, AlgebricksException { this.isFeed = ExternalDataUtils.isFeed(configuration); if (isFeed) { - feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration), - ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint()); + feedLogFileSplits = FeedUtils.splitsForAdapter((ICcApplicationContext) appCtx, + ExternalDataUtils.getDataverse(configuration), ExternalDataUtils.getFeedName(configuration), + dataSourceFactory.getPartitionConstraint()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java index edda448..4c3b7e6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java @@ -24,9 +24,9 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; -import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; @@ -84,9 +84,10 @@ public interface IExternalDataSourceFactory extends Serializable { * @return * @throws AlgebricksException */ - public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx, + public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(ICcApplicationContext appCtx, AlgebricksAbsolutePartitionConstraint constraints, int count) throws AlgebricksException { if (constraints == null) { + IClusterStateManager clusterStateManager = appCtx.getClusterStateManager(); ArrayList locs = new ArrayList<>(); Set stores = appCtx.getMetadataProperties().getStores().keySet(); if (stores.isEmpty()) { @@ -97,7 +98,7 @@ public interface IExternalDataSourceFactory extends Serializable { Iterator storeIt = stores.iterator(); while (storeIt.hasNext()) { String node = storeIt.next(); - int numIODevices = ClusterStateManager.INSTANCE.getIODevices(node).length; + int numIODevices = clusterStateManager.getIODevices(node).length; for (int k = 0; k < numIODevices; k++) { locs.add(node); i++; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java index af4be91..6291ba1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java @@ -25,7 +25,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.api.IExternalDataSourceFactory; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; @@ -55,7 +55,7 @@ public class RSSRecordReaderFactory implements IRecordReaderFactory { public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { int count = urls.size(); clusterLocations = IExternalDataSourceFactory.getPartitionConstraints( - (IApplicationContext) serviceContext.getApplicationContext(), clusterLocations, count); + (ICcApplicationContext) serviceContext.getApplicationContext(), clusterLocations, count); return clusterLocations; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index a0b53fd..3fb1d5f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.external.api.IExternalDataSourceFactory; @@ -55,20 +55,16 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory private transient AlgebricksAbsolutePartitionConstraint clusterLocations; private transient IServiceContext serviceCtx; - private static final List recordReaderNames = Collections.unmodifiableList(Arrays.asList( - ExternalDataConstants.READER_TWITTER_PULL, - ExternalDataConstants.READER_TWITTER_PUSH, - ExternalDataConstants.READER_PUSH_TWITTER, - ExternalDataConstants.READER_PULL_TWITTER, - ExternalDataConstants.READER_USER_STREAM_TWITTER)); - + private static final List recordReaderNames = + Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.READER_TWITTER_PULL, + ExternalDataConstants.READER_TWITTER_PUSH, ExternalDataConstants.READER_PUSH_TWITTER, + ExternalDataConstants.READER_PULL_TWITTER, ExternalDataConstants.READER_USER_STREAM_TWITTER)); @Override public DataSourceType getDataSourceType() { return DataSourceType.RECORDS; } - @Override public List getRecordReaderNames() { return recordReaderNames; @@ -77,8 +73,7 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { clusterLocations = IExternalDataSourceFactory.getPartitionConstraints( - (IApplicationContext) serviceCtx.getApplicationContext(), - clusterLocations, INTAKE_CARDINALITY); + (ICcApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, INTAKE_CARDINALITY); return clusterLocations; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java index caeaa07..bfc2cb7 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java @@ -25,7 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IExternalDataSourceFactory; @@ -50,7 +50,7 @@ public class SocketClientInputStreamFactory implements IInputStreamFactory { @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { clusterLocations = IExternalDataSourceFactory.getPartitionConstraints( - (IApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, sockets.size()); + (ICcApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, sockets.size()); return clusterLocations; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java index 12be449..c7b8633 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java @@ -23,10 +23,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -54,6 +54,7 @@ public class TwitterFirehoseStreamFactory implements IInputStreamFactory { private static final String KEY_INGESTION_LOCATIONS = "ingestion-location"; private Map configuration; + private transient IServiceContext serviceCtx; @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { @@ -67,10 +68,10 @@ public class TwitterFirehoseStreamFactory implements IInputStreamFactory { if (ingestionCardinalityParam != null) { count = Integer.parseInt(ingestionCardinalityParam); } - + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); List chosenLocations = new ArrayList<>(); String[] availableLocations = locations != null ? locations - : ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {}); + : appCtx.getClusterStateManager().getParticipantNodes().toArray(new String[] {}); for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) { chosenLocations.add(availableLocations[k]); } @@ -84,6 +85,7 @@ public class TwitterFirehoseStreamFactory implements IInputStreamFactory { @Override public void configure(IServiceContext serviceCtx, Map configuration) { + this.serviceCtx = serviceCtx; this.configuration = configuration; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java index ec7de91..dad0d51 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java @@ -26,9 +26,9 @@ import java.util.Map; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType; @@ -63,9 +63,9 @@ public class FeedUtils { } public enum Mode { - PROCESS, // There is memory - SPILL, // Memory budget has been consumed. Now we're writing to disk - DISCARD // Memory and Disk space budgets have been consumed. Now we're discarding + PROCESS, // There is memory + SPILL, // Memory budget has been consumed. Now we're writing to disk + DISCARD // Memory and Disk space budgets have been consumed. Now we're discarding } private FeedUtils() { @@ -87,7 +87,7 @@ public class FeedUtils { return StoragePathUtil.getFileSplitForClusterPartition(partition, f.getPath()); } - public static FileSplit[] splitsForAdapter(String dataverseName, String feedName, + public static FileSplit[] splitsForAdapter(ICcApplicationContext appCtx, String dataverseName, String feedName, AlgebricksPartitionConstraint partitionConstraints) throws AsterixException { if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) { throw new AsterixException("Can't create file splits for adapter with count partitioning constraints"); @@ -96,7 +96,7 @@ public class FeedUtils { List splits = new ArrayList<>(); for (String nd : locations) { splits.add(splitsForAdapter(dataverseName, feedName, nd, - ClusterStateManager.INSTANCE.getNodePartitions(nd)[0])); + appCtx.getClusterStateManager().getNodePartitions(nd)[0])); } return splits.toArray(new FileSplit[] {}); } @@ -113,8 +113,8 @@ public class FeedUtils { public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit) throws HyracksDataException { - return new FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), - 0, ctx.getIoManager()).getFile()); + return new FeedLogManager( + FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile()); } public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java index b4353e7..bd50352 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java @@ -25,7 +25,9 @@ import java.util.List; import java.util.Map; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.indexing.ExternalFile; @@ -33,7 +35,6 @@ import org.apache.asterix.external.indexing.IndexingScheduler; import org.apache.asterix.external.indexing.RecordId.RecordIdType; import org.apache.asterix.external.input.stream.HDFSInputStream; import org.apache.asterix.hivecompat.io.RCFileInputFormat; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -207,10 +208,12 @@ public class HDFSUtils { public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx, AlgebricksAbsolutePartitionConstraint clusterLocations) { if (clusterLocations == null) { + IClusterStateManager clusterStateManager = ((ICcApplicationContext) appCtx).getClusterStateManager(); ArrayList locs = new ArrayList<>(); Map stores = appCtx.getMetadataProperties().getStores(); for (String node : stores.keySet()) { - int numIODevices = ClusterStateManager.INSTANCE.getIODevices(node).length; + + int numIODevices = clusterStateManager.getIODevices(node).length; for (int k = 0; k < numIODevices; k++) { locs.add(node); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java index c9e37ee..715ad02 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java @@ -23,7 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.api.IExternalDataSourceFactory; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; @@ -40,11 +40,10 @@ public class RecordWithPKTestReaderFactory implements IRecordReaderFactory recordReaderNames = Collections.unmodifiableList(Arrays.asList()); - @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { - clusterLocations = IExternalDataSourceFactory - .getPartitionConstraints((IApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, 1); + clusterLocations = IExternalDataSourceFactory.getPartitionConstraints( + (ICcApplicationContext) serviceCtx.getApplicationContext(), clusterLocations, 1); return clusterLocations; } @@ -64,7 +63,8 @@ public class RecordWithPKTestReaderFactory implements IRecordReaderFactory getRecordReaderNames() { + @Override + public List getRecordReaderNames() { return recordReaderNames; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java index c45941d..c09b9eb 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java @@ -23,9 +23,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -43,17 +43,23 @@ public class KVTestReaderFactory implements IRecordReaderFactory { private int upsertCycle = 0; private int numOfReaders; private transient AlgebricksAbsolutePartitionConstraint clusterLocations; + private transient IServiceContext serviceCtx; private static final List recordReaderNames = Collections.unmodifiableList(Arrays.asList()); @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { - clusterLocations = ClusterStateManager.INSTANCE.getClusterLocations(); - numOfReaders = clusterLocations.getLocations().length; + if (clusterLocations == null) { + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + clusterLocations = appCtx.getClusterStateManager().getClusterLocations(); + numOfReaders = clusterLocations.getLocations().length; + } return clusterLocations; + } @Override public void configure(IServiceContext serviceCtx, final Map configuration) { + this.serviceCtx = serviceCtx; if (configuration.containsKey("num-of-records")) { numOfRecords = Integer.parseInt(configuration.get("num-of-records")); } @@ -83,7 +89,8 @@ public class KVTestReaderFactory implements IRecordReaderFactory { return DCPRequest.class; } - @Override public List getRecordReaderNames() { + @Override + public List getRecordReaderNames() { return recordReaderNames; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java index 5262e1f..616ed6e 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.api.IExternalDataSourceFactory; @@ -65,7 +66,7 @@ public class TestTypedAdapterFactory implements IAdapterFactory { @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { clusterLocations = IExternalDataSourceFactory.getPartitionConstraints( - (IApplicationContext) serviceContext.getApplicationContext(), clusterLocations, 1); + (ICcApplicationContext) serviceContext.getApplicationContext(), clusterLocations, 1); return clusterLocations; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java index 709f655..f5cfbb3 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java @@ -22,7 +22,6 @@ import java.util.HashSet; import java.util.Set; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.utils.Pair; /** @@ -113,7 +112,7 @@ public class DatasetHints { if (intValue < 0) { return new Pair<>(false, "Value must be >= 0"); } - int numNodesInCluster = ClusterStateManager.INSTANCE.getParticipantNodes(true).size(); + int numNodesInCluster = appCtx.getClusterStateManager().getParticipantNodes(true).size(); if (numNodesInCluster < intValue) { return new Pair<>(false, "Value must be less than or equal to the available number of nodes in cluster ("