asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ti...@apache.org
Subject [2/7] asterixdb git commit: Implements concurrent query management support.
Date Wed, 25 Jan 2017 14:46:35 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
index 4a8dd1f..0577002 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.control.cc.work;
 
 import java.io.File;
@@ -35,18 +36,21 @@ import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.utils.PidHelper;
 import org.apache.hyracks.control.common.work.IPCResponder;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.kohsuke.args4j.Option;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class GetNodeDetailsJSONWork extends SynchronizableWork {
     private static final Logger LOGGER = Logger.getLogger(GetNodeDetailsJSONWork.class.getName());
-    private final ClusterControllerService ccs;
+    private final INodeManager nodeManager;
+    private final CCConfig ccConfig;
     private final String nodeId;
     private final boolean includeStats;
     private final boolean includeConfig;
@@ -54,18 +58,19 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork {
     private ObjectNode detail;
     private ObjectMapper om = new ObjectMapper();
 
-    public GetNodeDetailsJSONWork(ClusterControllerService ccs, String nodeId, boolean includeStats,
+    public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
                                   boolean includeConfig, IPCResponder<String> callback) {
-        this.ccs = ccs;
+        this.nodeManager = nodeManager;
+        this.ccConfig = ccConfig;
         this.nodeId = nodeId;
         this.includeStats = includeStats;
         this.includeConfig = includeConfig;
         this.callback = callback;
     }
 
-    public GetNodeDetailsJSONWork(ClusterControllerService ccs, String nodeId, boolean includeStats,
+    public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
                                   boolean includeConfig) {
-        this(ccs, nodeId, includeStats, includeConfig, null);
+        this(nodeManager, ccConfig, nodeId, includeStats, includeConfig, null);
     }
 
     @Override
@@ -74,10 +79,10 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork {
             // null nodeId is a request for CC
             detail = getCCDetails();
             if (includeConfig) {
-                addIni(detail, ccs.getCCConfig());
+                addIni(detail, ccConfig);
             }
         } else {
-            NodeControllerState ncs = ccs.getNodeMap().get(nodeId);
+            NodeControllerState ncs = nodeManager.getNodeControllerState(nodeId);
             if (ncs != null) {
                 detail = ncs.toDetailedJSON(includeStats, includeConfig);
                 if (includeConfig) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java
index ebafd7d..b78e817 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java
@@ -18,26 +18,26 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
-import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
 public class GetNodeSummariesJSONWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
+    private final INodeManager nodeManager;
     private ArrayNode summaries;
 
-    public GetNodeSummariesJSONWork(ClusterControllerService ccs) {
-        this.ccs = ccs;
+    public GetNodeSummariesJSONWork(INodeManager nodeManager) {
+        this.nodeManager = nodeManager;
     }
 
     @Override
     protected void doRun() throws Exception {
         ObjectMapper om = new ObjectMapper();
         summaries = om.createArrayNode();
-        for (NodeControllerState ncs : ccs.getNodeMap().values()) {
+        for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) {
             summaries.add(ncs.toSummaryJSON());
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index 889c828..d7c7be1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.control.cc.work;
 
 import java.lang.management.ManagementFactory;
@@ -27,6 +28,7 @@ import java.util.logging.Logger;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.common.work.IResultCallback;
@@ -59,7 +61,8 @@ public class GetThreadDumpWork extends AbstractWork {
                 callback.setException(e);
             }
         } else {
-            final NodeControllerState ncState = ccs.getNodeMap().get(nodeId);
+            INodeManager nodeManager = ccs.getNodeManager();
+            final NodeControllerState ncState = nodeManager.getNodeControllerState(nodeId);
             if (ncState == null) {
                 // bad node id, reply with null immediately
                 callback.setValue(null);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 2a383b6..5f29981 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -18,33 +18,27 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.AbstractWork;
 
 public class JobCleanupWork extends AbstractWork {
     private static final Logger LOGGER = Logger.getLogger(JobCleanupWork.class.getName());
 
-    private ClusterControllerService ccs;
+    private IJobManager jobManager;
     private JobId jobId;
     private JobStatus status;
     private List<Exception> exceptions;
 
-    public JobCleanupWork(ClusterControllerService ccs, JobId jobId, JobStatus status, List<Exception> exceptions) {
-        this.ccs = ccs;
+    public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions) {
+        this.jobManager = jobManager;
         this.jobId = jobId;
         this.status = status;
         this.exceptions = exceptions;
@@ -52,83 +46,18 @@ public class JobCleanupWork extends AbstractWork {
 
     @Override
     public void run() {
-        LOGGER.info("Cleanup for JobRun with id: " + jobId);
-        final JobRun run = ccs.getActiveRunMap().get(jobId);
-        if (run == null) {
-            LOGGER.warning("Unable to find JobRun with id: " + jobId);
-            return;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Cleanup for JobRun with id: " + jobId);
         }
-        if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) {
-            finishJob(run);
-            return;
+        try {
+            JobRun jobRun = jobManager.get(jobId);
+            jobManager.prepareComplete(jobRun, status, exceptions);
+        } catch (HyracksException e) {
+            // Fail the job with the caught exception during final completion.
+            JobRun run = jobManager.get(jobId);
+            run.getExceptions().add(e);
+            run.setStatus(JobStatus.FAILURE, run.getExceptions());
         }
-        if (run.getPendingStatus() != null) {
-            LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " + jobId);
-            return;
-        }
-        Set<String> targetNodes = run.getParticipatingNodeIds();
-        run.getCleanupPendingNodeIds().addAll(targetNodes);
-        if (run.getPendingStatus() != JobStatus.FAILURE && run.getPendingStatus() != JobStatus.TERMINATED) {
-            run.setPendingStatus(status, exceptions);
-        }
-        if (targetNodes != null && !targetNodes.isEmpty()) {
-            Set<String> toDelete = new HashSet<String>();
-            for (String n : targetNodes) {
-                NodeControllerState ncs = ccs.getNodeMap().get(n);
-                try {
-                    if (ncs == null) {
-                        toDelete.add(n);
-                    } else {
-                        ncs.getNodeController().cleanUpJoblet(jobId, status);
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            targetNodes.removeAll(toDelete);
-            run.getCleanupPendingNodeIds().removeAll(toDelete);
-            if (run.getCleanupPendingNodeIds().isEmpty()) {
-                finishJob(run);
-            }
-        } else {
-            finishJob(run);
-        }
-    }
-
-    private void finishJob(final JobRun run) {
-        CCApplicationContext appCtx = ccs.getApplicationContext();
-        if (appCtx != null) {
-            try {
-                appCtx.notifyJobFinish(jobId);
-            } catch (HyracksException e) {
-                e.printStackTrace();
-            }
-        }
-        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
-        run.setEndTime(System.currentTimeMillis());
-        ccs.getActiveRunMap().remove(jobId);
-        ccs.getRunMapArchive().put(jobId, run);
-        ccs.getRunHistory().put(jobId, run.getExceptions());
-
-        if (run.getActivityClusterGraph().isReportTaskDetails()) {
-            /**
-             * log job details when profiling is enabled
-             */
-            try {
-                ccs.getJobLogFile().log(createJobLogObject(run));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private ObjectNode createJobLogObject(final JobRun run) {
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode jobLogObject = om.createObjectNode();
-        ActivityClusterGraph acg = run.getActivityClusterGraph();
-        jobLogObject.set("activity-cluster-graph", acg.toJSON());
-        jobLogObject.set("job-run", run.toJSON());
-        return jobLogObject;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index e7844e9..fefd3b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.Collections;
 import java.util.EnumSet;
 
 import org.apache.hyracks.api.deployment.DeploymentId;
@@ -26,9 +25,9 @@ import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.work.IResultCallback;
@@ -54,24 +53,14 @@ public class JobStartWork extends SynchronizableWork {
 
     @Override
     protected void doRun() throws Exception {
+        IJobManager jobManager = ccs.getJobManager();
         try {
             final CCApplicationContext appCtx = ccs.getApplicationContext();
             IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
                     .deserialize(acggfBytes, deploymentId, appCtx);
             IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
-            JobRun run = new JobRun(ccs, deploymentId, jobId, acgg, jobFlags);
-            run.setStatus(JobStatus.INITIALIZED, null);
-            run.setStartTime(System.currentTimeMillis());
-            ccs.getActiveRunMap().put(jobId, run);
-            appCtx.notifyJobCreation(jobId, acggf);
-            run.setStatus(JobStatus.RUNNING, null);
-            try {
-                run.getScheduler().startJob();
-            } catch (Exception e) {
-                ccs.getWorkQueue().schedule(
-                        new JobCleanupWork(ccs, run.getJobId(), JobStatus.FAILURE, Collections.singletonList(e)));
-            }
-            callback.setValue(jobId);
+            JobRun run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback);
+            jobManager.add(run);
         } catch (Exception e) {
             callback.setException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 603b6f8..6a8e631 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -24,9 +24,11 @@ import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 
 public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
@@ -45,33 +47,29 @@ public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
 
     @Override
     public void runWork() {
-        final JobRun run = ccs.getActiveRunMap().get(jobId);
+        IJobManager jobManager = ccs.getJobManager();
+        final JobRun run = jobManager.get(jobId);
         Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
         if (!cleanupPendingNodes.remove(nodeId)) {
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning(nodeId + " not in pending cleanup nodes set: " + cleanupPendingNodes + " for Job: "
-                        + jobId);
+                LOGGER.warning(
+                        nodeId + " not in pending cleanup nodes set: " + cleanupPendingNodes + " for Job: " + jobId);
             }
             return;
         }
-        NodeControllerState ncs = ccs.getNodeMap().get(nodeId);
+        INodeManager nodeManager = ccs.getNodeManager();
+        NodeControllerState ncs = nodeManager.getNodeControllerState(nodeId);
         if (ncs != null) {
             ncs.getActiveJobIds().remove(jobId);
         }
         if (cleanupPendingNodes.isEmpty()) {
-            CCApplicationContext appCtx = ccs.getApplicationContext();
-            if (appCtx != null) {
-                try {
-                    appCtx.notifyJobFinish(jobId);
-                } catch (HyracksException e) {
-                    e.printStackTrace();
-                }
+            try {
+                jobManager.finalComplete(run);
+            } catch (HyracksException e) {
+                // Fail the job with the caught exception during final completion.
+                run.getExceptions().add(e);
+                run.setStatus(JobStatus.FAILURE, run.getExceptions());
             }
-            run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
-            run.setEndTime(System.currentTimeMillis());
-            ccs.getActiveRunMap().remove(jobId);
-            ccs.getRunMapArchive().put(jobId, run);
-            ccs.getRunHistory().put(jobId, run.getExceptions());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index f4b80b1..62c19bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -48,7 +48,7 @@ public class NotifyDeployBinaryWork extends AbstractHeartbeatWork {
 
     @Override
     public void runWork() {
-        /** triggered remotely by a NC to notify that the NC is deployed */
+        // Triggered remotely by a NC to notify that the NC is deployed.
         DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
         dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java
index 1efbc6a..5119022 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java
@@ -39,7 +39,7 @@ public class NotifyShutdownWork extends SynchronizableWork {
 
     @Override
     public void doRun() {
-        /** triggered remotely by a NC to notify that the NC is shutting down */
+        // Triggered remotely by a NC to notify that the NC is shutting down.
         ShutdownRun sRun = ccs.getShutdownRun();
         LOGGER.info("Received shutdown acknowledgement from NC ID:" + nodeId);
         sRun.notifyShutdown(nodeId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index dd26ea4..dc93515 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -18,16 +18,14 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.net.InetAddress;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
@@ -50,34 +48,15 @@ public class RegisterNodeWork extends SynchronizableWork {
     @Override
     protected void doRun() throws Exception {
         String id = reg.getNodeId();
-
         IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
-        CCNCFunctions.NodeRegistrationResult result = null;
-        Map<String, String> ncConfiguration = null;
+        CCNCFunctions.NodeRegistrationResult result;
+        Map<String, String> ncConfiguration = new HashMap<>();
         try {
             INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
-
             NodeControllerState state = new NodeControllerState(nodeController, reg);
-            Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-            if (nodeMap.containsKey(id)) {
-                throw new Exception("Node with this name already registered.");
-            }
-            nodeMap.put(id, state);
-            Map<InetAddress, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
-            // QQQ Breach of encapsulation here - way too much duplicated data
-            // in NodeRegistration
-            String ipAddress = state.getNCConfig().dataIPAddress;
-            if (state.getNCConfig().dataPublicIPAddress != null) {
-                ipAddress = state.getNCConfig().dataPublicIPAddress;
-            }
-            ncConfiguration = new HashMap<String, String>();
+            INodeManager nodeManager = ccs.getNodeManager();
+            nodeManager.addNode(id, state);
             state.getNCConfig().toMap(ncConfiguration);
-            Set<String> nodes = ipAddressNodeNameMap.get(InetAddress.getByName(ipAddress));
-            if (nodes == null) {
-                nodes = new HashSet<String>();
-                ipAddressNodeNameMap.put(InetAddress.getByName(ipAddress), nodes);
-            }
-            nodes.add(id);
             LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
             NodeParameters params = new NodeParameters();
             params.setClusterControllerInfo(ccs.getClusterControllerInfo());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 2c5c965..edc57fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -21,9 +21,9 @@ package org.apache.hyracks.control.cc.work;
 import java.util.List;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
 import org.apache.hyracks.control.cc.partitions.PartitionUtils;
@@ -43,7 +43,8 @@ public class RegisterPartitionAvailibilityWork extends AbstractWork {
     @Override
     public void run() {
         final PartitionId pid = partitionDescriptor.getPartitionId();
-        JobRun run = ccs.getActiveRunMap().get(pid.getJobId());
+        IJobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.get(pid.getJobId());
         if (run == null) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
index 44fc40d..100cae2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
@@ -19,9 +19,9 @@
 package org.apache.hyracks.control.cc.work;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
 import org.apache.hyracks.control.cc.partitions.PartitionUtils;
@@ -41,7 +41,8 @@ public class RegisterPartitionRequestWork extends AbstractWork {
     @Override
     public void run() {
         PartitionId pid = partitionRequest.getPartitionId();
-        JobRun run = ccs.getActiveRunMap().get(pid.getJobId());
+        IJobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.get(pid.getJobId());
         if (run == null) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index 510c729..410b75f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -18,16 +18,16 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.Collection;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.AbstractWork;
 
@@ -42,40 +42,29 @@ public class RemoveDeadNodesWork extends AbstractWork {
 
     @Override
     public void run() {
-        final Set<String> deadNodes = new HashSet<String>();
-        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-        for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
-            NodeControllerState state = e.getValue();
-            if (state.incrementLastHeartbeatDuration() >= ccs.getCCConfig().maxHeartbeatLapsePeriods) {
-                deadNodes.add(e.getKey());
-                LOGGER.info(e.getKey() + " considered dead");
-            }
-        }
-        Set<JobId> affectedJobIds = new HashSet<JobId>();
-        for (String deadNode : deadNodes) {
-            NodeControllerState state = nodeMap.remove(deadNode);
-
-            // Deal with dead tasks.
-            affectedJobIds.addAll(state.getActiveJobIds());
-        }
-        int size = affectedJobIds.size();
-        if (size > 0) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Number of affected jobs: " + size);
-            }
-            for (JobId jobId : affectedJobIds) {
-                JobRun run = ccs.getActiveRunMap().get(jobId);
-                if (run != null) {
-                    run.getScheduler().notifyNodeFailures(deadNodes);
+        try {
+            INodeManager nodeManager = ccs.getNodeManager();
+            Pair<Collection<String>, Collection<JobId>> result = nodeManager.removeDeadNodes();
+            Collection<String> deadNodes = result.getLeft();
+            Collection<JobId> affectedJobIds = result.getRight();
+            int size = affectedJobIds.size();
+            if (size > 0) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Number of affected jobs: " + size);
+                }
+                IJobManager jobManager = ccs.getJobManager();
+                for (JobId jobId : affectedJobIds) {
+                    JobRun run = jobManager.get(jobId);
+                    if (run != null) {
+                        run.getExecutor().notifyNodeFailures(deadNodes);
+                    }
                 }
             }
-        }
-        if (!deadNodes.isEmpty()) {
-            try {
+            if (!deadNodes.isEmpty()) {
                 ccs.getApplicationContext().notifyNodeFailure(deadNodes);
-            } catch (HyracksException e) {
-                LOGGER.log(Level.WARNING, "Uncaught exception on notifyNodeFailure", e);
             }
+        } catch (HyracksException e) {
+            LOGGER.log(Level.WARNING, "Uncaught exception on notifyNodeFailure", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
index 2278389..02806a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
@@ -16,32 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.control.cc.work;
 
 import java.util.List;
-import java.util.Map;
 import java.util.logging.Level;
 
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.work.AbstractWork;
 
 public class ReportProfilesWork extends AbstractWork {
-    private final ClusterControllerService ccs;
+    private final IJobManager jobManager;
     private final List<JobProfile> profiles;
 
-    public ReportProfilesWork(ClusterControllerService ccs, List<JobProfile> profiles) {
-        this.ccs = ccs;
+    public ReportProfilesWork(IJobManager jobManager, List<JobProfile> profiles) {
+        this.jobManager = jobManager;
         this.profiles = profiles;
     }
 
     @Override
     public void run() {
-        Map<JobId, JobRun> runMap = ccs.getActiveRunMap();
         for (JobProfile profile : profiles) {
-            JobRun run = runMap.get(profile.getJobId());
+            JobRun run = jobManager.get(profile.getJobId());
             if (run != null) {
                 JobProfile jp = run.getJobProfile();
                 jp.merge(profile);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index 2379871..f4f2f52 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
@@ -42,7 +43,8 @@ public class TaskCompleteWork extends AbstractTaskLifecycleWork {
     @Override
     protected void performEvent(TaskAttempt ta) {
         try {
-            JobRun run = ccs.getActiveRunMap().get(jobId);
+            IJobManager jobManager = ccs.getJobManager();
+            JobRun run = jobManager.get(jobId);
             if (statistics != null) {
                 JobProfile jobProfile = run.getJobProfile();
                 Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles();
@@ -53,7 +55,7 @@ public class TaskCompleteWork extends AbstractTaskLifecycleWork {
                 }
                 jobletProfile.getTaskProfiles().put(taId, statistics);
             }
-            run.getScheduler().notifyTaskComplete(ta);
+            run.getExecutor().notifyTaskComplete(ta);
         } catch (HyracksException e) {
             e.printStackTrace();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 8bca4e7..486e9c6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
 
@@ -37,9 +38,10 @@ public class TaskFailureWork extends AbstractTaskLifecycleWork {
 
     @Override
     protected void performEvent(TaskAttempt ta) {
-        JobRun run = ccs.getActiveRunMap().get(jobId);
+        IJobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.get(jobId);
         ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
-        run.getScheduler().notifyTaskFailure(ta, exceptions);
+        run.getExecutor().notifyTaskFailure(ta, exceptions);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UnregisterNodeWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UnregisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UnregisterNodeWork.java
index 839dc45..7eb5345 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UnregisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UnregisterNodeWork.java
@@ -18,24 +18,20 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.Map;
-
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class UnregisterNodeWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
+    private final INodeManager nodeManager;
     private final String nodeId;
 
-    public UnregisterNodeWork(ClusterControllerService ccs, String nodeId) {
-        this.ccs = ccs;
+    public UnregisterNodeWork(INodeManager nodeManager, String nodeId) {
+        this.nodeManager = nodeManager;
         this.nodeId = nodeId;
     }
 
     @Override
     protected void doRun() throws Exception {
-        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-        nodeMap.remove(nodeId);
+        nodeManager.removeNode(nodeId);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index c1fa945..f7ef175 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.IJobStatusConditionVariable;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -39,7 +40,8 @@ public class WaitForJobCompletionWork extends SynchronizableWork {
 
     @Override
     protected void doRun() throws Exception {
-        final IJobStatusConditionVariable cRunningVar = ccs.getActiveRunMap().get(jobId);
+        IJobManager jobManager = ccs.getJobManager();
+        final IJobStatusConditionVariable cRunningVar = jobManager.get(jobId);
         if (cRunningVar != null) {
             ccs.getExecutor().execute(new Runnable() {
                 @Override
@@ -53,32 +55,20 @@ public class WaitForJobCompletionWork extends SynchronizableWork {
                 }
             });
         } else {
-            final IJobStatusConditionVariable cArchivedVar = ccs.getRunMapArchive().get(jobId);
-            if (cArchivedVar != null) {
-                ccs.getExecutor().execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            cArchivedVar.waitForCompletion();
-                            callback.setValue(null);
-                        } catch (Exception e) {
-                            callback.setException(e);
-                        }
-                    }
-                });
-            } else {
-                final List<Exception> exceptions = ccs.getRunHistory().get(jobId);
-                ccs.getExecutor().execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        callback.setValue(null);
-                        if (exceptions != null && exceptions.size() > 0) {
-                            /** only report the first exception because IResultCallback will only throw one exception anyway */
-                            callback.setException(exceptions.get(0));
-                        }
+            final List<Exception> exceptions = jobManager.getRunHistory(jobId);
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    callback.setValue(null);
+                    if (exceptions != null && !exceptions.isEmpty()) {
+                        /**
+                         * only report the first exception because IResultCallback will only throw one exception
+                         * anyway
+                         */
+                        callback.setException(exceptions.get(0));
                     }
-                });
-            }
+                }
+            });
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
new file mode 100644
index 0000000..c742a4a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.cluster;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.hyracks.control.cc.scheduler.ResourceManager;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NodeManagerTest {
+
+    private static final long NODE_MEMORY_SIZE = 1024L;
+    private static final int NODE_CORES = 8;
+    private static final String NODE1 = "node1";
+    private static final String NODE2 = "node2";
+
+    @Test
+    public void testNormal() throws HyracksException {
+        IResourceManager resourceManager = new ResourceManager();
+        INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
+        NodeControllerState ncState1 = mockNodeControllerState(false);
+        NodeControllerState ncState2 = mockNodeControllerState(false);
+
+        // Verifies states after adding nodes.
+        nodeManager.addNode(NODE1, ncState1);
+        nodeManager.addNode(NODE2, ncState2);
+        Assert.assertTrue(nodeManager.getIpAddressNodeNameMap().size() == 1);
+        Assert.assertTrue(nodeManager.getAllNodeIds().size() == 2);
+        Assert.assertTrue(nodeManager.getAllNodeControllerStates().size() == 2);
+        Assert.assertTrue(nodeManager.getNodeControllerState(NODE1) == ncState1);
+        Assert.assertTrue(nodeManager.getNodeControllerState(NODE2) == ncState2);
+        Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize() == NODE_MEMORY_SIZE * 2);
+        Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedCores() == NODE_CORES * 2);
+        Assert.assertTrue(resourceManager.getMaximumCapacity().getAggregatedMemoryByteSize() == NODE_MEMORY_SIZE * 2);
+        Assert.assertTrue(resourceManager.getMaximumCapacity().getAggregatedCores() == NODE_CORES * 2);
+
+        // Verifies states after removing dead nodes.
+        nodeManager.removeDeadNodes();
+        verifyEmptyCluster(resourceManager, nodeManager);
+    }
+
+    @Test
+    public void testException() throws HyracksException {
+        IResourceManager resourceManager = new ResourceManager();
+        INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
+        NodeControllerState ncState1 = mockNodeControllerState(true);
+
+        boolean invalidNetworkAddress = false;
+        // Verifies states after a failure during adding nodes.
+        try {
+            nodeManager.addNode(NODE1, ncState1);
+        } catch (HyracksException e) {
+            invalidNetworkAddress = e.getErrorCode() == ErrorCode.INVALID_NETWORK_ADDRESS;
+        }
+        Assert.assertTrue(invalidNetworkAddress);
+
+        // Verifies that the cluster is empty.
+        verifyEmptyCluster(resourceManager, nodeManager);
+    }
+
+    @Test
+    public void testNullNode() throws HyracksException {
+        IResourceManager resourceManager = new ResourceManager();
+        INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
+
+        boolean invalidParameter = false;
+        // Verifies states after a failure during adding nodes.
+        try {
+            nodeManager.addNode(null, null);
+        } catch (HyracksException e) {
+            invalidParameter = e.getErrorCode() == ErrorCode.INVALID_INPUT_PARAMETER;
+        }
+        Assert.assertTrue(invalidParameter);
+
+        // Verifies that the cluster is empty.
+        verifyEmptyCluster(resourceManager, nodeManager);
+    }
+
+    private CCConfig makeCCConfig() {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.maxHeartbeatLapsePeriods = 0;
+        return ccConfig;
+    }
+
+    private NodeControllerState mockNodeControllerState(boolean invalidIpAddr) {
+        NodeControllerState ncState = mock(NodeControllerState.class);
+        String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
+        NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
+        NetworkAddress resultAddr = new NetworkAddress(ipAddr, 1002);
+        NetworkAddress msgAddr = new NetworkAddress(ipAddr, 1003);
+        when(ncState.getCapacity()).thenReturn(new NodeCapacity(NODE_MEMORY_SIZE, NODE_CORES));
+        when(ncState.getDataPort()).thenReturn(dataAddr);
+        when(ncState.getDatasetPort()).thenReturn(resultAddr);
+        when(ncState.getMessagingPort()).thenReturn(msgAddr);
+        NCConfig ncConfig = new NCConfig();
+        ncConfig.dataIPAddress = ipAddr;
+        when(ncState.getNCConfig()).thenReturn(ncConfig);
+        return ncState;
+    }
+
+    private void verifyEmptyCluster(IResourceManager resourceManager, INodeManager nodeManager) {
+        Assert.assertTrue(nodeManager.getIpAddressNodeNameMap().isEmpty());
+        Assert.assertTrue(nodeManager.getAllNodeIds().isEmpty());
+        Assert.assertTrue(nodeManager.getAllNodeControllerStates().isEmpty());
+        Assert.assertTrue(nodeManager.getNodeControllerState(NODE1) == null);
+        Assert.assertTrue(nodeManager.getNodeControllerState(NODE2) == null);
+        // Both capacity views must report zero aggregates and no entry for either node.
+        IReadOnlyClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
+        IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity();
+        Assert.assertTrue(currentCapacity.getAggregatedMemoryByteSize() == 0L);
+        Assert.assertTrue(currentCapacity.getAggregatedCores() == 0);
+        Assert.assertTrue(maximumCapacity.getAggregatedMemoryByteSize() == 0L);
+        Assert.assertTrue(maximumCapacity.getAggregatedCores() == 0);
+        verifyNodeNotExistInCapacity(currentCapacity, NODE1);
+        verifyNodeNotExistInCapacity(currentCapacity, NODE2);
+        verifyNodeNotExistInCapacity(maximumCapacity, NODE1);
+        verifyNodeNotExistInCapacity(maximumCapacity, NODE2);
+    }
+
+    private void verifyNodeNotExistInCapacity(IReadOnlyClusterCapacity capacity, String nodeId) {
+        boolean nodeNotExist = false;
+        try {
+            capacity.getMemoryByteSize(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+        nodeNotExist = false;
+        try {
+            capacity.getCores(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
new file mode 100644
index 0000000..5e1b856
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.job;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.cluster.NodeManager;
+import org.apache.hyracks.control.common.base.INodeController;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.logs.LogFile;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class JobManagerTest {
+
+    @Test
+    public void test() throws HyracksException {
+        CCConfig ccConfig = new CCConfig();
+        IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
+        IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
+
+        // Submits runnable jobs.
+        List<JobRun> acceptedRuns = new ArrayList<>();
+        for (int id = 0; id < 4096; ++id) {
+            // Mocks an immediately executable job.
+            JobRun run = mockJobRun(id);
+            JobSpecification job = mock(JobSpecification.class);
+            when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+
+            // Submits the job.
+            acceptedRuns.add(run);
+            jobManager.add(run);
+            Assert.assertTrue(jobManager.getRunningJobs().size() == id + 1);
+            Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
+        }
+
+        // Submits jobs that will be deferred due to the capacity limitation.
+        List<JobRun> deferredRuns = new ArrayList<>();
+        for (int id = 4096; id < 8192; ++id) {
+            // Mocks a deferred job.
+            JobRun run = mockJobRun(id);
+            JobSpecification job = mock(JobSpecification.class);
+            when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+
+            // Submits the job.
+            deferredRuns.add(run);
+            jobManager.add(run);
+            Assert.assertTrue(jobManager.getRunningJobs().size() == 4096);
+            Assert.assertTrue(jobManager.getPendingJobs().size() == id + 1 - 4096);
+        }
+
+        // Further jobs will be denied because the job queue is full.
+        boolean jobQueueFull = false;
+        try {
+            JobRun run = mockJobRun(8193);
+            JobSpecification job = mock(JobSpecification.class);
+            when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+            jobManager.add(run);
+        } catch (HyracksException e) {
+            // Verifies the error code.
+            jobQueueFull = e.getErrorCode() == ErrorCode.JOB_QUEUE_FULL;
+        }
+        Assert.assertTrue(jobQueueFull);
+
+        // Completes runnable jobs.
+        for (JobRun run : acceptedRuns) {
+            jobManager.prepareComplete(run, JobStatus.TERMINATED, Collections.emptyList());
+            jobManager.finalComplete(run);
+        }
+        Assert.assertTrue(jobManager.getRunningJobs().size() == 4096);
+        Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+
+        // Completes deferred jobs.
+        for (JobRun run : deferredRuns) {
+            jobManager.prepareComplete(run, JobStatus.TERMINATED, Collections.emptyList());
+            jobManager.finalComplete(run);
+        }
+        Assert.assertTrue(jobManager.getRunningJobs().isEmpty());
+        Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        verify(jobManager, times(8192)).prepareComplete(any(), any(), any());
+        verify(jobManager, times(8192)).finalComplete(any());
+    }
+
+    @Test
+    public void testExceedMax() throws HyracksException {
+        CCConfig ccConfig = new CCConfig();
+        IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
+        IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
+        // Stubbing happens outside the try block so that only jobManager.add() can produce the expected rejection.
+        JobRun run = mockJobRun(1);
+        JobSpecification job = mock(JobSpecification.class);
+        when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+        when(jobCapacityController.allocate(job))
+                .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
+        boolean rejected = false;
+        try {
+            jobManager.add(run);
+        } catch (HyracksException e) {
+            // A job whose requirement exceeds the cluster's maximum capacity must be rejected with this error code.
+            rejected = e.getErrorCode() == ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY;
+        }
+        Assert.assertTrue(rejected);
+        Assert.assertTrue(jobManager.getRunningJobs().isEmpty());
+        Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
+        Assert.assertTrue(jobManager.getArchivedJobs().isEmpty());
+    }
+
+    @Test
+    public void testAdmitThenReject() throws HyracksException {
+        CCConfig config = new CCConfig();
+        IJobCapacityController capacityController = mock(IJobCapacityController.class);
+        IJobManager jobManager = spy(new JobManager(config, mockClusterControllerService(), capacityController));
+
+        // A queued job must also be rejected once its requirement exceeds the updated maximum cluster capacity.
+        // First job: admitted for execution immediately.
+        JobSpecification firstSpec = mock(JobSpecification.class);
+        JobRun firstRun = mockJobRun(1);
+        when(firstRun.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(firstSpec);
+        when(capacityController.allocate(firstSpec)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        jobManager.add(firstRun);
+
+        // Second job: queued on its first allocation attempt; any later attempt throws the capacity error.
+        JobSpecification secondSpec = mock(JobSpecification.class);
+        JobRun secondRun = mockJobRun(2);
+        when(secondRun.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(secondSpec);
+        when(capacityController.allocate(secondSpec)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+                .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
+        jobManager.add(secondRun);
+
+        // Completes the first job; the verification below expects the queued job to have failed afterwards.
+        jobManager.prepareComplete(firstRun, JobStatus.TERMINATED, Collections.emptyList());
+        jobManager.finalComplete(firstRun);
+
+        // The second job was set to PENDING exactly once and given a pending FAILURE status exactly once.
+        verify(secondRun, times(1)).setStatus(eq(JobStatus.PENDING), any());
+        verify(secondRun, times(1)).setPendingStatus(eq(JobStatus.FAILURE), any());
+    }
+
+    @Test
+    public void testNullJob() throws HyracksException {
+        IJobCapacityController capacityController = mock(IJobCapacityController.class);
+        IJobManager jobManager = new JobManager(new CCConfig(), mockClusterControllerService(), capacityController);
+        // Adding a null job run must fail with INVALID_INPUT_PARAMETER and leave the running and pending jobs empty.
+        boolean rejectedAsInvalid = false;
+        try {
+            jobManager.add(null);
+        } catch (HyracksException e) {
+            rejectedAsInvalid = e.getErrorCode() == ErrorCode.INVALID_INPUT_PARAMETER;
+        }
+        Assert.assertTrue(rejectedAsInvalid);
+        Assert.assertTrue(jobManager.getRunningJobs().isEmpty());
+        Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
+    }
+
+    // Creates a deep-stubbed JobRun mock with the given job id, no exceptions and two participating nodes.
+    private JobRun mockJobRun(long id) {
+        JobRun jobRun = mock(JobRun.class, Mockito.RETURNS_DEEP_STUBS);
+        when(jobRun.getJobId()).thenReturn(new JobId(id));
+        when(jobRun.getActivityClusterGraph().isReportTaskDetails()).thenReturn(true);
+        when(jobRun.getExceptions()).thenReturn(Collections.emptyList());
+        when(jobRun.getPendingExceptions()).thenReturn(Collections.emptyList());
+        // The same node set is returned for both the participating and the cleanup-pending node ids.
+        Set<String> nodeIds = new HashSet<>();
+        nodeIds.add("node1");
+        nodeIds.add("node2");
+        when(jobRun.getParticipatingNodeIds()).thenReturn(nodeIds);
+        when(jobRun.getCleanupPendingNodeIds()).thenReturn(nodeIds);
+        return jobRun;
+    }
+
+    // Builds a ClusterControllerService mock backed by a mocked application context, log file and node manager.
+    private ClusterControllerService mockClusterControllerService() {
+        // The node manager is built up front because mockNodeManager() performs stubbing of its own.
+        INodeManager nodeManager = mockNodeManager();
+        ClusterControllerService service = mock(ClusterControllerService.class);
+        when(service.getNodeManager()).thenReturn(nodeManager);
+        when(service.getJobLogFile()).thenReturn(mock(LogFile.class));
+        when(service.getApplicationContext()).thenReturn(mock(CCApplicationContext.class));
+        return service;
+    }
+
+    // Builds a node manager mock that returns one shared NodeControllerState for any requested node.
+    private INodeManager mockNodeManager() {
+        NodeControllerState sharedState = mock(NodeControllerState.class);
+        when(sharedState.getNodeController()).thenReturn(mock(INodeController.class));
+        INodeManager manager = mock(NodeManager.class);
+        when(manager.getNodeControllerState(any())).thenReturn(sharedState);
+        return manager;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index dff5827..5c27a6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -22,6 +22,7 @@ import java.net.URL;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -37,7 +38,7 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 public interface INodeController {
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            EnumSet<JobFlag> flags) throws Exception;
+            Set<JobFlag> flags) throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index d7c3d36..b636a09 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -102,6 +102,15 @@ public class CCConfig {
             usage = "Specify path to master configuration file (default: none)", required = false)
     public String configFile = null;
 
+    @Option(name = "-job-queue-class-name", usage = "Specify the implementation class name for the job queue. (default:"
+            + " org.apache.hyracks.control.cc.scheduler.FIFOJobQueue)",
+            required = false)
+    public String jobQueueClassName = "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue";
+
+    @Option(name = "-job-manager-class-name", usage = "Specify the implementation class name for the job manager. "
+            + "(default: org.apache.hyracks.control.cc.job.JobManager)", required = false)
+    public String jobManagerClassName = "org.apache.hyracks.control.cc.job.JobManager";
+
     @Argument
     @Option(name = "--", handler = StopOptionHandler.class)
     public List<String> appArgs;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index e95a004..490b6ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 
 public final class NodeRegistration implements Serializable {
@@ -69,11 +70,13 @@ public final class NodeRegistration implements Serializable {
 
     private final int pid;
 
+    private final NodeCapacity capacity;
+
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
                             NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors,
                             String vmName, String vmVersion, String vmVendor, String classpath, String libraryPath,
                             String bootClasspath, List<String> inputArguments, Map<String, String> systemProperties,
-                            HeartbeatSchema hbSchema, NetworkAddress messagingPort, int pid) {
+            HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity, int pid) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
         this.ncConfig = ncConfig;
@@ -93,6 +96,7 @@ public final class NodeRegistration implements Serializable {
         this.systemProperties = systemProperties;
         this.hbSchema = hbSchema;
         this.messagingPort = messagingPort;
+        this.capacity = capacity;
         this.pid = pid;
     }
 
@@ -104,6 +108,10 @@ public final class NodeRegistration implements Serializable {
         return nodeId;
     }
 
+    public NodeCapacity getCapacity() {
+        return capacity;
+    }
+
     public NCConfig getNCConfig() {
         return ncConfig;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentRun.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentRun.java
index 416b0e6..3c5c97a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentRun.java
@@ -19,6 +19,7 @@
 
 package org.apache.hyracks.control.common.deployment;
 
+import java.util.Collection;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -33,7 +34,7 @@ public class DeploymentRun implements IDeploymentStatusConditionVariable {
     private DeploymentStatus deploymentStatus = DeploymentStatus.FAIL;
     private final Set<String> deploymentNodeIds = new TreeSet<String>();
 
-    public DeploymentRun(Set<String> nodeIds) {
+    public DeploymentRun(Collection<String> nodeIds) {
         deploymentNodeIds.addAll(nodeIds);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index aa9a4fe..4ee34ca 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.control.common.ipc;
 
 import java.io.ByteArrayInputStream;
@@ -33,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -676,11 +678,11 @@ public class CCNCFunctions {
         private final byte[] planBytes;
         private final List<TaskAttemptDescriptor> taskDescriptors;
         private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
-        private final EnumSet<JobFlag> flags;
+        private final Set<JobFlag> flags;
 
         public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
                 List<TaskAttemptDescriptor> taskDescriptors,
-                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) {
+                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags) {
             this.deploymentId = deploymentId;
             this.jobId = jobId;
             this.planBytes = planBytes;
@@ -714,7 +716,7 @@ public class CCNCFunctions {
             return connectorPolicies;
         }
 
-        public EnumSet<JobFlag> getFlags() {
+        public Set<JobFlag> getFlags() {
             return flags;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index c3376e6..0d59b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -19,9 +19,9 @@
 package org.apache.hyracks.control.common.ipc;
 
 import java.net.URL;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -46,7 +46,7 @@ public class NodeControllerRemoteProxy implements INodeController {
     @Override
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            EnumSet<JobFlag> flags) throws Exception {
+            Set<JobFlag> flags) throws Exception {
         CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
                 taskDescriptors, connectorPolicies, flags);
         ipcHandle.send(-1, stf, null);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
index 0a50f6f..eae2eb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
@@ -19,6 +19,7 @@
 
 package org.apache.hyracks.control.common.shutdown;
 
+import java.util.Collection;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
@@ -29,7 +30,7 @@ public class ShutdownRun implements IShutdownStatusConditionVariable{
     private boolean shutdownSuccess = false;
     private static final long SHUTDOWN_TIMER_MS = TimeUnit.SECONDS.toMillis(30);
 
-    public ShutdownRun(Set<String> nodeIds) {
+    public ShutdownRun(Collection<String> nodeIds) {
         shutdownNodeIds.addAll(nodeIds);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index d44a4e5..19f01c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -49,6 +49,7 @@ import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
@@ -266,11 +267,15 @@ public class NodeControllerService implements IControllerService {
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
         NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
                 : null;
+        int allCores = osMXBean.getAvailableProcessors();
         ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
-                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores,
                 runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
                 runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
                 runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
+                ncAppEntryPoint == null
+                        ? new NodeCapacity(Runtime.getRuntime().maxMemory(), allCores > 1 ? allCores - 1 : allCores)
+                        : ncAppEntryPoint.getCapacity(),
                 PidHelper.getPid()));
 
         synchronized (this) {
@@ -455,7 +460,7 @@ public class NodeControllerService implements IControllerService {
 
             hbData.diskReads = ioCounter.getReads();
             hbData.diskWrites = ioCounter.getWrites();
-            hbData.numCores = Runtime.getRuntime().availableProcessors();
+            hbData.numCores = Math.max(1, Runtime.getRuntime().availableProcessors() - 1); // Spares one core, never 0.
 
             try {
                 cc.nodeHeartbeat(id, hbData);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index cb38076..05c9f07 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -343,7 +343,7 @@ public class IOManager implements IIOManager {
     public FileReference resolveAbsolutePath(String path) throws HyracksDataException {
         IODeviceHandle devHandle = getDevice(path);
         if (devHandle == null) {
-            throw HyracksDataException.create(ErrorCode.RUNTIME_FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE, path);
+            throw HyracksDataException.create(ErrorCode.FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE, path);
         }
         String relativePath = devHandle.getRelativePath(path);
         return new FileReference(devHandle, relativePath);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index d27caf2..803f15a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -22,9 +22,9 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -83,11 +83,11 @@ public class StartTasksWork extends AbstractWork {
 
     private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap;
 
-    private final EnumSet<JobFlag> flags;
+    private final Set<JobFlag> flags;
 
     public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags) {
         this.ncs = ncs;
         this.deploymentId = deploymentId;
         this.jobId = jobId;
@@ -219,7 +219,7 @@ public class StartTasksWork extends AbstractWork {
 
     private IPartitionWriterFactory createPartitionWriterFactory(final IHyracksTaskContext ctx,
             IConnectorPolicy cPolicy, final JobId jobId, final IConnectorDescriptor conn, final int senderIndex,
-            final TaskAttemptId taId, EnumSet<JobFlag> flags) {
+            final TaskAttemptId taId, Set<JobFlag> flags) {
         IPartitionWriterFactory factory;
         if (cPolicy.materializeOnSendSide()) {
             if (cPolicy.consumerWaitsForProducerToFinish()) {


Mime
View raw message