asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mb...@apache.org
Subject [3/3] asterixdb git commit: [ASTERIXDB-2110] Introduce Cluster Controller Id
Date Sat, 27 Jan 2018 16:09:52 GMT
[ASTERIXDB-2110] Introduce Cluster Controller Id

Change-Id: Iec1b01444bfbd923e38f5c162c5244e17c4d5f03
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2323
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1a3a8212
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1a3a8212
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1a3a8212

Branch: refs/heads/master
Commit: 1a3a82123d197c074000ab428f3aabc4c3cbfc19
Parents: 65b8070
Author: Michael Blow <michael.blow@couchbase.com>
Authored: Fri Jan 26 23:24:55 2018 -0500
Committer: Michael Blow <mblow@apache.org>
Committed: Sat Jan 27 08:09:19 2018 -0800

----------------------------------------------------------------------
 .../apache/asterix/active/ActiveManager.java    |   6 +-
 .../active/message/ActiveManagerMessage.java    |   3 +-
 .../http/server/NCQueryCancellationServlet.java |   3 +-
 .../api/http/server/NCQueryServiceServlet.java  |   5 +-
 .../app/nc/task/BindMetadataNodeTask.java       |   3 +-
 .../asterix/app/nc/task/CheckpointTask.java     |   3 +-
 .../app/nc/task/ExternalLibrarySetupTask.java   |   3 +-
 .../asterix/app/nc/task/LocalRecoveryTask.java  |   3 +-
 .../app/nc/task/MetadataBootstrapTask.java      |   3 +-
 .../app/nc/task/ReportLocalCountersTask.java    |   5 +-
 .../nc/task/StartLifecycleComponentsTask.java   |   3 +-
 .../nc/task/StartReplicationServiceTask.java    |   3 +-
 .../message/MetadataNodeRequestMessage.java     |   6 +-
 .../RegistrationTasksRequestMessage.java        |   5 +-
 .../RegistrationTasksResponseMessage.java       |   8 +-
 .../asterix/app/translator/QueryTranslator.java |   3 +-
 .../hyracks/bootstrap/CCApplication.java        |   9 -
 .../bootstrap/ClusterLifecycleListener.java     |   8 +-
 .../hyracks/bootstrap/NCApplication.java        |  15 +-
 .../asterix/messaging/CCMessageBroker.java      |   8 +-
 .../asterix/messaging/NCMessageBroker.java      |  12 +-
 .../test/active/ActiveEventsListenerTest.java   |   3 +-
 .../asterix/common/api/INCLifecycleTask.java    |   4 +-
 .../common/messaging/CcIdentifiedMessage.java   |  38 ++++
 .../common/messaging/api/ICCMessageBroker.java  |   2 +-
 .../messaging/api/ICcAddressedMessage.java      |   1 -
 .../messaging/api/ICcIdentifiedMessage.java     |  27 +++
 .../common/messaging/api/INCMessageBroker.java  |  11 +-
 .../apache/asterix/common/utils/InvokeUtil.java | 190 ----------------
 .../asterix/metadata/lock/DatasetLock.java      |   2 +-
 .../message/ReportLocalCountersMessage.java     |   6 +-
 .../ReportLocalCountersRequestMessage.java      |   6 +-
 .../transaction/GlobalResourceIdFactory.java    |   2 +-
 asterixdb/asterix-transactions/pom.xml          |   4 +
 .../management/service/logging/LogManager.java  |   2 +-
 .../logging/LogManagerWithReplication.java      |   2 +-
 .../hyracks/api/application/INCApplication.java |   3 +-
 .../hyracks/api/config/IApplicationConfig.java  |   4 +
 .../org/apache/hyracks/api/control/CcId.java    |  62 ++++++
 .../hyracks/api/exceptions/ErrorCode.java       |   2 +-
 .../java/org/apache/hyracks/api/job/JobId.java  |  34 ++-
 .../apache/hyracks/api/job/JobIdFactory.java    |  30 ++-
 .../src/main/resources/errormsg/en.properties   |   2 +-
 .../hyracks/api/job/JobIdFactoryTest.java       | 118 ++++++++++
 .../control/cc/ClusterControllerService.java    |  59 +++--
 .../hyracks/control/cc/cluster/NodeManager.java |   2 +-
 .../control/cc/work/RegisterNodeWork.java       |   3 +-
 .../cc/work/WaitForJobCompletionWork.java       |   8 +-
 .../control/common/base/IClusterController.java |  39 ++--
 .../control/common/base/INodeController.java    |  28 +--
 .../control/common/config/OptionTypes.java      |  21 ++
 .../control/common/controllers/CCConfig.java    |  11 +-
 .../control/common/controllers/NCConfig.java    |   9 +
 .../control/common/ipc/CCNCFunctions.java       |  63 ++++--
 .../ipc/ClusterControllerRemoteProxy.java       |  15 +-
 .../common/ipc/NodeControllerRemoteProxy.java   |  19 +-
 .../hyracks/control/nc/BaseNCApplication.java   |   3 +-
 .../org/apache/hyracks/control/nc/Joblet.java   |  14 +-
 .../hyracks/control/nc/NodeControllerIPCI.java  |  17 +-
 .../control/nc/NodeControllerService.java       | 215 +++++++++++++------
 .../org/apache/hyracks/control/nc/Task.java     |   5 +-
 .../nc/dataset/DatasetPartitionManager.java     |  10 +-
 .../partitions/MaterializedPartitionWriter.java |   2 +-
 .../MaterializingPipelinedPartition.java        |   3 +-
 .../control/nc/partitions/PartitionManager.java |  28 ++-
 .../nc/partitions/PipelinedPartition.java       |   3 +-
 .../hyracks/control/nc/task/ThreadDumpTask.java |  10 +-
 .../control/nc/work/AbortAllJobsWork.java       |  20 +-
 .../control/nc/work/BuildJobProfilesWork.java   |  12 +-
 .../control/nc/work/DeployBinaryWork.java       |   7 +-
 .../control/nc/work/DeployJobSpecWork.java      |   8 +-
 .../control/nc/work/NotifyTaskCompleteWork.java |   4 +-
 .../control/nc/work/NotifyTaskFailureWork.java  |   2 +-
 .../hyracks/control/nc/work/StateDumpWork.java  |   8 +-
 .../control/nc/work/UnDeployBinaryWork.java     |   7 +-
 .../control/nc/work/UndeployJobSpecWork.java    |   7 +-
 .../btree/helper/TestNCApplication.java         |   3 +-
 .../tests/integration/JobFailureTest.java       |   6 -
 .../hyracks/test/support/TestJobletContext.java |   1 -
 .../org/apache/hyracks/util/InvokeUtil.java     | 190 ++++++++++++++++
 .../org/apache/hyracks/util/trace/Tracer.java   |   7 +-
 81 files changed, 1020 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index bfa648a..aa9ac98 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -31,8 +31,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
