From commits-return-4913-archive-asf-public=cust-asf.ponee.io@asterixdb.apache.org Sat Jan 27 17:09:56 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 6582018076D for ; Sat, 27 Jan 2018 17:09:56 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 55E1C160C2F; Sat, 27 Jan 2018 16:09:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E19EB160C52 for ; Sat, 27 Jan 2018 17:09:53 +0100 (CET) Received: (qmail 21930 invoked by uid 500); 27 Jan 2018 16:09:53 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 21911 invoked by uid 99); 27 Jan 2018 16:09:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 27 Jan 2018 16:09:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8D5E8DFAB0; Sat, 27 Jan 2018 16:09:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mblow@apache.org To: commits@asterixdb.apache.org Date: Sat, 27 Jan 2018 16:09:52 -0000 Message-Id: <995f5ecc4fa541a39b7b7bc0ccb2ff81@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] asterixdb git commit: [ASTERIXDB-2110] Introduce Cluster Controller Id [ASTERIXDB-2110] Introduce Cluster Controller Id Change-Id: Iec1b01444bfbd923e38f5c162c5244e17c4d5f03 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2323 Integration-Tests: Jenkins Reviewed-by: Murtadha Hubail Tested-by: Jenkins Contrib: Jenkins Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1a3a8212 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1a3a8212 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1a3a8212 Branch: refs/heads/master Commit: 1a3a82123d197c074000ab428f3aabc4c3cbfc19 Parents: 65b8070 Author: Michael Blow Authored: Fri Jan 26 23:24:55 2018 -0500 Committer: Michael Blow Committed: Sat Jan 27 08:09:19 2018 -0800 ---------------------------------------------------------------------- .../apache/asterix/active/ActiveManager.java | 6 +- .../active/message/ActiveManagerMessage.java | 3 +- .../http/server/NCQueryCancellationServlet.java | 3 +- .../api/http/server/NCQueryServiceServlet.java | 5 +- .../app/nc/task/BindMetadataNodeTask.java | 3 +- .../asterix/app/nc/task/CheckpointTask.java | 3 +- .../app/nc/task/ExternalLibrarySetupTask.java | 3 +- .../asterix/app/nc/task/LocalRecoveryTask.java | 3 +- .../app/nc/task/MetadataBootstrapTask.java | 3 +- .../app/nc/task/ReportLocalCountersTask.java | 5 +- .../nc/task/StartLifecycleComponentsTask.java | 3 +- .../nc/task/StartReplicationServiceTask.java | 3 +- .../message/MetadataNodeRequestMessage.java | 6 +- .../RegistrationTasksRequestMessage.java | 5 +- .../RegistrationTasksResponseMessage.java | 8 +- .../asterix/app/translator/QueryTranslator.java | 3 +- .../hyracks/bootstrap/CCApplication.java | 9 - .../bootstrap/ClusterLifecycleListener.java | 8 +- .../hyracks/bootstrap/NCApplication.java | 15 +- .../asterix/messaging/CCMessageBroker.java | 8 +- .../asterix/messaging/NCMessageBroker.java | 12 +- .../test/active/ActiveEventsListenerTest.java | 3 +- .../asterix/common/api/INCLifecycleTask.java | 4 +- .../common/messaging/CcIdentifiedMessage.java | 38 ++++ .../common/messaging/api/ICCMessageBroker.java | 2 +- .../messaging/api/ICcAddressedMessage.java | 1 - .../messaging/api/ICcIdentifiedMessage.java | 27 +++ .../common/messaging/api/INCMessageBroker.java | 11 +- .../apache/asterix/common/utils/InvokeUtil.java | 190 ---------------- .../asterix/metadata/lock/DatasetLock.java | 2 +- .../message/ReportLocalCountersMessage.java | 6 +- .../ReportLocalCountersRequestMessage.java | 6 +- .../transaction/GlobalResourceIdFactory.java | 2 +- asterixdb/asterix-transactions/pom.xml | 4 + .../management/service/logging/LogManager.java | 2 +- .../logging/LogManagerWithReplication.java | 2 +- .../hyracks/api/application/INCApplication.java | 3 +- .../hyracks/api/config/IApplicationConfig.java | 4 + .../org/apache/hyracks/api/control/CcId.java | 62 ++++++ .../hyracks/api/exceptions/ErrorCode.java | 2 +- .../java/org/apache/hyracks/api/job/JobId.java | 34 ++- .../apache/hyracks/api/job/JobIdFactory.java | 30 ++- .../src/main/resources/errormsg/en.properties | 2 +- .../hyracks/api/job/JobIdFactoryTest.java | 118 ++++++++++ .../control/cc/ClusterControllerService.java | 59 +++-- .../hyracks/control/cc/cluster/NodeManager.java | 2 +- .../control/cc/work/RegisterNodeWork.java | 3 +- .../cc/work/WaitForJobCompletionWork.java | 8 +- .../control/common/base/IClusterController.java | 39 ++-- .../control/common/base/INodeController.java | 28 +-- .../control/common/config/OptionTypes.java | 21 ++ .../control/common/controllers/CCConfig.java | 11 +- .../control/common/controllers/NCConfig.java | 9 + .../control/common/ipc/CCNCFunctions.java | 63 ++++-- .../ipc/ClusterControllerRemoteProxy.java | 15 +- .../common/ipc/NodeControllerRemoteProxy.java | 19 +- .../hyracks/control/nc/BaseNCApplication.java | 3 +- .../org/apache/hyracks/control/nc/Joblet.java | 14 +- .../hyracks/control/nc/NodeControllerIPCI.java | 17 +- .../control/nc/NodeControllerService.java | 215 +++++++++++++------ .../org/apache/hyracks/control/nc/Task.java | 5 +- .../nc/dataset/DatasetPartitionManager.java | 10 +- .../partitions/MaterializedPartitionWriter.java | 2 +- .../MaterializingPipelinedPartition.java | 3 +- .../control/nc/partitions/PartitionManager.java | 28 ++- .../nc/partitions/PipelinedPartition.java | 3 +- .../hyracks/control/nc/task/ThreadDumpTask.java | 10 +- .../control/nc/work/AbortAllJobsWork.java | 20 +- .../control/nc/work/BuildJobProfilesWork.java | 12 +- .../control/nc/work/DeployBinaryWork.java | 7 +- .../control/nc/work/DeployJobSpecWork.java | 8 +- .../control/nc/work/NotifyTaskCompleteWork.java | 4 +- .../control/nc/work/NotifyTaskFailureWork.java | 2 +- .../hyracks/control/nc/work/StateDumpWork.java | 8 +- .../control/nc/work/UnDeployBinaryWork.java | 7 +- .../control/nc/work/UndeployJobSpecWork.java | 7 +- .../btree/helper/TestNCApplication.java | 3 +- .../tests/integration/JobFailureTest.java | 6 - .../hyracks/test/support/TestJobletContext.java | 1 - .../org/apache/hyracks/util/InvokeUtil.java | 190 ++++++++++++++++ .../org/apache/hyracks/util/trace/Tracer.java | 7 +- 81 files changed, 1020 insertions(+), 521 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index bfa648a..aa9ac98 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -31,8 +31,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.asterix.active.message.ActiveManagerMessage; -import org.apache.asterix.active.message.ActiveStatsResponse; import org.apache.asterix.active.message.ActiveStatsRequestMessage; +import org.apache.asterix.active.message.ActiveStatsResponse; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.memory.ConcurrentFramePool; @@ -116,7 +116,7 @@ public class ActiveManager { LOGGER.warn("Request stats of a runtime that is not registered " + runtimeId); // Send a failure message ((NodeControllerService) serviceCtx.getControllerService()) - .sendApplicationMessageToCC( + .sendApplicationMessageToCC(message.getCcId(), JavaSerializationUtils .serialize(new ActiveStatsResponse(reqId, null, new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, @@ -126,7 +126,7 @@ public class ActiveManager { String stats = runtime.getStats(); ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null); ((NodeControllerService) serviceCtx.getControllerService()) - .sendApplicationMessageToCC(JavaSerializationUtils.serialize(response), null); + .sendApplicationMessageToCC(message.getCcId(), JavaSerializationUtils.serialize(response), null); } catch (Exception e) { throw HyracksDataException.create(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java index bef418b..b8c44a6 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java @@ -22,10 +22,11 @@ import java.io.Serializable; import org.apache.asterix.active.ActiveManager; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.CcIdentifiedMessage; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; -public class ActiveManagerMessage implements INcAddressedMessage { +public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddressedMessage { public enum Kind { STOP_ACTIVITY, REQUEST_STATS http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java index 87beae1..c3e02af 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java @@ -62,7 +62,8 @@ public class NCQueryCancellationServlet extends QueryCancellationServlet { try { CancelQueryRequest cancelQueryMessage = new CancelQueryRequest(serviceCtx.getNodeId(), cancelQueryFuture.getFutureId(), clientContextId); - messageBroker.sendMessageToCC(cancelQueryMessage); + // TODO(mblow): multicc -- need to send cancellation to the correct cc + messageBroker.sendMessageToPrimaryCC(cancelQueryMessage); cancelQueryFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); response.setStatus(HttpResponseStatus.OK); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 76f489c..5cbac64 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -88,7 +88,7 @@ public class NCQueryServiceServlet extends QueryServiceServlet { responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), resultProperties.getNcToCcResultProperties(), param.clientContextID, handleUrl, optionalParameters); execution.start(); - ncMb.sendMessageToCC(requestMsg); + ncMb.sendMessageToPrimaryCC(requestMsg); try { responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -137,7 +137,8 @@ public class NCQueryServiceServlet extends QueryServiceServlet { try { CancelQueryRequest cancelQueryMessage = new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID); - messageBroker.sendMessageToCC(cancelQueryMessage); + // TODO(mblow): multicc -- need to send cancellation to the correct cc + messageBroker.sendMessageToPrimaryCC(cancelQueryMessage); if (wait) { cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java index 49a84e1..e41bc60 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java @@ -20,6 +20,7 @@ package org.apache.asterix.app.nc.task; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -33,7 +34,7 @@ public class BindMetadataNodeTask implements INCLifecycleTask { } @Override - public void perform(IControllerService cs) throws HyracksDataException { + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { if (exportStub) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java index 02c377a..6f1775e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java @@ -21,6 +21,7 @@ package org.apache.asterix.app.nc.task; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -29,7 +30,7 @@ public class CheckpointTask implements INCLifecycleTask { private static final long serialVersionUID = 1L; @Override - public void perform(IControllerService cs) throws HyracksDataException { + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); ICheckpointManager checkpointMgr = appContext.getTransactionSubsystem().getCheckpointManager(); checkpointMgr.doSharpCheckpoint(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java index 4e330c6..8cfeb12 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java @@ -21,6 +21,7 @@ package org.apache.asterix.app.nc.task; import org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -34,7 +35,7 @@ public class ExternalLibrarySetupTask implements INCLifecycleTask { } @Override - public void perform(IControllerService cs) throws HyracksDataException { + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java index eb19ad6..d0a8dcc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -38,7 +39,7 @@ public class LocalRecoveryTask implements INCLifecycleTask { } @Override - public void perform(IControllerService cs) throws HyracksDataException { + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { appContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java index 784f3b0..001af23 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java @@ -21,6 +21,7 @@ package org.apache.asterix.app.nc.task; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -29,7 +30,7 @@ public class MetadataBootstrapTask implements INCLifecycleTask { private static final long serialVersionUID = 1L; @Override - public void perform(IControllerService cs) throws HyracksDataException { + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java index 86f7d1c..53b13e8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java @@ -20,6 +20,7 @@ package org.apache.asterix.app.nc.task; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.runtime.message.ReportLocalCountersMessage; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.nc.NodeControllerService; @@ -29,8 +30,8 @@ public class ReportLocalCountersTask implements INCLifecycleTask { private static final long serialVersionUID = 1L; @Override - public void perform(IControllerService cs) throws HyracksDataException { - ReportLocalCountersMessage.send((NodeControllerService) cs); + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { + ReportLocalCountersMessage.send(ccId, (NodeControllerService) cs); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java index 7db473e..7ecc669 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java @@ -25,6 +25,7 @@ import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.hyracks.bootstrap.AsterixStateDumpHandler; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; @@ -39,7 +40,7 @@ public class StartLifecycleComponentsTask implements INCLifecycleTask { private static final long serialVersionUID = 1L; @Override - public void perform(IControllerService cs) throws HyracksDataException { + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { INcApplicationContext applicationContext = (INcApplicationContext) cs.getApplicationContext(); NCServiceContext serviceCtx = (NCServiceContext) cs.getContext(); MetadataProperties metadataProperties = applicationContext.getMetadataProperties(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java index 7071271..0cfb6b9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java @@ -20,6 +20,7 @@ package org.apache.asterix.app.nc.task; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -28,7 +29,7 @@ public class StartReplicationServiceTask implements INCLifecycleTask { private static final long serialVersionUID = 1L; @Override - public void perform(IControllerService cs) throws HyracksDataException { + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { // open replication channel http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java index b0e1e06..a8c98c7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java @@ -19,6 +19,7 @@ package org.apache.asterix.app.replication.message; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.CcIdentifiedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; @@ -27,7 +28,8 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class MetadataNodeRequestMessage implements INCLifecycleMessage, INcAddressedMessage { +public class MetadataNodeRequestMessage extends CcIdentifiedMessage + implements INCLifecycleMessage, INcAddressedMessage { private static final long serialVersionUID = 1L; private static final Logger LOGGER = LogManager.getLogger(); @@ -55,7 +57,7 @@ public class MetadataNodeRequestMessage implements INCLifecycleMessage, INcAddre MetadataNodeResponseMessage reponse = new MetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId(), export); try { - broker.sendMessageToCC(reponse); + broker.sendMessageToCC(getCcId(), reponse); } catch (Exception e) { LOGGER.log(Level.ERROR, "Failed taking over metadata", e); hde = HyracksDataException.suppress(hde, e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java index 6ca576a..62e7a69 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java @@ -24,6 +24,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.hyracks.api.client.NodeStatus; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.logging.log4j.Level; @@ -44,12 +45,12 @@ public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICc this.nodeStatus = nodeStatus; } - public static void send(NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState) + public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState) throws HyracksDataException { try { RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState); - ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg); + ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg); } catch (Exception e) { LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e); throw HyracksDataException.create(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java index d4c2340..a6f10ca 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.CcIdentifiedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; @@ -33,7 +34,8 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class RegistrationTasksResponseMessage implements INCLifecycleMessage, INcAddressedMessage { +public class RegistrationTasksResponseMessage extends CcIdentifiedMessage + implements INCLifecycleMessage, INcAddressedMessage { private static final Logger LOGGER = LogManager.getLogger(); private static final long serialVersionUID = 1L; @@ -57,7 +59,7 @@ public class RegistrationTasksResponseMessage implements INCLifecycleMessage, IN if (LOGGER.isInfoEnabled()) { LOGGER.log(Level.INFO, "Starting startup task: " + task); } - task.perform(cs); + task.perform(getCcId(), cs); if (LOGGER.isInfoEnabled()) { LOGGER.log(Level.INFO, "Completed startup task: " + task); } @@ -70,7 +72,7 @@ public class RegistrationTasksResponseMessage implements INCLifecycleMessage, IN NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success); result.setException(exception); try { - broker.sendMessageToCC(result); + broker.sendMessageToCC(getCcId(), result); } catch (Exception e) { success = false; LOGGER.log(Level.ERROR, "Failed sending message to cc", e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 4e9cb47..335899c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -37,6 +37,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -2475,7 +2476,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } }, clientContextId, ctx); } catch (Exception e) { - if (JobId.INVALID.equals(jobId.getValue())) { + if (Objects.equals(JobId.INVALID, jobId.getValue())) { // compilation failed ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.FAILED); ResultUtil.printError(sessionOutput.out(), e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 11f4e1c..1c7bfb7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -57,7 +57,6 @@ import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.app.replication.NcLifecycleCoordinator; import org.apache.asterix.common.api.AsterixThreadFactory; -import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.INodeJobTracker; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.ExternalProperties; @@ -348,14 +347,6 @@ public class CCApplication extends BaseCCApplication { } @Override - public void startupCompleted() throws Exception { - ccServiceCtx.getControllerService().getExecutor().submit(() -> { - appCtx.getClusterStateManager().waitForState(IClusterManagementWork.ClusterState.ACTIVE); - return null; - }); - } - - @Override public IJobCapacityController getJobCapacityController() { return jobCapacityController; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java index 8c87a26..932f47c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java @@ -60,9 +60,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { @Override public void notifyNodeJoin(String nodeId, Map ncConfiguration) throws HyracksException { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("NC: " + nodeId + " joined"); - } + LOGGER.info("NC: {} joined", nodeId); IClusterStateManager csm = appCtx.getClusterStateManager(); csm.notifyNodeJoin(nodeId, ncConfiguration); @@ -79,9 +77,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { @Override public void notifyNodeFailure(Collection deadNodeIds) throws HyracksException { for (String deadNode : deadNodeIds) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("NC: " + deadNode + " left"); - } + LOGGER.info("NC: {} left", deadNode); IClusterStateManager csm = appCtx.getClusterStateManager(); csm.notifyNodeFailure(deadNode); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 0526a32..0d3b7b5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -50,6 +50,7 @@ import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.config.IConfigManager; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; @@ -198,19 +199,23 @@ public class NCApplication extends BaseNCApplication { state = SystemState.BOOTSTRAPPING; } // Request registration tasks from CC - RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), - NodeStatus.BOOTING, state); + // TODO (mblow): multicc + final NodeControllerService ncControllerService = (NodeControllerService) ncServiceCtx.getControllerService(); + RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryClusterController().getCcId(), + ncControllerService, NodeStatus.BOOTING, state); startupCompleted = true; } @Override - public void onRegisterNode() throws Exception { - if (startupCompleted) { + public void onRegisterNode(CcId ccId) throws Exception { + // TODO (mblow): multicc + if (startupCompleted && ccId.equals(((NodeControllerService) ncServiceCtx.getControllerService()) + .getPrimaryClusterController().getCcId())) { /* * If the node completed its startup before, then this is a re-registration with * the CC and therefore the system state should be HEALTHY and the node status is ACTIVE */ - RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), + RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(), NodeStatus.ACTIVE, SystemState.HEALTHY); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java index 8e8fb93..80e0b33 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -28,6 +28,7 @@ import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.ICcIdentifiedMessage; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.messaging.api.INcResponse; import org.apache.commons.lang3.mutable.MutableInt; @@ -39,7 +40,6 @@ import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; import org.apache.hyracks.control.cc.cluster.INodeManager; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -70,6 +70,9 @@ public class CCMessageBroker implements ICCMessageBroker { public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception { INodeManager nodeManager = ccs.getNodeManager(); NodeControllerState state = nodeManager.getNodeControllerState(nodeId); + if (msg instanceof ICcIdentifiedMessage) { + ((ICcIdentifiedMessage) msg).setCcId(ccs.getCcId()); + } if (state != null) { state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId); } else { @@ -97,6 +100,9 @@ public class CCMessageBroker implements ICCMessageBroker { for (int i = 0; i < ncs.size(); i++) { String nc = ncs.get(i); INcAddressedMessage message = requests.get(i); + if (!(message instanceof ICcIdentifiedMessage)) { + throw new IllegalStateException("sync request message not cc identified: " + message); + } sendApplicationMessageToNC(message, nc); } long time = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java index 08e406e..988c7bb 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java @@ -31,6 +31,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.hyracks.api.comm.IChannelControlBlock; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.messages.IMessage; import org.apache.hyracks.api.util.JavaSerializationUtils; @@ -68,8 +69,13 @@ public class NCMessageBroker implements INCMessageBroker { } @Override - public void sendMessageToCC(ICcAddressedMessage message) throws Exception { - ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null); + public void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception { + ncs.sendApplicationMessageToCC(ccId, JavaSerializationUtils.serialize(message), null); + } + + @Override + public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception { + sendMessageToCC(ncs.getPrimaryClusterController().getCcId(), message); } @Override @@ -145,7 +151,7 @@ public class NCMessageBroker implements INCMessageBroker { */ @Override public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { INcAddressedMessage msg = null; try { msg = receivedMsgsQ.take(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java index 093d150..9612ead 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java @@ -54,6 +54,7 @@ import org.apache.asterix.test.base.TestMethodTracer; import org.apache.asterix.translator.IStatementExecutor; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobIdFactory; import org.apache.hyracks.api.job.JobStatus; @@ -98,7 +99,7 @@ public class ActiveEventsListenerTest { @Before public void setUp() throws Exception { - jobIdFactory = new JobIdFactory(); + jobIdFactory = new JobIdFactory(CcId.valueOf((short) 0)); handler = new ActiveNotificationHandler(); allDatasets = new ArrayList<>(); firstDataset = new Dataset(dataverseName, "firstDataset", null, null, null, null, null, null, null, null, 0, 0); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java index c30e999..138f0e2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java @@ -20,6 +20,7 @@ package org.apache.asterix.common.api; import java.io.Serializable; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -29,8 +30,9 @@ public interface INCLifecycleTask extends Serializable { /** * Performs the task. * + * @param ccId * @param cs * @throws HyracksDataException */ - void perform(IControllerService cs) throws HyracksDataException; + void perform(CcId ccId, IControllerService cs) throws HyracksDataException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java new file mode 100644 index 0000000..d8a68ef --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging; + +import java.io.Serializable; + +import org.apache.asterix.common.messaging.api.ICcIdentifiedMessage; +import org.apache.hyracks.api.control.CcId; + +public abstract class CcIdentifiedMessage implements ICcIdentifiedMessage, Serializable { + private CcId ccId; + + @Override + public CcId getCcId() { + return ccId; + } + + @Override + public void setCcId(CcId ccId) { + this.ccId = ccId; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java index 33a8ff3..208686c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.hyracks.api.messages.IMessageBroker; public interface ICCMessageBroker extends IMessageBroker { - public enum ResponseState { + enum ResponseState { UNINITIALIZED, SUCCESS, FAILURE http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java index 37cba9c..549f1dd 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java @@ -29,5 +29,4 @@ public interface ICcAddressedMessage extends IMessage { * handle the message upon delivery */ void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException; - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java new file mode 100644 index 0000000..d144498 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging.api; + +import org.apache.hyracks.api.control.CcId; + +public interface ICcIdentifiedMessage { + CcId getCcId(); + + void setCcId(CcId ccId); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java index 86d1074..6ec2d7e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java @@ -18,17 +18,26 @@ */ package org.apache.asterix.common.messaging.api; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.messages.IMessageBroker; public interface INCMessageBroker extends IMessageBroker { /** + * Sends application message from this NC to the primary CC. + * + * @param message + * @throws Exception + */ + public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception; + + /** * Sends application message from this NC to the CC. * * @param message * @throws Exception */ - public void sendMessageToCC(ICcAddressedMessage message) throws Exception; + public void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception; /** * Sends application message from this NC to another NC. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java deleted file mode 100644 index 9bdf55c..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.common.utils; - -import java.io.IOException; -import java.nio.channels.ClosedByInterruptException; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class InvokeUtil { - - private static final Logger LOGGER = LogManager.getLogger(); - - private InvokeUtil() { - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible - * completes, the current thread will be re-interrupted, if the original operation was interrupted. - */ - public static void doUninterruptibly(Interruptible interruptible) { - boolean interrupted = false; - try { - while (true) { - try { - interruptible.run(); - break; - } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind - interrupted = true; - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible - * completes, the current thread will be re-interrupted, if the original operation was interrupted. - */ - public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception { - boolean interrupted = false; - try { - while (true) { - try { - interruptible.run(); - break; - } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind - interrupted = true; - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. - * - * @return true if the original operation was interrupted, otherwise false - */ - public static boolean doUninterruptiblyGet(Interruptible interruptible) { - boolean interrupted = false; - while (true) { - try { - interruptible.run(); - break; - } catch (InterruptedException e) { // NOSONAR- contract states caller must handle - interrupted = true; - } - } - return interrupted; - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an - * exception after being previously interrupted, the current thread will be re-interrupted. - * - * @return true if the original operation was interrupted, otherwise false - */ - public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception { - boolean interrupted = false; - boolean success = false; - while (true) { - try { - interruptible.run(); - success = true; - break; - } catch (InterruptedException e) { // NOSONAR- contract states caller must handle - interrupted = true; - } finally { - if (!success && interrupted) { - Thread.currentThread().interrupt(); - } - } - } - return interrupted; - } - - public static boolean retryLoop(long duration, TimeUnit durationUnit, long delay, TimeUnit delayUnit, - Callable function) throws IOException { - long endTime = System.nanoTime() + durationUnit.toNanos(duration); - boolean first = true; - while (endTime - System.nanoTime() > 0) { - if (first) { - first = false; - } else { - try { - delayUnit.sleep(delay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } - try { - if (function.call()) { - return true; - } - } catch (Exception e) { - // ignore, retry after delay - LOGGER.log(Level.DEBUG, "Ignoring exception on retryLoop attempt, will retry after delay", e); - } - } - return false; - } - - /** - * Executes the passed interruptible, retrying if the operation fails due to {@link ClosedByInterruptException} or - * {@link InterruptedException}. Once the interruptible completes, the current thread will be re-interrupted, if - * the original operation was interrupted. - */ - public static void doIoUninterruptibly(ThrowingIOInterruptible interruptible) throws IOException { - boolean interrupted = false; - try { - while (true) { - try { - interruptible.run(); - break; - } catch (ClosedByInterruptException | InterruptedException e) { - LOGGER.error("IO operation Interrupted. Retrying..", e); - interrupted = true; - Thread.interrupted(); - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - @FunctionalInterface - public interface Interruptible { - void run() throws InterruptedException; - } - - @FunctionalInterface - public interface ThrowingInterruptible { - void run() throws Exception; // NOSONAR - } - - @FunctionalInterface - public interface ThrowingIOInterruptible { - void run() throws IOException, InterruptedException; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java index 1988f0a..b35d02a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java @@ -24,9 +24,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.metadata.IMetadataLock; -import org.apache.asterix.common.utils.InvokeUtil; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.util.InvokeUtil; public class DatasetLock implements IMetadataLock { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java index 0b321a2..4b95253 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java @@ -25,6 +25,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.logging.log4j.Level; @@ -51,8 +52,7 @@ public class ReportLocalCountersMessage implements ICcAddressedMessage { resourceIdManager.report(src, maxResourceId); } - public static void send(NodeControllerService cs) throws HyracksDataException { - NodeControllerService ncs = cs; + public static void send(CcId ccId, NodeControllerService ncs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext(); long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(), MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); @@ -60,7 +60,7 @@ public class ReportLocalCountersMessage implements ICcAddressedMessage { ReportLocalCountersMessage countersMessage = new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId); try { - ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(countersMessage); + ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(ccId, countersMessage); } catch (Exception e) { LOGGER.log(Level.ERROR, "Unable to report local counters", e); throw HyracksDataException.create(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java index 785ad2f..51f53e7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java @@ -19,16 +19,18 @@ package org.apache.asterix.runtime.message; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.CcIdentifiedMessage; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.nc.NodeControllerService; -public class ReportLocalCountersRequestMessage implements INcAddressedMessage { +public class ReportLocalCountersRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage { private static final long serialVersionUID = 1L; @Override public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ReportLocalCountersMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService()); + ReportLocalCountersMessage.send(getCcId(), + (NodeControllerService) appCtx.getServiceContext().getControllerService()); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java index b7a4c14..78b1f17 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java @@ -62,7 +62,7 @@ public class GlobalResourceIdFactory implements IResourceIdFactory { //if no response available or it has an exception, request a new one if (reponse == null || reponse.getException() != null) { ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId); - ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToCC(msg); + ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg); reponse = resourceIdResponseQ.take(); if (reponse.getException() != null) { throw HyracksDataException.create(reponse.getException()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-transactions/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml index 22ef244..80fbd6c 100644 --- a/asterixdb/asterix-transactions/pom.xml +++ b/asterixdb/asterix-transactions/pom.xml @@ -155,5 +155,9 @@ org.apache.logging.log4j log4j-api + + org.apache.hyracks + hyracks-util + http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 00c6c13..840a19b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -54,8 +54,8 @@ import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.transactions.MutableLong; import org.apache.asterix.common.transactions.TxnLogFile; -import org.apache.asterix.common.utils.InvokeUtil; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; +import org.apache.hyracks.util.InvokeUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index 53cd038..0bb8293 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -28,7 +28,7 @@ import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.common.utils.InvokeUtil; +import org.apache.hyracks.util.InvokeUtil; public class LogManagerWithReplication extends LogManager { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java index 64f4e29..af6cb92 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.api.application; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; @@ -33,5 +34,5 @@ public interface INCApplication extends IApplication { */ IFileDeviceResolver getFileDeviceResolver(); - void onRegisterNode() throws Exception; + void onRegisterNode(CcId ccId) throws Exception; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java index 80ff77c..d42cbb3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java @@ -67,6 +67,10 @@ public interface IApplicationConfig { return (int)get(option); } + default short getShort(IOption option) { + return (short)get(option); + } + default String getString(IOption option) { return (String)get(option); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java new file mode 100644 index 0000000..32782fd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.control; + +import java.io.Serializable; + +public class CcId implements Serializable { + + private static final long serialVersionUID = 1L; + + private short id; + + private CcId(short id) { + this.id = id; + } + + public static CcId valueOf(String ccIdString) { + return new CcId(Integer.decode(ccIdString).shortValue()); + } + + public static CcId valueOf(int ccId) { + if ((ccId & ~0xffff) != 0) { + throw new IllegalArgumentException("ccId cannot exceed 16-bits: " + Integer.toHexString(ccId)); + } + return new CcId((short) ccId); + } + + public short shortValue() { + return id; + } + + @Override + public int hashCode() { + return id; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof CcId && id == ((CcId) obj).id; + } + + @Override + public String toString() { + return "CC:" + Integer.toHexString(((int) id) & 0xffff); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index b6d4f6b..35fdb2e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -126,7 +126,7 @@ public class ErrorCode { public static final int ILLEGAL_MEMORY_BUDGET = 90; public static final int TIMEOUT = 91; public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92; - public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93; + // 93 public static final int CANNOT_READ_CLOSED_FILE = 94; public static final int TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME = 95; public static final int ILLEGAL_ATTEMPT_TO_ENTER_EMPTY_COMPONENT = 96; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java index 47da24a..c83366f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java @@ -23,16 +23,22 @@ import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IWritable; -public final class JobId implements IWritable, Serializable { +public final class JobId implements IWritable, Serializable, Comparable { - public static final JobId INVALID = new JobId(-1l); + private static final int CC_BITS = Short.SIZE; + static final int ID_BITS = Long.SIZE - CC_BITS; + static final long MAX_ID = (1L << ID_BITS) - 1; + + public static final JobId INVALID = null; private static final long serialVersionUID = 1L; private long id; + private transient CcId ccId; public static JobId create(DataInput dis) throws IOException { JobId jobId = new JobId(); @@ -51,6 +57,17 @@ public final class JobId implements IWritable, Serializable { return id; } + public CcId getCcId() { + if (ccId == null) { + ccId = CcId.valueOf((int) (id >>> ID_BITS)); + } + return ccId; + } + + public long getIdOnly() { + return id & MAX_ID; + } + @Override public int hashCode() { return (int) id; @@ -58,13 +75,7 @@ public final class JobId implements IWritable, Serializable { @Override public boolean equals(Object o) { - if (o == this) { - return true; - } - if (!(o instanceof JobId)) { - return false; - } - return ((JobId) o).id == id; + return o == this || o instanceof JobId && ((JobId) o).id == id; } @Override @@ -89,4 +100,9 @@ public final class JobId implements IWritable, Serializable { public void readFields(DataInput input) throws IOException { id = input.readLong(); } + + @Override + public int compareTo(Object other) { + return Long.compare(id, ((JobId) other).id); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java index eea6b52..1bb5749 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java @@ -18,20 +18,36 @@ */ package org.apache.hyracks.api.job; +import static org.apache.hyracks.api.job.JobId.ID_BITS; +import static org.apache.hyracks.api.job.JobId.MAX_ID; + import java.util.concurrent.atomic.AtomicLong; +import org.apache.hyracks.api.control.CcId; + public class JobIdFactory { - private final AtomicLong id = new AtomicLong(0); + private final AtomicLong id; - public JobId create() { - return new JobId(id.getAndIncrement()); + public JobIdFactory(CcId ccId) { + id = new AtomicLong((long) ccId.shortValue() << ID_BITS); } - public long maxJobId() { - return id.get(); + public JobId create() { + return new JobId(id.getAndUpdate(prev -> { + if ((prev & MAX_ID) == MAX_ID) { + return prev ^ MAX_ID; + } else { + return prev + 1; + } + })); } - public void ensureMinimumId(long id) { - this.id.updateAndGet(current -> Math.max(current, id)); + public JobId maxJobId() { + long next = id.get(); + if ((next & MAX_ID) == 0) { + return new JobId(next | MAX_ID); + } else { + return new JobId(next - 1); + } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 6254b86..465a661 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -109,7 +109,7 @@ 90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes) 91 = Operation timed out 92 = Job %1$s has been cleared from job history -93 = Job %1$s has not been created yet +# 93 94 = Cannot read closed file (%1$s) 95 = Tuple of size %1$s cannot fit into an empty frame 96 = Illegal attempt to enter empty component http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java new file mode 100644 index 0000000..d16eb15 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hyracks.api.control.CcId; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class JobIdFactoryTest { + + private static Field idField; + + @BeforeClass + public static void setup() throws NoSuchFieldException { + idField = JobIdFactory.class.getDeclaredField("id"); + idField.setAccessible(true); + } + + @Test + public void testCcIds() { + JobIdFactory factory = new JobIdFactory(CcId.valueOf(0)); + for (int i = 0; i < 1000; i++) { + final JobId jobId = factory.create(); + Assert.assertEquals(0, jobId.getCcId().shortValue()); + Assert.assertEquals(i, jobId.getIdOnly()); + } + } + + @Test + public void testNegativeCcId() { + JobIdFactory factory = new JobIdFactory(CcId.valueOf(0xFFFF)); + for (int i = 0; i < 1000; i++) { + final JobId jobId = factory.create(); + Assert.assertEquals((short) 0xFFFF, jobId.getCcId().shortValue()); + Assert.assertEquals(i, jobId.getIdOnly()); + Assert.assertTrue("JID not negative", jobId.getId() < 0); + Assert.assertEquals(0xFFFF000000000000L + i, jobId.getId()); + } + } + + @Test + public void testOverflow() throws IllegalAccessException { + testOverflow(0); + testOverflow(0xFFFF); + testOverflow(Short.MAX_VALUE); + } + + private void testOverflow(int id) throws IllegalAccessException { + CcId ccId = CcId.valueOf(id); + long expected = (long) id << 48; + JobIdFactory factory = new JobIdFactory(ccId); + AtomicLong theId = (AtomicLong) idField.get(factory); + Assert.assertEquals(expected, theId.get()); + theId.set((((long)1 << 48) - 1) | expected); + JobId jobId = factory.create(); + Assert.assertEquals(ccId, jobId.getCcId()); + Assert.assertEquals(JobId.MAX_ID, jobId.getIdOnly()); + jobId = factory.create(); + Assert.assertEquals(ccId, jobId.getCcId()); + Assert.assertEquals(0, jobId.getIdOnly()); + } + + @Test + public void testComparability() throws IllegalAccessException { + JobIdFactory factory = new JobIdFactory(CcId.valueOf(0)); + compareLoop(factory, false); + factory = new JobIdFactory(CcId.valueOf(0xFFFF)); + compareLoop(factory, false); + AtomicLong theId = (AtomicLong) idField.get(factory); + theId.set(0xFFFFFFFFFFFFFFF0L); + compareLoop(factory, true); + } + + private void compareLoop(JobIdFactory factory, boolean overflow) { + Set overflowed = new HashSet<>(Collections.singleton(false)); + JobId prevMax = null; + for (int i = 0; i < 1000; i++) { + final JobId jobId = factory.create(); + Assert.assertTrue("max == last", factory.maxJobId().compareTo(jobId) == 0); + if (i > 0) { + Assert.assertTrue("last > previous max", prevMax.compareTo(jobId) < 0 || overflowed.add(overflow)); + } + prevMax = factory.maxJobId(); + } + } + + @Test + public void testTooLarge() { + try { + CcId.valueOf(0x10000); + Assert.assertTrue("expected exception", false); + } catch (IllegalArgumentException e) { + } + } +} \ No newline at end of file