Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1C63B200C4E for ; Fri, 21 Apr 2017 19:01:36 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1B59E160B86; Fri, 21 Apr 2017 17:01:36 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A0759160BB7 for ; Fri, 21 Apr 2017 19:01:33 +0200 (CEST) Received: (qmail 73460 invoked by uid 500); 21 Apr 2017 17:01:32 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 73196 invoked by uid 99); 21 Apr 2017 17:01:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Apr 2017 17:01:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BDD17F4A03; Fri, 21 Apr 2017 17:01:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjaco002@apache.org To: commits@asterixdb.apache.org Date: Fri, 21 Apr 2017 17:01:37 -0000 Message-Id: <24d015a1311e44539902dc7adbb5eb7d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [7/8] asterixdb git commit: Remove static cc application context instance archived-at: Fri, 21 Apr 2017 17:01:36 -0000 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java index 57492be..401f55e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java @@ -25,6 +25,7 @@ import java.util.logging.Logger; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.SessionConfig; import org.apache.hyracks.api.dataset.DatasetJobRecord; @@ -40,8 +41,8 @@ import io.netty.handler.codec.http.HttpResponseStatus; public class QueryResultApiServlet extends AbstractQueryApiServlet { private static final Logger LOGGER = Logger.getLogger(QueryResultApiServlet.class.getName()); - public QueryResultApiServlet(ConcurrentMap ctx, String[] paths) { - super(ctx, paths); + public QueryResultApiServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx) { + super(appCtx, ctx, paths); } @Override @@ -94,7 +95,7 @@ public class QueryResultApiServlet extends AbstractQueryApiServlet { // originally determined there. Need to save this value on // some object that we can obtain here. SessionConfig sessionConfig = RestApiServlet.initResponse(request, response); - ResultUtil.printResults(resultReader, sessionConfig, new Stats(), null); + ResultUtil.printResults(appCtx, resultReader, sessionConfig, new Stats(), null); } catch (HyracksDataException e) { final int errorCode = e.getErrorCode(); if (ErrorCode.NO_RESULTSET == errorCode) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index faf9968..20bffc4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.translator.IStatementExecutor.ResultDelivery; - import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; @@ -34,6 +32,7 @@ import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.aql.parser.TokenMgrError; @@ -42,6 +41,7 @@ import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; @@ -71,10 +71,10 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { private final IStorageComponentProvider componentProvider; private final IStatementExecutorContext queryCtx = new StatementExecutorContext(); - public QueryServiceServlet(ConcurrentMap ctx, String[] paths, + public QueryServiceServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx, ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider componentProvider) { - super(ctx, paths); + super(appCtx, ctx, paths); this.compilationProvider = compilationProvider; this.statementExecutorFactory = statementExecutorFactory; this.componentProvider = componentProvider; @@ -179,7 +179,6 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { String clientContextID; String mode; - @Override public String toString() { try { @@ -236,8 +235,8 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { return SessionConfig.OutputFormat.CLEAN_JSON; } - private static SessionConfig createSessionConfig(RequestParameters param, String handleUrl, PrintWriter - resultWriter) { + private static SessionConfig createSessionConfig(RequestParameters param, String handleUrl, + PrintWriter resultWriter) { SessionConfig.ResultDecorator resultPrefix = new SessionConfig.ResultDecorator() { int resultNo = -1; @@ -386,9 +385,12 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { * delivery mode. Usually there will be a "status" endpoint for ASYNC requests that exposes the status of the * execution and a "result" endpoint for DEFERRED requests that will deliver the result for a successful execution. * - * @param host hostname used for this request - * @param path servlet path for this request - * @param delivery ResultDelivery mode for this request + * @param host + * hostname used for this request + * @param path + * servlet path for this request + * @param delivery + * ResultDelivery mode for this request * @return a handle (URL) that allows a client to access further information for this request */ protected String getHandleUrl(String host, String path, ResultDelivery delivery) { @@ -430,7 +432,8 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { List statements = parser.parse(); MetadataManager.INSTANCE.init(); IStatementExecutor translator = - statementExecutorFactory.create(statements, sessionConfig, compilationProvider, componentProvider); + statementExecutorFactory.create(appCtx, statements, sessionConfig, compilationProvider, + componentProvider); execStart = System.nanoTime(); translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, stats, param.clientContextID, queryCtx); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java index 8fbb4c5..d0c574e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java @@ -29,6 +29,7 @@ import java.util.logging.Logger; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.dataset.DatasetJobRecord; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.http.api.IServletRequest; @@ -40,8 +41,8 @@ import io.netty.handler.codec.http.HttpResponseStatus; public class QueryStatusApiServlet extends AbstractQueryApiServlet { private static final Logger LOGGER = Logger.getLogger(QueryStatusApiServlet.class.getName()); - public QueryStatusApiServlet(ConcurrentMap ctx, String[] paths) { - super(ctx, paths); + public QueryStatusApiServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx) { + super(appCtx, ctx, paths); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java index 94ce017..c798a7c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java @@ -25,7 +25,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.config.ExternalProperties; -import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.StaticResourceServlet; @@ -38,9 +38,11 @@ import io.netty.handler.codec.http.HttpResponseStatus; public class QueryWebInterfaceServlet extends StaticResourceServlet { private static final Logger LOGGER = Logger.getLogger(QueryWebInterfaceServlet.class.getName()); + private ICcApplicationContext appCtx; - public QueryWebInterfaceServlet(ConcurrentMap ctx, String[] paths) { + public QueryWebInterfaceServlet(ICcApplicationContext appCtx, ConcurrentMap ctx, String[] paths) { super(ctx, paths); + this.appCtx = appCtx; } @Override @@ -57,7 +59,7 @@ public class QueryWebInterfaceServlet extends StaticResourceServlet { @Override protected void post(IServletRequest request, IServletResponse response) throws IOException { HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8); - ExternalProperties externalProperties = AppContextInfo.INSTANCE.getExternalProperties(); + ExternalProperties externalProperties = appCtx.getExternalProperties(); response.setStatus(HttpResponseStatus.OK); ObjectMapper om = new ObjectMapper(); ObjectNode obj = om.createObjectNode(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java index 74290f3..e339ba9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java @@ -32,6 +32,7 @@ import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.app.translator.QueryTranslator; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.aql.parser.TokenMgrError; @@ -61,15 +62,17 @@ import io.netty.handler.codec.http.HttpResponseStatus; public abstract class RestApiServlet extends AbstractServlet { private static final Logger LOGGER = Logger.getLogger(RestApiServlet.class.getName()); + private final ICcApplicationContext appCtx; private final ILangCompilationProvider compilationProvider; private final IParserFactory parserFactory; private final IStatementExecutorFactory statementExecutorFactory; private final IStorageComponentProvider componentProvider; - public RestApiServlet(ConcurrentMap ctx, String[] paths, + public RestApiServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx, ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider componentProvider) { super(ctx, paths); + this.appCtx = appCtx; this.compilationProvider = compilationProvider; this.parserFactory = compilationProvider.getParserFactory(); this.statementExecutorFactory = statementExecutorFactory; @@ -188,7 +191,8 @@ public abstract class RestApiServlet extends AbstractServlet { synchronized (ctx) { hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR); if (hds == null) { - hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS); + hds = new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(), + ResultReader.NUM_READERS); ctx.put(HYRACKS_DATASET_ATTR, hds); } } @@ -197,7 +201,7 @@ public abstract class RestApiServlet extends AbstractServlet { List aqlStatements = parser.parse(); validate(aqlStatements); MetadataManager.INSTANCE.init(); - IStatementExecutor translator = statementExecutorFactory.create(aqlStatements, sessionConfig, + IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionConfig, compilationProvider, componentProvider); translator.compileAndExecute(hcc, hds, resultDelivery, new IStatementExecutor.Stats()); } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java index a257958..fe6fa89 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java @@ -36,6 +36,7 @@ import java.util.stream.Stream; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultPrinter; import org.apache.asterix.app.result.ResultReader; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.SessionConfig; @@ -76,14 +77,14 @@ public class ResultUtil { return escaped; } - public static void printResults(ResultReader resultReader, SessionConfig conf, Stats stats, - ARecordType recordType) throws HyracksDataException { - new ResultPrinter(conf, stats, recordType).print(resultReader); + public static void printResults(ICcApplicationContext appCtx, ResultReader resultReader, SessionConfig conf, + Stats stats, ARecordType recordType) throws HyracksDataException { + new ResultPrinter(appCtx, conf, stats, recordType).print(resultReader); } - public static void printResults(String record, SessionConfig conf, Stats stats, ARecordType recordType) - throws HyracksDataException { - new ResultPrinter(conf, stats, recordType).print(record); + public static void printResults(ICcApplicationContext appCtx, String record, SessionConfig conf, Stats stats, + ARecordType recordType) throws HyracksDataException { + new ResultPrinter(appCtx, conf, stats, recordType).print(record); } public static void printResultHandle(SessionConfig conf, ResultHandle handle) throws HyracksDataException { @@ -123,9 +124,8 @@ public class ResultUtil { pw.print("\": [{ \n"); printField(pw, QueryServiceServlet.ErrorField.CODE.str(), "1"); final String msg = rootCause.getMessage(); - printField(pw, QueryServiceServlet.ErrorField.MSG.str(), JSONUtil - .escape(msg != null ? msg : rootCause.getClass().getSimpleName()), - addStack); + printField(pw, QueryServiceServlet.ErrorField.MSG.str(), + JSONUtil.escape(msg != null ? msg : rootCause.getClass().getSimpleName()), addStack); pw.print(comma ? "\t}],\n" : "\t}]\n"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java index ad2c128..3650189 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java @@ -21,6 +21,7 @@ package org.apache.asterix.api.http.server; import java.util.concurrent.ConcurrentMap; import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.translator.IStatementExecutorFactory; @@ -29,10 +30,10 @@ import org.apache.hyracks.http.api.IServletRequest; public class UpdateApiServlet extends RestApiServlet { private static final byte ALLOWED_CATEGORIES = Statement.Category.QUERY | Statement.Category.UPDATE; - public UpdateApiServlet(ConcurrentMap ctx, String[] paths, + public UpdateApiServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx, ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider componentProvider) { - super(ctx, paths, compilationProvider, statementExecutorFactory, componentProvider); + super(ctx, paths, appCtx, compilationProvider, statementExecutorFactory, componentProvider); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java index a4cea39..5acba38 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java @@ -27,15 +27,17 @@ import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.netty.handler.codec.http.HttpResponseStatus; -import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; import org.apache.hyracks.http.server.utils.HttpUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import io.netty.handler.codec.http.HttpResponseStatus; + public class VersionApiServlet extends AbstractServlet { private static final Logger LOGGER = Logger.getLogger(VersionApiServlet.class.getName()); @@ -46,7 +48,7 @@ public class VersionApiServlet extends AbstractServlet { @Override protected void get(IServletRequest request, IServletResponse response) { response.setStatus(HttpResponseStatus.OK); - IPropertiesProvider props = (IPropertiesProvider) ctx.get(ASTERIX_APP_CONTEXT_INFO_ATTR); + ICcApplicationContext props = (ICcApplicationContext) ctx.get(ASTERIX_APP_CONTEXT_INFO_ATTR); Map buildProperties = props.getBuildProperties().getAllProps(); ObjectMapper om = new ObjectMapper(); ObjectNode responseObject = om.createObjectNode(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java index d03e574..ecf2c53 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.asterix.api.common.APIFramework; import org.apache.asterix.app.translator.QueryTranslator; import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.utils.Job; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.common.base.IParser; @@ -51,10 +52,12 @@ public class AsterixJavaClient { private final APIFramework apiFramework; private final IStatementExecutorFactory statementExecutorFactory; private final IStorageComponentProvider storageComponentProvider; + private ICcApplicationContext appCtx; - public AsterixJavaClient(IHyracksClientConnection hcc, Reader queryText, PrintWriter writer, - ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory, - IStorageComponentProvider storageComponentProvider) { + public AsterixJavaClient(ICcApplicationContext appCtx, IHyracksClientConnection hcc, Reader queryText, + PrintWriter writer, ILangCompilationProvider compilationProvider, + IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider storageComponentProvider) { + this.appCtx = appCtx; this.hcc = hcc; this.queryText = queryText; this.writer = writer; @@ -65,10 +68,10 @@ public class AsterixJavaClient { parserFactory = compilationProvider.getParserFactory(); } - public AsterixJavaClient(IHyracksClientConnection hcc, Reader queryText, + public AsterixJavaClient(ICcApplicationContext appCtx, IHyracksClientConnection hcc, Reader queryText, ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider storageComponentProvider) { - this(hcc, queryText, + this(appCtx, hcc, queryText, // This is a commandline client and so System.out is appropriate new PrintWriter(System.out, true), // NOSONAR compilationProvider, statementExecutorFactory, storageComponentProvider); @@ -102,8 +105,8 @@ public class AsterixJavaClient { conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true); } - IStatementExecutor translator = - statementExecutorFactory.create(statements, conf, compilationProvider, storageComponentProvider); + IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, conf, compilationProvider, + storageComponentProvider); translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE, new IStatementExecutor.Stats()); writer.flush(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java index f43092b..372404c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java @@ -36,7 +36,7 @@ public class ResourceIdManager implements IResourceIdManager { if (!allReported) { synchronized (this) { if (!allReported) { - if (reportedNodes.size() < ClusterStateManager.getNumberOfNodes()) { + if (reportedNodes.size() < ClusterStateManager.INSTANCE.getNumberOfNodes()) { return -1; } else { reportedNodes = null; @@ -58,7 +58,7 @@ public class ResourceIdManager implements IResourceIdManager { if (!allReported) { globalResourceId.set(Math.max(maxResourceId, globalResourceId.get())); reportedNodes.add(nodeId); - if (reportedNodes.size() == ClusterStateManager.getNumberOfNodes()) { + if (reportedNodes.size() == ClusterStateManager.INSTANCE.getNumberOfNodes()) { reportedNodes = null; allReported = true; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index a0b4a2a..555f571 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -29,8 +29,8 @@ import java.util.logging.Logger; import org.apache.asterix.active.ActiveManager; import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery; -import org.apache.asterix.common.api.IAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.ThreadExecutor; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ActiveProperties; @@ -99,7 +99,7 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepository; import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory; import org.apache.hyracks.storage.common.file.IResourceIdFactory; -public class NCAppRuntimeContext implements IAppRuntimeContext { +public class NCAppRuntimeContext implements INcApplicationContext { private static final Logger LOGGER = Logger.getLogger(NCAppRuntimeContext.class.getName()); private ILSMMergePolicyFactory metadataMergePolicyFactory; @@ -165,28 +165,28 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { @Override public void initialize(boolean initialRun) throws IOException, ACIDException { - ioManager = ncServiceContext.getIoManager(); - threadExecutor = new ThreadExecutor(ncServiceContext.getThreadFactory()); + ioManager = getServiceContext().getIoManager(); + threadExecutor = new ThreadExecutor(getServiceContext().getThreadFactory()); fileMapManager = new FileMapManager(ioManager); ICacheMemoryAllocator allocator = new HeapBufferAllocator(); IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000); IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages()); - AsynchronousScheduler.INSTANCE.init(ncServiceContext.getThreadFactory()); + AsynchronousScheduler.INSTANCE.init(getServiceContext().getThreadFactory()); lsmIOScheduler = AsynchronousScheduler.INSTANCE; metadataMergePolicyFactory = new PrefixMergePolicyFactory(); ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = - new PersistentLocalResourceRepositoryFactory(ioManager, ncServiceContext.getNodeId(), + new PersistentLocalResourceRepositoryFactory(ioManager, getServiceContext().getNodeId(), metadataProperties); localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository(); IAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AppRuntimeContextProviderForRecovery(this); - txnSubsystem = new TransactionSubsystem(ncServiceContext, ncServiceContext.getNodeId(), + txnSubsystem = new TransactionSubsystem(getServiceContext(), getServiceContext().getNodeId(), asterixAppRuntimeContextProvider, txnProperties); IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager(); @@ -202,11 +202,11 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { isShuttingdown = false; - activeManager = new ActiveManager(threadExecutor, ncServiceContext.getNodeId(), + activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(), activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize()); - if (replicationProperties.isParticipant(ncServiceContext.getNodeId())) { - String nodeId = ncServiceContext.getNodeId(); + if (replicationProperties.isParticipant(getServiceContext().getNodeId())) { + String nodeId = getServiceContext().getNodeId(); replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties); @@ -235,24 +235,24 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { //initialize replication channel replicationChannel = new ReplicationChannel(nodeId, replicationProperties, txnSubsystem.getLogManager(), - replicaResourcesManager, replicationManager, ncServiceContext, + replicaResourcesManager, replicationManager, getServiceContext(), asterixAppRuntimeContextProvider); remoteRecoveryManager = new RemoteRecoveryManager(replicationManager, this, replicationProperties); bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager, - storageProperties.getBufferCacheMaxOpenFiles(), ncServiceContext.getThreadFactory(), + storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory(), replicationManager); } else { bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager, - storageProperties.getBufferCacheMaxOpenFiles(), ncServiceContext.getThreadFactory()); + storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory()); } /* * The order of registration is important. The buffer cache must registered before recovery and transaction * managers. Notes: registered components are stopped in reversed order */ - ILifeCycleComponentManager lccm = ncServiceContext.getLifeCycleComponentManager(); + ILifeCycleComponentManager lccm = getServiceContext().getLifeCycleComponentManager(); lccm.register((ILifeCycleComponent) bufferCache); /* * LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager @@ -444,7 +444,7 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { MetadataNode.INSTANCE.initialize(this, ncExtensionManager.getMetadataTupleTranslatorProvider(), ncExtensionManager.getMetadataExtensions()); - proxy = (IAsterixStateProxy) ncServiceContext.getDistributedState(); + proxy = (IAsterixStateProxy) getServiceContext().getDistributedState(); if (proxy == null) { throw new IllegalStateException("Metadata node cannot access distributed state"); } @@ -453,9 +453,9 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { // This way we can delay the registration of the metadataNode until // it is completely initialized. MetadataManager.initialize(proxy, MetadataNode.INSTANCE); - MetadataBootstrap.startUniverse(ncServiceContext, newUniverse); + MetadataBootstrap.startUniverse(getServiceContext(), newUniverse); MetadataBootstrap.startDDLRecovery(); - ncExtensionManager.initializeMetadata(ncServiceContext); + ncExtensionManager.initializeMetadata(getServiceContext()); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Metadata node bound"); @@ -466,7 +466,7 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { public void exportMetadataNodeStub() throws RemoteException { IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, getMetadataProperties().getMetadataPort()); - ((IAsterixStateProxy) ncServiceContext.getDistributedState()).setMetadataNode(stub); + ((IAsterixStateProxy) getServiceContext().getDistributedState()).setMetadataNode(stub); } @Override @@ -482,4 +482,9 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { public IStorageComponentProvider getStorageComponentProvider() { return componentProvider; } + + @Override + public INCServiceContext getServiceContext() { + return ncServiceContext; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java index 91f3524..424e66c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.app.nc.task; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -34,7 +34,7 @@ public class BindMetadataNodeTask implements INCLifecycleTask { @Override public void perform(IControllerService cs) throws HyracksDataException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { if (exportStub) { appContext.exportMetadataNodeStub(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java index e77346a..b7701d2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.app.nc.task; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -30,7 +30,7 @@ public class CheckpointTask implements INCLifecycleTask { @Override public void perform(IControllerService cs) throws HyracksDataException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); ICheckpointManager checkpointMgr = appContext.getTransactionSubsystem().getCheckpointManager(); checkpointMgr.doSharpCheckpoint(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java index ad9b28a..9506690 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java @@ -19,7 +19,7 @@ package org.apache.asterix.app.nc.task; import org.apache.asterix.app.external.ExternalLibraryUtils; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -35,7 +35,7 @@ public class ExternalLibrarySetupTask implements INCLifecycleTask { @Override public void perform(IControllerService cs) throws HyracksDataException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java index 777097d..d52d15e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java @@ -21,7 +21,7 @@ package org.apache.asterix.app.nc.task; import java.io.IOException; import java.util.Set; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -38,7 +38,7 @@ public class LocalRecoveryTask implements INCLifecycleTask { @Override public void perform(IControllerService cs) throws HyracksDataException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { appContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions); } catch (IOException | ACIDException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java index 65004b8..6415416 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.app.nc.task; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -30,7 +30,7 @@ public class MetadataBootstrapTask implements INCLifecycleTask { @Override public void perform(IControllerService cs) throws HyracksDataException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState(); appContext.initializeMetadata(state == SystemState.PERMANENT_DATA_LOSS); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java index 48479c5..f74a986 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.util.Map; import java.util.Set; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -38,7 +38,7 @@ public class RemoteRecoveryTask implements INCLifecycleTask { @Override public void perform(IControllerService cs) throws HyracksDataException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); appContext.getRemoteRecoveryManager().doRemoteRecoveryPlan(recoveryPlan); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java index 9f04b10..8696d23 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.app.nc.task; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -29,7 +29,7 @@ public class StartFailbackTask implements INCLifecycleTask { @Override public void perform(IControllerService cs) throws HyracksDataException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); appContext.getRemoteRecoveryManager().startFailbackProcess(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java index d1754dd..799581b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java @@ -23,7 +23,7 @@ import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.hyracks.bootstrap.AsterixStateDumpHandler; @@ -40,7 +40,7 @@ public class StartLifecycleComponentsTask implements INCLifecycleTask { @Override public void perform(IControllerService cs) throws HyracksDataException { - IAppRuntimeContext applicationContext = (IAppRuntimeContext) cs.getApplicationContext(); + INcApplicationContext applicationContext = (INcApplicationContext) cs.getApplicationContext(); NCServiceContext serviceCtx = (NCServiceContext) cs.getContext(); MetadataProperties metadataProperties = applicationContext.getMetadataProperties(); if (LOGGER.isLoggable(Level.INFO)) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java index 93e5b50..60d5c29 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.app.nc.task; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -30,7 +30,7 @@ public class StartReplicationServiceTask implements INCLifecycleTask { @Override public void perform(IControllerService cs) throws HyracksDataException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { //Open replication channel appContext.getReplicationChannel().start(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java index 45f96ac..db26c3a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java @@ -55,6 +55,7 @@ import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.ICCMessageBroker; @@ -63,8 +64,8 @@ import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.asterix.util.FaultToleranceUtil; +import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -83,6 +84,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { private IClusterStateManager clusterManager; private ICCMessageBroker messageBroker; private IReplicationStrategy replicationStrategy; + private ICCServiceContext serviceCtx; private Set pendingStartupCompletionNodes = new HashSet<>(); @Override @@ -135,8 +137,8 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { private synchronized void requestPartitionsTakeover(String failedNodeId) { //replica -> list of partitions to takeover Map> partitionRecoveryPlan = new HashMap<>(); - ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); - + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + ReplicationProperties replicationProperties = appCtx.getReplicationProperties(); //collect the partitions of the failed NC List lostPartitions = getNodeAssignedPartitions(failedNodeId); if (!lostPartitions.isEmpty()) { @@ -204,7 +206,8 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { planId2FailbackPlanMap.put(plan.getPlanId(), plan); //get all partitions this node requires to resync - ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + ReplicationProperties replicationProperties = appCtx.getReplicationProperties(); Set nodeReplicas = replicationProperties.getNodeReplicasIds(failingBackNodeId); clusterManager.getClusterPartitons(); for (String replicaId : nodeReplicas) { @@ -255,7 +258,8 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { * if the returning node is the original metadata node, * then metadata node will change after the failback completes */ - String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName(); + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + String originalMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName(); if (originalMetadataNode.equals(failbackNode)) { plan.setNodeToReleaseMetadataManager(currentMetadataNode); currentMetadataNode = ""; @@ -399,7 +403,8 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { private synchronized void requestMetadataNodeTakeover() { //need a new node to takeover metadata node - ClusterPartition metadataPartiton = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition(); + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + ClusterPartition metadataPartiton = appCtx.getMetadataProperties().getMetadataPartition(); //request the metadataPartition node to register itself as the metadata node TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage(); try { @@ -418,10 +423,11 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { } @Override - public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) { + public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, IReplicationStrategy replicationStrategy) { AutoFaultToleranceStrategy ft = new AutoFaultToleranceStrategy(); - ft.messageBroker = messageBroker; + ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); ft.replicationStrategy = replicationStrategy; + ft.serviceCtx = serviceCtx; return ft; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java index 8d382a1..4e8ecd9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java @@ -21,15 +21,15 @@ package org.apache.asterix.app.replication; import java.util.HashMap; import java.util.Map; -import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.event.schema.cluster.Cluster; +import org.apache.hyracks.api.application.ICCServiceContext; public class FaultToleranceStrategyFactory { - private static final Map> - BUILT_IN_FAULT_TOLERANCE_STRATEGY = new HashMap<>(); + private static final Map> BUILT_IN_FAULT_TOLERANCE_STRATEGY = + new HashMap<>(); static { BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("no_fault_tolerance", NoFaultToleranceStrategy.class); @@ -42,14 +42,14 @@ public class FaultToleranceStrategyFactory { } public static IFaultToleranceStrategy create(Cluster cluster, IReplicationStrategy repStrategy, - ICCMessageBroker messageBroker) { - boolean highAvailabilityEnabled = cluster.getHighAvailability() != null - && cluster.getHighAvailability().getEnabled() != null - && Boolean.valueOf(cluster.getHighAvailability().getEnabled()); + ICCServiceContext serviceCtx) { + boolean highAvailabilityEnabled = + cluster.getHighAvailability() != null && cluster.getHighAvailability().getEnabled() != null + && Boolean.valueOf(cluster.getHighAvailability().getEnabled()); if (!highAvailabilityEnabled || cluster.getHighAvailability().getFaultTolerance() == null || cluster.getHighAvailability().getFaultTolerance().getStrategy() == null) { - return new NoFaultToleranceStrategy().from(repStrategy, messageBroker); + return new NoFaultToleranceStrategy().from(serviceCtx, repStrategy); } String strategyName = cluster.getHighAvailability().getFaultTolerance().getStrategy().toLowerCase(); if (!BUILT_IN_FAULT_TOLERANCE_STRATEGY.containsKey(strategyName)) { @@ -58,7 +58,7 @@ public class FaultToleranceStrategyFactory { } Class clazz = BUILT_IN_FAULT_TOLERANCE_STRATEGY.get(strategyName); try { - return clazz.newInstance().from(repStrategy, messageBroker); + return clazz.newInstance().from(serviceCtx, repStrategy); } catch (InstantiationException | IllegalAccessException e) { throw new IllegalStateException(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java index c40e236..1b57403 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java @@ -46,6 +46,7 @@ import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.ICCMessageBroker; @@ -54,8 +55,8 @@ import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.replication.Replica; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; -import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.asterix.util.FaultToleranceUtil; +import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -66,6 +67,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate private String metadataNodeId; private IReplicationStrategy replicationStrategy; private ICCMessageBroker messageBroker; + private ICCServiceContext serviceCtx; private final Set hotStandbyMetadataReplica = new HashSet<>(); private final Set failedNodes = new HashSet<>(); private Set pendingStartupCompletionNodes = new HashSet<>(); @@ -91,8 +93,8 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate } // If the failed node is the metadata node, ask its replicas to replay any committed jobs if (nodeId.equals(metadataNodeId)) { - int metadataPartitionId = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition() - .getPartitionId(); + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + int metadataPartitionId = appCtx.getMetadataProperties().getMetadataPartition().getPartitionId(); Set metadataPartition = new HashSet<>(Arrays.asList(metadataPartitionId)); Set activeRemoteReplicas = replicationStrategy.getRemoteReplicas(metadataNodeId).stream() .filter(replica -> !failedNodes.contains(replica.getId())).collect(Collectors.toSet()); @@ -110,10 +112,11 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate } @Override - public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) { + public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, IReplicationStrategy replicationStrategy) { MetadataNodeFaultToleranceStrategy ft = new MetadataNodeFaultToleranceStrategy(); ft.replicationStrategy = replicationStrategy; - ft.messageBroker = messageBroker; + ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); + ft.serviceCtx = serviceCtx; return ft; } @@ -247,8 +250,8 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate // Construct recovery plan: Node => Set of partitions to recover from it Map> recoveryPlan = new HashMap<>(); // Recover metadata partition from any metadata hot standby replica - int metadataPartitionId = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition() - .getPartitionId(); + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + int metadataPartitionId = appCtx.getMetadataProperties().getMetadataPartition().getPartitionId(); Set metadataPartition = new HashSet<>(Arrays.asList(metadataPartitionId)); recoveryPlan.put(hotStandbyMetadataReplica.iterator().next(), metadataPartition); return new RemoteRecoveryTask(recoveryPlan); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java index b8b3c49..b9ea135 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java @@ -47,6 +47,7 @@ import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { @@ -54,8 +55,8 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName()); IClusterStateManager clusterManager; private String metadataNodeId; - private ICCMessageBroker messageBroker; private Set pendingStartupCompletionNodes = new HashSet<>(); + private ICCMessageBroker messageBroker; @Override public void notifyNodeJoin(String nodeId) throws HyracksDataException { @@ -87,9 +88,9 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { } @Override - public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) { + public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, IReplicationStrategy replicationStrategy) { NoFaultToleranceStrategy ft = new NoFaultToleranceStrategy(); - ft.messageBroker = messageBroker; + ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); return ft; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java index 0924838..feca7e8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java @@ -23,14 +23,14 @@ import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.replication.IRemoteRecoveryManager; import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage { +public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage implements INcAddressedMessage { private static final long serialVersionUID = 1L; private static final Logger LOGGER = Logger.getLogger(CompleteFailbackRequestMessage.class.getName()); @@ -62,9 +62,8 @@ public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage } @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); - INCMessageBroker broker = (INCMessageBroker) cs.getContext().getMessageBroker(); + public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException { + INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker(); HyracksDataException hde = null; try { IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager(); @@ -73,8 +72,8 @@ public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage LOGGER.log(Level.SEVERE, "Failure during completion of failback process", e); hde = HyracksDataException.create(e); } finally { - CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId, - requestId, partitions); + CompleteFailbackResponseMessage reponse = + new CompleteFailbackResponseMessage(planId, requestId, partitions); try { broker.sendMessageToCC(reponse); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java index fb45892..0c5678f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java @@ -18,14 +18,15 @@ */ package org.apache.asterix.app.replication.message; +import java.util.Set; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage; -import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; - -import java.util.Set; -public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage { +public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage implements ICcAddressedMessage { private static final long serialVersionUID = 1L; private final Set partitions; @@ -49,8 +50,8 @@ public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage } @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java index 3af075e..03c7ac6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java @@ -18,12 +18,13 @@ */ package org.apache.asterix.app.replication.message; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; -import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -public class NCLifecycleTaskReportMessage implements INCLifecycleMessage { +public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, ICcAddressedMessage { private static final long serialVersionUID = 1L; private final String nodeId; @@ -36,8 +37,8 @@ public class NCLifecycleTaskReportMessage implements INCLifecycleMessage { } @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this); } public String getNodeId() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java index abfd6b2..cefcf49 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java @@ -23,14 +23,15 @@ import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage { +public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage + implements INcAddressedMessage { private static final long serialVersionUID = 1L; private static final Logger LOGGER = Logger.getLogger(PreparePartitionsFailbackRequestMessage.class.getName()); @@ -71,9 +72,8 @@ public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPla } @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext(); - INCMessageBroker broker = (INCMessageBroker) cs.getContext().getMessageBroker(); + public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException { + INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker(); /** * if the metadata partition will be failed back * we need to flush and close all datasets including metadata datasets http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java index e02cd42..bea1039 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java @@ -18,14 +18,16 @@ */ package org.apache.asterix.app.replication.message; +import java.util.Set; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage; -import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; - -import java.util.Set; -public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage { +public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage + implements ICcAddressedMessage { private static final long serialVersionUID = 1L; private final Set partitions; @@ -40,8 +42,8 @@ public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPl } @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java index 5a73543..c8e2479 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java @@ -22,14 +22,14 @@ import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.nc.NodeControllerService; -public class ReplayPartitionLogsRequestMessage implements INCLifecycleMessage { +public class ReplayPartitionLogsRequestMessage implements INCLifecycleMessage, INcAddressedMessage { private static final Logger LOGGER = Logger.getLogger(ReplayPartitionLogsRequestMessage.class.getName()); private static final long serialVersionUID = 1L; @@ -40,9 +40,8 @@ public class ReplayPartitionLogsRequestMessage implements INCLifecycleMessage { } @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - NodeControllerService ncs = (NodeControllerService) cs; - IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext(); + public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException { + NodeControllerService ncs = (NodeControllerService) appContext.getServiceContext().getControllerService(); // Replay the logs for these partitions and flush any impacted dataset appContext.getRemoteRecoveryManager().replayReplicaPartitionLogs(partitions, true); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java index dc19735..e05fd47 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java @@ -18,14 +18,15 @@ */ package org.apache.asterix.app.replication.message; +import java.util.Set; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; -import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; - -import java.util.Set; -public class ReplayPartitionLogsResponseMessage implements INCLifecycleMessage { +public class ReplayPartitionLogsResponseMessage implements INCLifecycleMessage, ICcAddressedMessage { private static final long serialVersionUID = 1L; private final Set partitions; @@ -37,8 +38,8 @@ public class ReplayPartitionLogsResponseMessage implements INCLifecycleMessage { } @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this); } public Set getPartitions() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java index 6a313f0..cfe999c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java @@ -21,15 +21,16 @@ package org.apache.asterix.app.replication.message; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; -import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.nc.NodeControllerService; -public class StartupTaskRequestMessage implements INCLifecycleMessage { +public class StartupTaskRequestMessage implements INCLifecycleMessage, ICcAddressedMessage { private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName()); private static final long serialVersionUID = 1L; @@ -52,8 +53,8 @@ public class StartupTaskRequestMessage implements INCLifecycleMessage { } @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this); } public SystemState getState() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c67f33dd/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java index 92abf5b..aaf3eb8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java @@ -23,12 +23,14 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; -public class StartupTaskResponseMessage implements INCLifecycleMessage { +public class StartupTaskResponseMessage implements INCLifecycleMessage, INcAddressedMessage { private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName()); private static final long serialVersionUID = 1L; @@ -41,8 +43,9 @@ public class StartupTaskResponseMessage implements INCLifecycleMessage { } @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - INCMessageBroker broker = (INCMessageBroker) cs.getContext().getMessageBroker(); + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + IControllerService cs = appCtx.getServiceContext().getControllerService(); boolean success = true; HyracksDataException exception = null; try {