-import org.apache.asterix.active.message.ActiveStatsResponse;
 import org.apache.asterix.active.message.ActiveStatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsResponse;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -116,7 +116,7 @@ public class ActiveManager {
                 LOGGER.warn("Request stats of a runtime that is not registered " + runtimeId);
                 // Send a failure message
                 ((NodeControllerService) serviceCtx.getControllerService())
-                        .sendApplicationMessageToCC(
+                        .sendApplicationMessageToCC(message.getCcId(),
                                 JavaSerializationUtils
                                         .serialize(new ActiveStatsResponse(reqId, null,
                                                 new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME,
@@ -126,7 +126,7 @@ public class ActiveManager {
             String stats = runtime.getStats();
             ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null);
             ((NodeControllerService) serviceCtx.getControllerService())
-                    .sendApplicationMessageToCC(JavaSerializationUtils.serialize(response), null);
+                    .sendApplicationMessageToCC(message.getCcId(), JavaSerializationUtils.serialize(response), null);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index bef418b..b8c44a6 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -22,10 +22,11 @@ import java.io.Serializable;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class ActiveManagerMessage implements INcAddressedMessage {
+public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddressedMessage {
     public enum Kind {
         STOP_ACTIVITY,
         REQUEST_STATS

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index 87beae1..c3e02af 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -62,7 +62,8 @@ public class NCQueryCancellationServlet extends QueryCancellationServlet {
         try {
             CancelQueryRequest cancelQueryMessage =
                     new CancelQueryRequest(serviceCtx.getNodeId(), cancelQueryFuture.getFutureId(), clientContextId);
-            messageBroker.sendMessageToCC(cancelQueryMessage);
+            // TODO(mblow): multicc -- need to send cancellation to the correct cc
+            messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
             cancelQueryFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
             response.setStatus(HttpResponseStatus.OK);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 76f489c..5cbac64 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -88,7 +88,7 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
                     responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(),
                     resultProperties.getNcToCcResultProperties(), param.clientContextID, handleUrl, optionalParameters);
             execution.start();
-            ncMb.sendMessageToCC(requestMsg);
+            ncMb.sendMessageToPrimaryCC(requestMsg);
             try {
                 responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
@@ -137,7 +137,8 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
         try {
             CancelQueryRequest cancelQueryMessage =
                     new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID);
-            messageBroker.sendMessageToCC(cancelQueryMessage);
+            // TODO(mblow): multicc -- need to send cancellation to the correct cc
+            messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
             if (wait) {
                 cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
                         TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
index 49a84e1..e41bc60 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
@@ -20,6 +20,7 @@ package org.apache.asterix.app.nc.task;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -33,7 +34,7 @@ public class BindMetadataNodeTask implements INCLifecycleTask {
     }
 
     @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         try {
             if (exportStub) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
index 02c377a..6f1775e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
@@ -21,6 +21,7 @@ package org.apache.asterix.app.nc.task;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -29,7 +30,7 @@ public class CheckpointTask implements INCLifecycleTask {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         ICheckpointManager checkpointMgr = appContext.getTransactionSubsystem().getCheckpointManager();
         checkpointMgr.doSharpCheckpoint();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
index 4e330c6..8cfeb12 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -21,6 +21,7 @@ package org.apache.asterix.app.nc.task;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -34,7 +35,7 @@ public class ExternalLibrarySetupTask implements INCLifecycleTask {
     }
 
     @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         try {
             ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
index eb19ad6..d0a8dcc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -38,7 +39,7 @@ public class LocalRecoveryTask implements INCLifecycleTask {
     }
 
     @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         try {
             appContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index 784f3b0..001af23 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -21,6 +21,7 @@ package org.apache.asterix.app.nc.task;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -29,7 +30,7 @@ public class MetadataBootstrapTask implements INCLifecycleTask {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         try {
             SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
index 86f7d1c..53b13e8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
@@ -20,6 +20,7 @@ package org.apache.asterix.app.nc.task;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.runtime.message.ReportLocalCountersMessage;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -29,8 +30,8 @@ public class ReportLocalCountersTask implements INCLifecycleTask {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
-        ReportLocalCountersMessage.send((NodeControllerService) cs);
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
+        ReportLocalCountersMessage.send(ccId, (NodeControllerService) cs);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
index 7db473e..7ecc669 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
@@ -25,6 +25,7 @@ import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.hyracks.bootstrap.AsterixStateDumpHandler;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
@@ -39,7 +40,7 @@ public class StartLifecycleComponentsTask implements INCLifecycleTask {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext applicationContext = (INcApplicationContext) cs.getApplicationContext();
         NCServiceContext serviceCtx = (NCServiceContext) cs.getContext();
         MetadataProperties metadataProperties = applicationContext.getMetadataProperties();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
index 7071271..0cfb6b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
@@ -20,6 +20,7 @@ package org.apache.asterix.app.nc.task;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -28,7 +29,7 @@ public class StartReplicationServiceTask implements INCLifecycleTask {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         try {
             // open replication channel

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
index b0e1e06..a8c98c7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.replication.message;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -27,7 +28,8 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-public class MetadataNodeRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
+public class MetadataNodeRequestMessage extends CcIdentifiedMessage
+        implements INCLifecycleMessage, INcAddressedMessage {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = LogManager.getLogger();
@@ -55,7 +57,7 @@ public class MetadataNodeRequestMessage implements INCLifecycleMessage, INcAddre
             MetadataNodeResponseMessage reponse =
                     new MetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId(), export);
             try {
-                broker.sendMessageToCC(reponse);
+                broker.sendMessageToCC(getCcId(), reponse);
             } catch (Exception e) {
                 LOGGER.log(Level.ERROR, "Failed taking over metadata", e);
                 hde = HyracksDataException.suppress(hde, e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index 6ca576a..62e7a69 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -24,6 +24,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.logging.log4j.Level;
@@ -44,12 +45,12 @@ public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICc
         this.nodeStatus = nodeStatus;
     }
 
-    public static void send(NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState)
+    public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState)
             throws HyracksDataException {
         try {
             RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
                     systemState);
-            ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg);
+            ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
             throw HyracksDataException.create(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index d4c2340..a6f10ca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -33,7 +34,8 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-public class RegistrationTasksResponseMessage implements INCLifecycleMessage, INcAddressedMessage {
+public class RegistrationTasksResponseMessage extends CcIdentifiedMessage
+        implements INCLifecycleMessage, INcAddressedMessage {
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final long serialVersionUID = 1L;
@@ -57,7 +59,7 @@ public class RegistrationTasksResponseMessage implements INCLifecycleMessage, IN
                     if (LOGGER.isInfoEnabled()) {
                         LOGGER.log(Level.INFO, "Starting startup task: " + task);
                     }
-                    task.perform(cs);
+                    task.perform(getCcId(), cs);
                     if (LOGGER.isInfoEnabled()) {
                         LOGGER.log(Level.INFO, "Completed startup task: " + task);
                     }
@@ -70,7 +72,7 @@ public class RegistrationTasksResponseMessage implements INCLifecycleMessage, IN
             NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success);
             result.setException(exception);
             try {
-                broker.sendMessageToCC(result);
+                broker.sendMessageToCC(getCcId(), result);
             } catch (Exception e) {
                 success = false;
                 LOGGER.log(Level.ERROR, "Failed sending message to cc", e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 4e9cb47..335899c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -37,6 +37,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -2475,7 +2476,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 }
             }, clientContextId, ctx);
         } catch (Exception e) {
-            if (JobId.INVALID.equals(jobId.getValue())) {
+            if (Objects.equals(JobId.INVALID, jobId.getValue())) {
                 // compilation failed
                 ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.FAILED);
                 ResultUtil.printError(sessionOutput.out(), e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 11f4e1c..1c7bfb7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -57,7 +57,6 @@ import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.replication.NcLifecycleCoordinator;
 import org.apache.asterix.common.api.AsterixThreadFactory;
-import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.ExternalProperties;
@@ -348,14 +347,6 @@ public class CCApplication extends BaseCCApplication {
     }
 
     @Override
-    public void startupCompleted() throws Exception {
-        ccServiceCtx.getControllerService().getExecutor().submit(() -> {
-            appCtx.getClusterStateManager().waitForState(IClusterManagementWork.ClusterState.ACTIVE);
-            return null;
-        });
-    }
-
-    @Override
     public IJobCapacityController getJobCapacityController() {
         return jobCapacityController;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 8c87a26..932f47c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -60,9 +60,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
 
     @Override
     public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("NC: " + nodeId + " joined");
-        }
+        LOGGER.info("NC: {} joined", nodeId);
         IClusterStateManager csm = appCtx.getClusterStateManager();
         csm.notifyNodeJoin(nodeId, ncConfiguration);
 
@@ -79,9 +77,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
     @Override
     public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
         for (String deadNode : deadNodeIds) {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("NC: " + deadNode + " left");
-            }
+            LOGGER.info("NC: {} left", deadNode);
             IClusterStateManager csm = appCtx.getClusterStateManager();
             csm.notifyNodeFailure(deadNode);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0526a32..0d3b7b5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -50,6 +50,7 @@ import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -198,19 +199,23 @@ public class NCApplication extends BaseNCApplication {
             state = SystemState.BOOTSTRAPPING;
         }
         // Request registration tasks from CC
-        RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
-                NodeStatus.BOOTING, state);
+        // TODO (mblow): multicc
+        final NodeControllerService ncControllerService = (NodeControllerService) ncServiceCtx.getControllerService();
+        RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryClusterController().getCcId(),
+                ncControllerService, NodeStatus.BOOTING, state);
         startupCompleted = true;
     }
 
     @Override
-    public void onRegisterNode() throws Exception {
-        if (startupCompleted) {
+    public void onRegisterNode(CcId ccId) throws Exception {
+        // TODO (mblow): multicc
+        if (startupCompleted && ccId.equals(((NodeControllerService) ncServiceCtx.getControllerService())
+                .getPrimaryClusterController().getCcId())) {
             /*
              * If the node completed its startup before, then this is a re-registration with
              * the CC and therefore the system state should be HEALTHY and the node status is ACTIVE
              */
-            RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
+            RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
                     NodeStatus.ACTIVE, SystemState.HEALTHY);
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 8e8fb93..80e0b33 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -28,6 +28,7 @@ import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.ICcIdentifiedMessage;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.messaging.api.INcResponse;
 import org.apache.commons.lang3.mutable.MutableInt;
@@ -39,7 +40,6 @@ import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -70,6 +70,9 @@ public class CCMessageBroker implements ICCMessageBroker {
     public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
+        if (msg instanceof ICcIdentifiedMessage) {
+            ((ICcIdentifiedMessage) msg).setCcId(ccs.getCcId());
+        }
         if (state != null) {
             state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
         } else {
@@ -97,6 +100,9 @@ public class CCMessageBroker implements ICCMessageBroker {
                 for (int i = 0; i < ncs.size(); i++) {
                     String nc = ncs.get(i);
                     INcAddressedMessage message = requests.get(i);
+                    if (!(message instanceof ICcIdentifiedMessage)) {
+                        throw new IllegalStateException("sync request message not cc identified: " + message);
+                    }
                     sendApplicationMessageToNC(message, nc);
                 }
                 long time = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 08e406e..988c7bb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -31,6 +31,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -68,8 +69,13 @@ public class NCMessageBroker implements INCMessageBroker {
     }
 
     @Override
-    public void sendMessageToCC(ICcAddressedMessage message) throws Exception {
-        ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
+    public void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception {
+        ncs.sendApplicationMessageToCC(ccId, JavaSerializationUtils.serialize(message), null);
+    }
+
+    @Override
+    public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception {
+        sendMessageToCC(ncs.getPrimaryClusterController().getCcId(), message);
     }
 
     @Override
@@ -145,7 +151,7 @@ public class NCMessageBroker implements INCMessageBroker {
          */
         @Override
         public void run() {
-            while (true) {
+            while (!Thread.currentThread().isInterrupted()) {
                 INcAddressedMessage msg = null;
                 try {
                     msg = receivedMsgsQ.take();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 093d150..9612ead 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -54,6 +54,7 @@ import org.apache.asterix.test.base.TestMethodTracer;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.JobStatus;
@@ -98,7 +99,7 @@ public class ActiveEventsListenerTest {
 
     @Before
     public void setUp() throws Exception {
-        jobIdFactory = new JobIdFactory();
+        jobIdFactory = new JobIdFactory(CcId.valueOf((short) 0));
         handler = new ActiveNotificationHandler();
         allDatasets = new ArrayList<>();
         firstDataset = new Dataset(dataverseName, "firstDataset", null, null, null, null, null, null, null, null, 0, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
index c30e999..138f0e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
@@ -20,6 +20,7 @@ package org.apache.asterix.common.api;
 
 import java.io.Serializable;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -29,8 +30,9 @@ public interface INCLifecycleTask extends Serializable {
     /**
      * Performs the task.
      *
+     * @param ccId
      * @param cs
      * @throws HyracksDataException
      */
-    void perform(IControllerService cs) throws HyracksDataException;
+    void perform(CcId ccId, IControllerService cs) throws HyracksDataException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java
new file mode 100644
index 0000000..d8a68ef
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.io.Serializable;
+
+import org.apache.asterix.common.messaging.api.ICcIdentifiedMessage;
+import org.apache.hyracks.api.control.CcId;
+
+public abstract class CcIdentifiedMessage implements ICcIdentifiedMessage, Serializable {
+    private CcId ccId;
+
+    @Override
+    public CcId getCcId() {
+        return ccId;
+    }
+
+    @Override
+    public void setCcId(CcId ccId) {
+        this.ccId = ccId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 33a8ff3..208686c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.hyracks.api.messages.IMessageBroker;
 
 public interface ICCMessageBroker extends IMessageBroker {
-    public enum ResponseState {
+    enum ResponseState {
         UNINITIALIZED,
         SUCCESS,
         FAILURE

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java
index 37cba9c..549f1dd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java
@@ -29,5 +29,4 @@ public interface ICcAddressedMessage extends IMessage {
      * handle the message upon delivery
      */
     void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException;
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java
new file mode 100644
index 0000000..d144498
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.control.CcId;
+
+public interface ICcIdentifiedMessage {
+    CcId getCcId();
+
+    void setCcId(CcId ccId);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 86d1074..6ec2d7e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -18,17 +18,26 @@
  */
 package org.apache.asterix.common.messaging.api;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.messages.IMessageBroker;
 
 public interface INCMessageBroker extends IMessageBroker {
 
     /**
+     * Sends application message from this NC to the primary CC.
+     *
+     * @param message
+     * @throws Exception
+     */
+    public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception;
+
+    /**
      * Sends application message from this NC to the CC.
      *
      * @param message
      * @throws Exception
      */
-    public void sendMessageToCC(ICcAddressedMessage message) throws Exception;
+    public void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception;
 
     /**
      * Sends application message from this NC to another NC.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
deleted file mode 100644
index 9bdf55c..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.utils;
-
-import java.io.IOException;
-import java.nio.channels.ClosedByInterruptException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class InvokeUtil {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private InvokeUtil() {
-    }
-
-    /**
-     * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
-     * completes, the current thread will be re-interrupted, if the original operation was interrupted.
-     */
-    public static void doUninterruptibly(Interruptible interruptible) {
-        boolean interrupted = false;
-        try {
-            while (true) {
-                try {
-                    interruptible.run();
-                    break;
-                } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind
-                    interrupted = true;
-                }
-            }
-        } finally {
-            if (interrupted) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    /**
-     * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
-     * completes, the current thread will be re-interrupted, if the original operation was interrupted.
-     */
-    public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception {
-        boolean interrupted = false;
-        try {
-            while (true) {
-                try {
-                    interruptible.run();
-                    break;
-                } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind
-                    interrupted = true;
-                }
-            }
-        } finally {
-            if (interrupted) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    /**
-     * Executes the passed interruptible, retrying if the operation is interrupted.
-     *
-     * @return true if the original operation was interrupted, otherwise false
-     */
-    public static boolean doUninterruptiblyGet(Interruptible interruptible) {
-        boolean interrupted = false;
-        while (true) {
-            try {
-                interruptible.run();
-                break;
-            } catch (InterruptedException e) { // NOSONAR- contract states caller must handle
-                interrupted = true;
-            }
-        }
-        return interrupted;
-    }
-
-    /**
-     * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an
-     * exception after being previously interrupted, the current thread will be re-interrupted.
-     *
-     * @return true if the original operation was interrupted, otherwise false
-     */
-    public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception {
-        boolean interrupted = false;
-        boolean success = false;
-        while (true) {
-            try {
-                interruptible.run();
-                success = true;
-                break;
-            } catch (InterruptedException e) { // NOSONAR- contract states caller must handle
-                interrupted = true;
-            } finally {
-                if (!success && interrupted) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-        return interrupted;
-    }
-
-    public static boolean retryLoop(long duration, TimeUnit durationUnit, long delay, TimeUnit delayUnit,
-            Callable<Boolean> function) throws IOException {
-        long endTime = System.nanoTime() + durationUnit.toNanos(duration);
-        boolean first = true;
-        while (endTime - System.nanoTime() > 0) {
-            if (first) {
-                first = false;
-            } else {
-                try {
-                    delayUnit.sleep(delay);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    return false;
-                }
-            }
-            try {
-                if (function.call()) {
-                    return true;
-                }
-            } catch (Exception e) {
-                // ignore, retry after delay
-                LOGGER.log(Level.DEBUG, "Ignoring exception on retryLoop attempt, will retry after delay", e);
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Executes the passed interruptible, retrying if the operation fails due to {@link ClosedByInterruptException} or
-     * {@link InterruptedException}. Once the interruptible completes, the current thread will be re-interrupted, if
-     * the original operation was interrupted.
-     */
-    public static void doIoUninterruptibly(ThrowingIOInterruptible interruptible) throws IOException {
-        boolean interrupted = false;
-        try {
-            while (true) {
-                try {
-                    interruptible.run();
-                    break;
-                } catch (ClosedByInterruptException | InterruptedException e) {
-                    LOGGER.error("IO operation Interrupted. Retrying..", e);
-                    interrupted = true;
-                    Thread.interrupted();
-                }
-            }
-        } finally {
-            if (interrupted) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    @FunctionalInterface
-    public interface Interruptible {
-        void run() throws InterruptedException;
-    }
-
-    @FunctionalInterface
-    public interface ThrowingInterruptible {
-        void run() throws Exception; // NOSONAR
-    }
-
-    @FunctionalInterface
-    public interface ThrowingIOInterruptible {
-        void run() throws IOException, InterruptedException;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index 1988f0a..b35d02a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -24,9 +24,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.metadata.IMetadataLock;
-import org.apache.asterix.common.utils.InvokeUtil;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.util.InvokeUtil;
 
 public class DatasetLock implements IMetadataLock {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index 0b321a2..4b95253 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -25,6 +25,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.logging.log4j.Level;
@@ -51,8 +52,7 @@ public class ReportLocalCountersMessage implements ICcAddressedMessage {
         resourceIdManager.report(src, maxResourceId);
     }
 
-    public static void send(NodeControllerService cs) throws HyracksDataException {
-        NodeControllerService ncs = cs;
+    public static void send(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
         long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
@@ -60,7 +60,7 @@ public class ReportLocalCountersMessage implements ICcAddressedMessage {
         ReportLocalCountersMessage countersMessage =
                 new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId);
         try {
-            ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(countersMessage);
+            ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(ccId, countersMessage);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Unable to report local counters", e);
             throw HyracksDataException.create(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
index 785ad2f..51f53e7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
@@ -19,16 +19,18 @@
 package org.apache.asterix.runtime.message;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReportLocalCountersRequestMessage implements INcAddressedMessage {
+public class ReportLocalCountersRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage {
     private static final long serialVersionUID = 1L;
 
     @Override
     public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ReportLocalCountersMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService());
+        ReportLocalCountersMessage.send(getCcId(),
+                (NodeControllerService) appCtx.getServiceContext().getControllerService());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index b7a4c14..78b1f17 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -62,7 +62,7 @@ public class GlobalResourceIdFactory implements IResourceIdFactory {
             //if no response available or it has an exception, request a new one
             if (reponse == null || reponse.getException() != null) {
                 ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
-                ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToCC(msg);
+                ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
                 reponse = resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
                     throw HyracksDataException.create(reponse.getException());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-transactions/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml
index 22ef244..80fbd6c 100644
--- a/asterixdb/asterix-transactions/pom.xml
+++ b/asterixdb/asterix-transactions/pom.xml
@@ -155,5 +155,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 00c6c13..840a19b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -54,8 +54,8 @@ import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.MutableLong;
 import org.apache.asterix.common.transactions.TxnLogFile;
-import org.apache.asterix.common.utils.InvokeUtil;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.util.InvokeUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 53cd038..0bb8293 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -28,7 +28,7 @@ import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.utils.InvokeUtil;
+import org.apache.hyracks.util.InvokeUtil;
 
 public class LogManagerWithReplication extends LogManager {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index 64f4e29..af6cb92 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.application;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 
@@ -33,5 +34,5 @@ public interface INCApplication extends IApplication {
      */
     IFileDeviceResolver getFileDeviceResolver();
 
-    void onRegisterNode() throws Exception;
+    void onRegisterNode(CcId ccId) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java
index 80ff77c..d42cbb3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java
@@ -67,6 +67,10 @@ public interface IApplicationConfig {
         return (int)get(option);
     }
 
+    default short getShort(IOption option) {
+        return (short)get(option);
+    }
+
     default String getString(IOption option) {
         return (String)get(option);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
new file mode 100644
index 0000000..32782fd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.control;
+
+import java.io.Serializable;
+
+public class CcId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private short id;
+
+    private CcId(short id) {
+        this.id = id;
+    }
+
+    public static CcId valueOf(String ccIdString) {
+        return new CcId(Integer.decode(ccIdString).shortValue());
+    }
+
+    public static CcId valueOf(int ccId) {
+        if ((ccId & ~0xffff) != 0) {
+            throw new IllegalArgumentException("ccId cannot exceed 16-bits: " + Integer.toHexString(ccId));
+        }
+        return new CcId((short) ccId);
+    }
+
+    public short shortValue() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return id;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof CcId && id == ((CcId) obj).id;
+    }
+
+    @Override
+    public String toString() {
+        return "CC:" + Integer.toHexString(((int) id) & 0xffff);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b6d4f6b..35fdb2e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -126,7 +126,7 @@ public class ErrorCode {
     public static final int ILLEGAL_MEMORY_BUDGET = 90;
     public static final int TIMEOUT = 91;
     public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92;
-    public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93;
+    // 93
     public static final int CANNOT_READ_CLOSED_FILE = 94;
     public static final int TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME = 95;
     public static final int ILLEGAL_ATTEMPT_TO_ENTER_EMPTY_COMPONENT = 96;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index 47da24a..c83366f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -23,16 +23,22 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IWritable;
 
-public final class JobId implements IWritable, Serializable {
+public final class JobId implements IWritable, Serializable, Comparable {
 
-    public static final JobId INVALID = new JobId(-1l);
+    private static final int CC_BITS = Short.SIZE;
+    static final int ID_BITS = Long.SIZE - CC_BITS;
+    static final long MAX_ID = (1L << ID_BITS) - 1;
+
+    public static final JobId INVALID = null;
 
     private static final long serialVersionUID = 1L;
     private long id;
+    private transient CcId ccId;
 
     public static JobId create(DataInput dis) throws IOException {
         JobId jobId = new JobId();
@@ -51,6 +57,17 @@ public final class JobId implements IWritable, Serializable {
         return id;
     }
 
+    public CcId getCcId() {
+        if (ccId == null) {
+            ccId = CcId.valueOf((int) (id >>> ID_BITS));
+        }
+        return ccId;
+    }
+
+    public long getIdOnly() {
+        return id & MAX_ID;
+    }
+
     @Override
     public int hashCode() {
         return (int) id;
@@ -58,13 +75,7 @@ public final class JobId implements IWritable, Serializable {
 
     @Override
     public boolean equals(Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof JobId)) {
-            return false;
-        }
-        return ((JobId) o).id == id;
+        return o == this || o instanceof JobId && ((JobId) o).id == id;
     }
 
     @Override
@@ -89,4 +100,9 @@ public final class JobId implements IWritable, Serializable {
     public void readFields(DataInput input) throws IOException {
         id = input.readLong();
     }
+
+    @Override
+    public int compareTo(Object other) {
+        return Long.compare(id, ((JobId) other).id);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index eea6b52..1bb5749 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -18,20 +18,36 @@
  */
 package org.apache.hyracks.api.job;
 
+import static org.apache.hyracks.api.job.JobId.ID_BITS;
+import static org.apache.hyracks.api.job.JobId.MAX_ID;
+
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hyracks.api.control.CcId;
+
 public class JobIdFactory {
-    private final AtomicLong id = new AtomicLong(0);
+    private final AtomicLong id;
 
-    public JobId create() {
-        return new JobId(id.getAndIncrement());
+    public JobIdFactory(CcId ccId) {
+        id = new AtomicLong((long) ccId.shortValue() << ID_BITS);
     }
 
-    public long maxJobId() {
-        return id.get();
+    public JobId create() {
+        return new JobId(id.getAndUpdate(prev -> {
+            if ((prev & MAX_ID) == MAX_ID) {
+                return prev ^ MAX_ID;
+            } else {
+                return prev + 1;
+            }
+        }));
     }
 
-    public void ensureMinimumId(long id) {
-        this.id.updateAndGet(current -> Math.max(current, id));
+    public JobId maxJobId() {
+        long next = id.get();
+        if ((next & MAX_ID) == 0) {
+            return new JobId(next | MAX_ID);
+        } else {
+            return new JobId(next - 1);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 6254b86..465a661 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -109,7 +109,7 @@
 90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes)
 91 = Operation timed out
 92 = Job %1$s has been cleared from job history
-93 = Job %1$s has not been created yet
+# 93
 94 = Cannot read closed file (%1$s)
 95 = Tuple of size %1$s cannot fit into an empty frame
 96 = Illegal attempt to enter empty component

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java
new file mode 100644
index 0000000..d16eb15
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hyracks.api.control.CcId;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JobIdFactoryTest {
+
+    private static Field idField;
+
+    @BeforeClass
+    public static void setup() throws NoSuchFieldException {
+        idField = JobIdFactory.class.getDeclaredField("id");
+        idField.setAccessible(true);
+    }
+
+    @Test
+    public void testCcIds() {
+        JobIdFactory factory = new JobIdFactory(CcId.valueOf(0));
+        for (int i = 0; i < 1000; i++) {
+            final JobId jobId = factory.create();
+            Assert.assertEquals(0, jobId.getCcId().shortValue());
+            Assert.assertEquals(i, jobId.getIdOnly());
+        }
+    }
+
+    @Test
+    public void testNegativeCcId() {
+        JobIdFactory factory = new JobIdFactory(CcId.valueOf(0xFFFF));
+        for (int i = 0; i < 1000; i++) {
+            final JobId jobId = factory.create();
+            Assert.assertEquals((short) 0xFFFF, jobId.getCcId().shortValue());
+            Assert.assertEquals(i, jobId.getIdOnly());
+            Assert.assertTrue("JID not negative", jobId.getId() < 0);
+            Assert.assertEquals(0xFFFF000000000000L + i, jobId.getId());
+        }
+    }
+
+    @Test
+    public void testOverflow() throws IllegalAccessException {
+        testOverflow(0);
+        testOverflow(0xFFFF);
+        testOverflow(Short.MAX_VALUE);
+    }
+
+    private void testOverflow(int id) throws IllegalAccessException {
+        CcId ccId = CcId.valueOf(id);
+        long expected = (long) id << 48;
+        JobIdFactory factory = new JobIdFactory(ccId);
+        AtomicLong theId = (AtomicLong) idField.get(factory);
+        Assert.assertEquals(expected, theId.get());
+        theId.set((((long)1 << 48) - 1) | expected);
+        JobId jobId = factory.create();
+        Assert.assertEquals(ccId, jobId.getCcId());
+        Assert.assertEquals(JobId.MAX_ID, jobId.getIdOnly());
+        jobId = factory.create();
+        Assert.assertEquals(ccId, jobId.getCcId());
+        Assert.assertEquals(0, jobId.getIdOnly());
+    }
+
+    @Test
+    public void testComparability() throws IllegalAccessException {
+        JobIdFactory factory = new JobIdFactory(CcId.valueOf(0));
+        compareLoop(factory, false);
+        factory = new JobIdFactory(CcId.valueOf(0xFFFF));
+        compareLoop(factory, false);
+        AtomicLong theId = (AtomicLong) idField.get(factory);
+        theId.set(0xFFFFFFFFFFFFFFF0L);
+        compareLoop(factory, true);
+    }
+
+    private void compareLoop(JobIdFactory factory, boolean overflow) {
+        Set<Boolean> overflowed = new HashSet<>(Collections.singleton(false));
+        JobId prevMax = null;
+        for (int i = 0; i < 1000; i++) {
+            final JobId jobId = factory.create();
+            Assert.assertTrue("max == last", factory.maxJobId().compareTo(jobId) == 0);
+            if (i > 0) {
+                Assert.assertTrue("last > previous max", prevMax.compareTo(jobId) < 0 || overflowed.add(overflow));
+            }
+            prevMax = factory.maxJobId();
+        }
+    }
+
+    @Test
+    public void testTooLarge() {
+        try {
+            CcId.valueOf(0x10000);
+            Assert.assertTrue("expected exception", false);
+        } catch (IllegalArgumentException e) {
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message