asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ti...@apache.org
Subject [3/7] asterixdb git commit: Implements concurrent query management support.
Date Wed, 25 Jan 2017 14:46:36 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
deleted file mode 100644
index b577ff7..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
+++ /dev/null
@@ -1,745 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.scheduler;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import org.apache.hyracks.api.deployment.DeploymentId;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityCluster;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.partitions.PartitionId;
-import org.apache.hyracks.api.util.JavaSerializationUtils;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
-import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
-import org.apache.hyracks.control.cc.job.JobRun;
-import org.apache.hyracks.control.cc.job.Task;
-import org.apache.hyracks.control.cc.job.TaskAttempt;
-import org.apache.hyracks.control.cc.job.TaskCluster;
-import org.apache.hyracks.control.cc.job.TaskClusterAttempt;
-import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
-import org.apache.hyracks.control.cc.work.JobCleanupWork;
-import org.apache.hyracks.control.common.job.PartitionState;
-import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-
-public class JobScheduler {
-    private static final Logger LOGGER = Logger.getLogger(JobScheduler.class.getName());
-
-    private final ClusterControllerService ccs;
-
-    private final JobRun jobRun;
-
-    private final PartitionConstraintSolver solver;
-
-    private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
-
-    private final Set<TaskCluster> inProgressTaskClusters;
-
-
-    public JobScheduler(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) {
-        this.ccs = ccs;
-        this.jobRun = jobRun;
-        solver = new PartitionConstraintSolver();
-        partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
-        inProgressTaskClusters = new HashSet<TaskCluster>();
-        solver.addConstraints(constraints);
-    }
-
-    public JobRun getJobRun() {
-        return jobRun;
-    }
-
-    public PartitionConstraintSolver getSolver() {
-        return solver;
-    }
-
-    public void startJob() throws HyracksException {
-        startRunnableActivityClusters();
-        ccs.getApplicationContext().notifyJobStart(jobRun.getJobId());
-    }
-
-    private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
-            throws HyracksException {
-        for (ActivityCluster root : roots) {
-            findRunnableTaskClusterRoots(frontier, root);
-        }
-    }
-
-    private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, ActivityCluster candidate)
-            throws HyracksException {
-        boolean depsComplete = true;
-        for (ActivityCluster depAC : candidate.getDependencies()) {
-            if (!isPlanned(depAC)) {
-                depsComplete = false;
-                findRunnableTaskClusterRoots(frontier, depAC);
-            } else {
-                boolean tcRootsComplete = true;
-                for (TaskCluster tc : getActivityClusterPlan(depAC).getTaskClusters()) {
-                    if (tc.getProducedPartitions().isEmpty()) {
-                        TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
-                        if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
-                            tcRootsComplete = false;
-                            break;
-                        }
-                    }
-                }
-                if (!tcRootsComplete) {
-                    depsComplete = false;
-                    findRunnableTaskClusterRoots(frontier, depAC);
-                }
-            }
-        }
-        if (depsComplete) {
-            if (!isPlanned(candidate)) {
-                ActivityClusterPlanner acp = new ActivityClusterPlanner(this);
-                ActivityClusterPlan acPlan = acp.planActivityCluster(candidate);
-                jobRun.getActivityClusterPlanMap().put(candidate.getId(), acPlan);
-                partitionProducingTaskClusterMap.putAll(acp.getPartitionProducingTaskClusterMap());
-            }
-            for (TaskCluster tc : getActivityClusterPlan(candidate).getTaskClusters()) {
-                if (tc.getProducedPartitions().isEmpty()) {
-                    TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
-                    if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
-                        frontier.add(tc);
-                    }
-                }
-            }
-        }
-    }
-
-    private ActivityClusterPlan getActivityClusterPlan(ActivityCluster ac) {
-        return jobRun.getActivityClusterPlanMap().get(ac.getId());
-    }
-
-    private boolean isPlanned(ActivityCluster ac) {
-        return jobRun.getActivityClusterPlanMap().get(ac.getId()) != null;
-    }
-
-    private void startRunnableActivityClusters() throws HyracksException {
-        Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
-        findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap()
-                .values());
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
-                    + inProgressTaskClusters);
-        }
-        if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
-            ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
-            return;
-        }
-        startRunnableTaskClusters(taskClusterRoots);
-    }
-
-    private void startRunnableTaskClusters(Set<TaskCluster> tcRoots) throws HyracksException {
-        Map<TaskCluster, Runnability> runnabilityMap = new HashMap<TaskCluster, Runnability>();
-        for (TaskCluster tc : tcRoots) {
-            assignRunnabilityRank(tc, runnabilityMap);
-        }
-
-        PriorityQueue<RankedRunnableTaskCluster> queue = new PriorityQueue<RankedRunnableTaskCluster>();
-        for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) {
-            TaskCluster tc = e.getKey();
-            Runnability runnability = e.getValue();
-            if (runnability.getTag() != Runnability.Tag.RUNNABLE) {
-                continue;
-            }
-            int priority = runnability.getPriority();
-            if (priority >= 0 && priority < Integer.MAX_VALUE) {
-                queue.add(new RankedRunnableTaskCluster(priority, tc));
-            }
-        }
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Ranked TCs: " + queue);
-        }
-
-        Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
-        for (RankedRunnableTaskCluster rrtc : queue) {
-            TaskCluster tc = rrtc.getTaskCluster();
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("Found runnable TC: " + tc);
-                List<TaskClusterAttempt> attempts = tc.getAttempts();
-                LOGGER.fine("Attempts so far:" + attempts.size());
-                for (TaskClusterAttempt tcAttempt : attempts) {
-                    LOGGER.fine("Status: " + tcAttempt.getStatus());
-                }
-            }
-            assignTaskLocations(tc, taskAttemptMap);
-        }
-
-        if (taskAttemptMap.isEmpty()) {
-            return;
-        }
-
-        startTasks(taskAttemptMap);
-    }
-
-    /*
-     * Runnability rank has the following semantics
-     * Runnability(Runnable TaskCluster depending on completed TaskClusters) = {RUNNABLE, 0}
-     * Runnability(Runnable TaskCluster) = max(Rank(Dependent TaskClusters)) + 1
-     * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _}
-     */
-    private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Computing runnability: " + goal);
-        }
-        if (runnabilityMap.containsKey(goal)) {
-            return runnabilityMap.get(goal);
-        }
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
-        if (lastAttempt != null) {
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("Last Attempt Status: " + lastAttempt.getStatus());
-            }
-            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
-                Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
-                runnabilityMap.put(goal, runnability);
-                return runnability;
-            }
-            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
-                Runnability runnability = new Runnability(Runnability.Tag.RUNNING, Integer.MIN_VALUE);
-                runnabilityMap.put(goal, runnability);
-                return runnability;
-            }
-        }
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap();
-        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
-        Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
-        for (PartitionId pid : goal.getRequiredPartitions()) {
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("Inspecting required partition: " + pid);
-            }
-            Runnability runnability;
-            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
-            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
-            PartitionState maxState = pmm.getMaximumAvailableState(pid);
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("Policy: " + cPolicy + " maxState: " + maxState);
-            }
-            if (PartitionState.COMMITTED.equals(maxState)) {
-                runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
-            } else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
-                runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
-            } else {
-                runnability = assignRunnabilityRank(partitionProducingTaskClusterMap.get(pid), runnabilityMap);
-                switch (runnability.getTag()) {
-                    case RUNNABLE:
-                        if (cPolicy.consumerWaitsForProducerToFinish()) {
-                            runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
-                        } else {
-                            runnability = new Runnability(Runnability.Tag.RUNNABLE, runnability.getPriority() + 1);
-                        }
-                        break;
-
-                    case NOT_RUNNABLE:
-                        break;
-
-                    case RUNNING:
-                        if (cPolicy.consumerWaitsForProducerToFinish()) {
-                            runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
-                        } else {
-                            runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
-                        }
-                        break;
-                }
-            }
-            aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
-            if (aggregateRunnability.getTag() == Runnability.Tag.NOT_RUNNABLE) {
-                // already not runnable -- cannot get better. bail.
-                break;
-            }
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("aggregateRunnability: " + aggregateRunnability);
-            }
-        }
-        runnabilityMap.put(goal, aggregateRunnability);
-        return aggregateRunnability;
-    }
-
-    private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
-            throws HyracksException {
-        ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
-        Task[] tasks = tc.getTasks();
-        List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
-        int attempts = tcAttempts.size();
-        TaskClusterAttempt tcAttempt = new TaskClusterAttempt(tc, attempts);
-        Map<TaskId, TaskAttempt> taskAttempts = new HashMap<TaskId, TaskAttempt>();
-        Map<TaskId, LValueConstraintExpression> locationMap = new HashMap<TaskId, LValueConstraintExpression>();
-        for (int i = 0; i < tasks.length; ++i) {
-            Task ts = tasks[i];
-            TaskId tid = ts.getTaskId();
-            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
-                    tid.getPartition()), attempts), ts);
-            taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
-            locationMap.put(tid,
-                    new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
-            taskAttempts.put(tid, taskAttempt);
-        }
-        tcAttempt.setTaskAttempts(taskAttempts);
-        solver.solve(locationMap.values());
-        for (int i = 0; i < tasks.length; ++i) {
-            Task ts = tasks[i];
-            TaskId tid = ts.getTaskId();
-            TaskAttempt taskAttempt = taskAttempts.get(tid);
-            String nodeId = assignLocation(acg, locationMap, tid, taskAttempt);
-            taskAttempt.setNodeId(nodeId);
-            taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
-            taskAttempt.setStartTime(System.currentTimeMillis());
-            List<TaskAttemptDescriptor> tads = taskAttemptMap.get(nodeId);
-            if (tads == null) {
-                tads = new ArrayList<TaskAttemptDescriptor>();
-                taskAttemptMap.put(nodeId, tads);
-            }
-            OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
-            jobRun.registerOperatorLocation(opId, tid.getPartition(), nodeId);
-            ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
-            TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
-                    apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
-            tads.add(tad);
-        }
-        tcAttempt.initializePendingTaskCounter();
-        tcAttempts.add(tcAttempt);
-
-        /**
-         * Improvement for reducing master/slave message communications, for each TaskAttemptDescriptor,
-         * we set the NetworkAddress[][] partitionLocations, in which each row is for an incoming connector descriptor
-         * and each column is for an input channel of the connector.
-         */
-        for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
-            List<TaskAttemptDescriptor> tads = e.getValue();
-            for (TaskAttemptDescriptor tad : tads) {
-                TaskAttemptId taid = tad.getTaskAttemptId();
-                int attempt = taid.getAttempt();
-                TaskId tid = taid.getTaskId();
-                ActivityId aid = tid.getActivityId();
-                List<IConnectorDescriptor> inConnectors = acg.getActivityInputs(aid);
-                int[] inPartitionCounts = tad.getInputPartitionCounts();
-                if (inPartitionCounts != null) {
-                    NetworkAddress[][] partitionLocations = new NetworkAddress[inPartitionCounts.length][];
-                    for (int i = 0; i < inPartitionCounts.length; ++i) {
-                        ConnectorDescriptorId cdId = inConnectors.get(i).getConnectorId();
-                        IConnectorPolicy policy = jobRun.getConnectorPolicyMap().get(cdId);
-                        /**
-                         * carry sender location information into a task
-                         * when it is not the case that it is an re-attempt and the send-side
-                         * is materialized blocking.
-                         */
-                        if (!(attempt > 0 && policy.materializeOnSendSide() && policy
-                                .consumerWaitsForProducerToFinish())) {
-                            ActivityId producerAid = acg.getProducerActivity(cdId);
-                            partitionLocations[i] = new NetworkAddress[inPartitionCounts[i]];
-                            for (int j = 0; j < inPartitionCounts[i]; ++j) {
-                                TaskId producerTaskId = new TaskId(producerAid, j);
-                                String nodeId = findTaskLocation(producerTaskId);
-                                partitionLocations[i][j] = ccs.getNodeMap().get(nodeId).getDataPort();
-                            }
-                        }
-                    }
-                    tad.setInputPartitionLocations(partitionLocations);
-                }
-            }
-        }
-
-        tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
-        tcAttempt.setStartTime(System.currentTimeMillis());
-        inProgressTaskClusters.add(tc);
-    }
-
-    private String assignLocation(ActivityClusterGraph acg, Map<TaskId, LValueConstraintExpression> locationMap,
-            TaskId tid, TaskAttempt taskAttempt) throws HyracksException {
-        ActivityId aid = tid.getActivityId();
-        ActivityCluster ac = acg.getActivityMap().get(aid);
-        Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(aid);
-        String nodeId = null;
-        if (blockers != null) {
-            for (ActivityId blocker : blockers) {
-                nodeId = findTaskLocation(new TaskId(blocker, tid.getPartition()));
-                if (nodeId != null) {
-                    break;
-                }
-            }
-        }
-        Set<String> liveNodes = ccs.getNodeMap().keySet();
-        if (nodeId == null) {
-            LValueConstraintExpression pLocationExpr = locationMap.get(tid);
-            Object location = solver.getValue(pLocationExpr);
-            if (location == null) {
-                // pick any
-                nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
-                        % liveNodes.size()];
-            } else if (location instanceof String) {
-                nodeId = (String) location;
-            } else if (location instanceof String[]) {
-                for (String choice : (String[]) location) {
-                    if (liveNodes.contains(choice)) {
-                        nodeId = choice;
-                        break;
-                    }
-                }
-                if (nodeId == null) {
-                    throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
-                }
-            } else {
-                throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
-                        + location.getClass() + ")");
-            }
-        }
-        if (nodeId == null) {
-            throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
-        }
-        if (!liveNodes.contains(nodeId)) {
-            throw new HyracksException("Node " + nodeId + " not live");
-        }
-        return nodeId;
-    }
-
-    private String findTaskLocation(TaskId tid) {
-        ActivityId aid = tid.getActivityId();
-        ActivityCluster ac = jobRun.getActivityClusterGraph().getActivityMap().get(aid);
-        Task[] tasks = getActivityClusterPlan(ac).getActivityPlanMap().get(aid).getTasks();
-        List<TaskClusterAttempt> tcAttempts = tasks[tid.getPartition()].getTaskCluster().getAttempts();
-        if (tcAttempts == null || tcAttempts.isEmpty()) {
-            return null;
-        }
-        TaskClusterAttempt lastTCA = tcAttempts.get(tcAttempts.size() - 1);
-        TaskAttempt ta = lastTCA.getTaskAttempts().get(tid);
-        return ta == null ? null : ta.getNodeId();
-    }
-
-    private static TaskClusterAttempt findLastTaskClusterAttempt(TaskCluster tc) {
-        List<TaskClusterAttempt> attempts = tc.getAttempts();
-        if (!attempts.isEmpty()) {
-            return attempts.get(attempts.size() - 1);
-        }
-        return null;
-    }
-
-    private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
-        final DeploymentId deploymentId = jobRun.getDeploymentId();
-        final JobId jobId = jobRun.getJobId();
-        final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
-        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
-                jobRun.getConnectorPolicyMap());
-        try {
-            byte[] acgBytes = JavaSerializationUtils.serialize(acg);
-            for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
-                String nodeId = entry.getKey();
-                final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
-                final NodeControllerState node = ccs.getNodeMap().get(nodeId);
-                if (node != null) {
-                    node.getActiveJobIds().add(jobRun.getJobId());
-                    boolean changed = jobRun.getParticipatingNodeIds().add(nodeId);
-                    if (LOGGER.isLoggable(Level.FINE)) {
-                        LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
-                    }
-                    byte[] jagBytes = changed ? acgBytes : null;
-                    node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
-                            connectorPolicies, jobRun.getFlags());
-                }
-            }
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    private void abortJob(List<Exception> exceptions) {
-        Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<TaskCluster>(inProgressTaskClusters);
-        for (TaskCluster tc : inProgressTaskClustersCopy) {
-            abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
-        }
-        assert inProgressTaskClusters.isEmpty();
-        ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exceptions));
-    }
-
-    private void abortTaskCluster(TaskClusterAttempt tcAttempt,
-            TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
-        LOGGER.fine("Aborting task cluster: " + tcAttempt.getAttempt());
-        Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
-        Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
-        for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) {
-            TaskAttemptId taId = ta.getTaskAttemptId();
-            TaskAttempt.TaskStatus status = ta.getStatus();
-            abortTaskIds.add(taId);
-            LOGGER.fine("Checking " + taId + ": " + ta.getStatus());
-            if (status == TaskAttempt.TaskStatus.RUNNING || status == TaskAttempt.TaskStatus.COMPLETED) {
-                ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
-                ta.setEndTime(System.currentTimeMillis());
-                List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
-                if (status == TaskAttempt.TaskStatus.RUNNING && abortTaskAttempts == null) {
-                    abortTaskAttempts = new ArrayList<TaskAttemptId>();
-                    abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
-                }
-                if (status == TaskAttempt.TaskStatus.RUNNING) {
-                    abortTaskAttempts.add(taId);
-                }
-            }
-        }
-        final JobId jobId = jobRun.getJobId();
-        LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
-        for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) {
-            final NodeControllerState node = ccs.getNodeMap().get(entry.getKey());
-            final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
-            if (node != null) {
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
-                }
-                try {
-                    node.getNodeController().abortTasks(jobId, abortTaskAttempts);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-        inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
-        TaskCluster tc = tcAttempt.getTaskCluster();
-        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
-        pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
-        pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
-
-        tcAttempt.setStatus(failedOrAbortedStatus);
-        tcAttempt.setEndTime(System.currentTimeMillis());
-    }
-
-    private void abortDoomedTaskClusters() throws HyracksException {
-        Set<TaskCluster> doomedTaskClusters = new HashSet<TaskCluster>();
-        for (TaskCluster tc : inProgressTaskClusters) {
-            // Start search at TCs that produce no outputs (sinks)
-            if (tc.getProducedPartitions().isEmpty()) {
-                findDoomedTaskClusters(tc, doomedTaskClusters);
-            }
-        }
-
-        for (TaskCluster tc : doomedTaskClusters) {
-            TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
-            if (tca != null) {
-                abortTaskCluster(tca, TaskClusterAttempt.TaskClusterStatus.ABORTED);
-            }
-        }
-    }
-
-    private boolean findDoomedTaskClusters(TaskCluster tc, Set<TaskCluster> doomedTaskClusters) {
-        if (doomedTaskClusters.contains(tc)) {
-            return true;
-        }
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-        if (lastAttempt != null) {
-            switch (lastAttempt.getStatus()) {
-                case ABORTED:
-                case FAILED:
-                    return true;
-
-                case COMPLETED:
-                    return false;
-            }
-        }
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap();
-        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
-        boolean doomed = false;
-        for (TaskCluster depTC : tc.getDependencyTaskClusters()) {
-            if (findDoomedTaskClusters(depTC, doomedTaskClusters)) {
-                doomed = true;
-            }
-        }
-        for (PartitionId pid : tc.getRequiredPartitions()) {
-            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
-            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
-            PartitionState maxState = pmm.getMaximumAvailableState(pid);
-            if (maxState == null
-                    || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) {
-                if (findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) {
-                    doomed = true;
-                }
-            }
-        }
-        if (doomed) {
-            doomedTaskClusters.add(tc);
-        }
-        return doomed;
-    }
-
-    public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
-        TaskAttemptId taId = ta.getTaskAttemptId();
-        TaskCluster tc = ta.getTask().getTaskCluster();
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-        if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
-            TaskAttempt.TaskStatus taStatus = ta.getStatus();
-            if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
-                ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
-                ta.setEndTime(System.currentTimeMillis());
-                if (lastAttempt.decrementPendingTasksCounter() == 0) {
-                    lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
-                    lastAttempt.setEndTime(System.currentTimeMillis());
-                    inProgressTaskClusters.remove(tc);
-                    startRunnableActivityClusters();
-                }
-            } else {
-                LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
-            }
-        } else {
-            LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
-        }
-    }
-
-    /**
-     * Indicates that a single task attempt has encountered a failure.
-     * @param ta Failed Task Attempt
- * @param exceptions exceptions thrown during the failure
-     */
-    public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
-        try {
-            LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
-            TaskAttemptId taId = ta.getTaskAttemptId();
-            TaskCluster tc = ta.getTask().getTaskCluster();
-            TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-            if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
-                LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
-                ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
-                abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
-                abortDoomedTaskClusters();
-                if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
-                    abortJob(exceptions);
-                    return;
-                }
-                startRunnableActivityClusters();
-            } else {
-                LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = "
-                        + lastAttempt);
-            }
-        } catch (Exception e) {
-            abortJob(Collections.singletonList(e));
-        }
-    }
-
-    /**
-     * Indicates that the provided set of nodes have left the cluster.
-     *
-     * @param deadNodes
-     *            - Set of failed nodes
-     */
-    public void notifyNodeFailures(Set<String> deadNodes) {
-        try {
-            jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
-            jobRun.getParticipatingNodeIds().removeAll(deadNodes);
-            jobRun.getCleanupPendingNodeIds().removeAll(deadNodes);
-            if (jobRun.getPendingStatus() != null && jobRun.getCleanupPendingNodeIds().isEmpty()) {
-                finishJob(jobRun);
-                return;
-            }
-            for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
-                if (isPlanned(ac)) {
-                    TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
-                    if (taskClusters != null) {
-                        for (TaskCluster tc : taskClusters) {
-                            TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
-                            if (lastTaskClusterAttempt != null
-                                    && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
-                                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
-                                boolean abort = false;
-                                for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
-                                    assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
-                                    if (deadNodes.contains(ta.getNodeId())) {
-                                        ta.setStatus(
-                                                TaskAttempt.TaskStatus.FAILED,
-                                                Collections.singletonList(new Exception("Node " + ta.getNodeId()
-                                                        + " failed")));
-                                        ta.setEndTime(System.currentTimeMillis());
-                                        abort = true;
-                                    }
-                                }
-                                if (abort) {
-                                    abortTaskCluster(lastTaskClusterAttempt,
-                                            TaskClusterAttempt.TaskClusterStatus.ABORTED);
-                                }
-                            }
-                        }
-                        abortDoomedTaskClusters();
-                    }
-                }
-            }
-            startRunnableActivityClusters();
-        } catch (Exception e) {
-            abortJob(Collections.singletonList(e));
-        }
-    }
-
-    private void finishJob(final JobRun run) {
-        JobId jobId = run.getJobId();
-        CCApplicationContext appCtx = ccs.getApplicationContext();
-        if (appCtx != null) {
-            try {
-                appCtx.notifyJobFinish(jobId);
-            } catch (HyracksException e) {
-                e.printStackTrace();
-            }
-        }
-        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
-        run.setEndTime(System.currentTimeMillis());
-        ccs.getActiveRunMap().remove(jobId);
-        ccs.getRunMapArchive().put(jobId, run);
-        ccs.getRunHistory().put(jobId, run.getExceptions());
-
-        if (run.getActivityClusterGraph().isReportTaskDetails()) {
-            /**
-             * log job details when task-profiling is enabled
-             */
-            try {
-                ccs.getJobLogFile().log(createJobLogObject(run));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private ObjectNode createJobLogObject(final JobRun run) {
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode jobLogObject = om.createObjectNode();
-        ActivityClusterGraph acg = run.getActivityClusterGraph();
-        jobLogObject.set("activity-cluster-graph", acg.toJSON());
-        jobLogObject.set("job-run", run.toJSON());
-        return jobLogObject;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/PartitionConstraintSolver.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/PartitionConstraintSolver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/PartitionConstraintSolver.java
deleted file mode 100644
index 6a41d01..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/PartitionConstraintSolver.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.scheduler;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
-import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
-
-public class PartitionConstraintSolver {
-    private final Map<LValueConstraintExpression, Set<ConstraintExpression>> constraints;
-
-    public PartitionConstraintSolver() {
-        constraints = new HashMap<LValueConstraintExpression, Set<ConstraintExpression>>();
-    }
-
-    public void addConstraints(Collection<Constraint> constraintCollection) {
-        for (Constraint c : constraintCollection) {
-            addConstraint(c);
-        }
-    }
-
-    public void addConstraint(Constraint c) {
-        Set<ConstraintExpression> rValues = constraints.get(c.getLValue());
-        if (rValues == null) {
-            rValues = new HashSet<ConstraintExpression>();
-            constraints.put(c.getLValue(), rValues);
-        }
-        rValues.add(c.getRValue());
-    }
-
-    public void solve(Collection<LValueConstraintExpression> targetSet) {
-        Set<LValueConstraintExpression> inProcess = new HashSet<LValueConstraintExpression>();
-        for (LValueConstraintExpression lv : targetSet) {
-            solveLValue(lv, inProcess);
-        }
-    }
-
-    private Solution solve(ConstraintExpression ce, Set<LValueConstraintExpression> inProcess) {
-        switch (ce.getTag()) {
-            case CONSTANT:
-                return new Solution(((ConstantExpression) ce).getValue(), Solution.Status.FOUND);
-
-            case PARTITION_COUNT:
-            case PARTITION_LOCATION:
-                return solveLValue((LValueConstraintExpression) ce, inProcess);
-        }
-        return null;
-    }
-
-    private Solution solveLValue(LValueConstraintExpression lv, Set<LValueConstraintExpression> inProcess) {
-        if (inProcess.contains(lv)) {
-            return new Solution(null, Solution.Status.CYCLE);
-        }
-        Solution result = null;
-        inProcess.add(lv);
-        Set<ConstraintExpression> rValues = constraints.get(lv);
-        if (rValues == null) {
-            return new Solution(null, Solution.Status.NOT_BOUND);
-        }
-        for (ConstraintExpression ce : rValues) {
-            Solution solution = solve(ce, inProcess);
-            if (solution != null && solution.status == Solution.Status.FOUND) {
-                result = solution;
-                break;
-            }
-        }
-        if (result != null) {
-            rValues.clear();
-            rValues.add(new ConstantExpression(result.value));
-        }
-        inProcess.remove(lv);
-        return result;
-    }
-
-    public Object getValue(LValueConstraintExpression lValue) {
-        Set<ConstraintExpression> rValues = constraints.get(lValue);
-        if (rValues == null) {
-            return null;
-        }
-        if (rValues.size() != 1) {
-            return null;
-        }
-        for (ConstraintExpression ce : rValues) {
-            if (ce.getTag() == ConstraintExpression.ExpressionTag.CONSTANT) {
-                return ((ConstantExpression) ce).getValue();
-            }
-        }
-        return null;
-    }
-
-    private static class Solution {
-        enum Status {
-            FOUND,
-            CYCLE,
-            NOT_BOUND,
-        }
-
-        final Object value;
-        final Status status;
-
-        public Solution(Object value, Status status) {
-            this.value = value;
-            this.status = status;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
deleted file mode 100644
index a79bf50..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.scheduler;
-
-import org.apache.hyracks.control.cc.job.TaskCluster;
-
-public class RankedRunnableTaskCluster implements Comparable<RankedRunnableTaskCluster> {
-    private final int rank;
-
-    private final TaskCluster taskCluster;
-
-    public RankedRunnableTaskCluster(int rank, TaskCluster taskCluster) {
-        this.rank = rank;
-        this.taskCluster = taskCluster;
-    }
-
-    public int getRank() {
-        return rank;
-    }
-
-    public TaskCluster getTaskCluster() {
-        return taskCluster;
-    }
-
-    @Override
-    public String toString() {
-        return "[" + rank + ":" + taskCluster + "]";
-    }
-
-    @Override
-    public int compareTo(RankedRunnableTaskCluster o) {
-        int cmp = rank - o.rank;
-        return cmp < 0 ? -1 : (cmp > 0 ? 1 : 0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ResourceManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ResourceManager.java
new file mode 100644
index 0000000..6168dce
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ResourceManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.scheduler;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+
+public class ResourceManager implements IResourceManager {
+
+    // The maximum capacity, assuming that there is no running job that occupies capacity.
+    // It is unchanged unless any node is added, removed or updated.
+    private IClusterCapacity maxCapacity = new ClusterCapacity();
+
+    // The current capacity, which is dynamically changing.
+    private IClusterCapacity currentCapacity = new ClusterCapacity();
+
+    @Override
+    public IReadOnlyClusterCapacity getMaximumCapacity() {
+        return maxCapacity;
+    }
+
+    @Override
+    public IClusterCapacity getCurrentCapacity() {
+        return currentCapacity;
+    }
+
+    @Override
+    public void update(String nodeId, NodeCapacity capacity) throws HyracksException {
+        maxCapacity.update(nodeId, capacity);
+        currentCapacity.update(nodeId, capacity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/Runnability.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/Runnability.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/Runnability.java
deleted file mode 100644
index 70d3f16..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/Runnability.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.scheduler;
-
-public final class Runnability {
-    private final Tag tag;
-
-    private final int priority;
-
-    public Runnability(Tag tag, int priority) {
-        this.tag = tag;
-        this.priority = priority;
-    }
-
-    public Tag getTag() {
-        return tag;
-    }
-
-    public int getPriority() {
-        return priority;
-    }
-
-    public enum Tag {
-        COMPLETED,
-        NOT_RUNNABLE,
-        RUNNABLE,
-        RUNNING,
-    }
-
-    public static Runnability getWorstCase(Runnability r1, Runnability r2) {
-        switch (r1.tag) {
-            case COMPLETED:
-                switch (r2.tag) {
-                    case COMPLETED:
-                    case NOT_RUNNABLE:
-                    case RUNNABLE:
-                    case RUNNING:
-                        return r2;
-                }
-                break;
-
-            case NOT_RUNNABLE:
-                switch (r2.tag) {
-                    case COMPLETED:
-                    case NOT_RUNNABLE:
-                    case RUNNABLE:
-                    case RUNNING:
-                        return r1;
-                }
-                break;
-
-            case RUNNABLE:
-                switch (r2.tag) {
-                    case COMPLETED:
-                        return r1;
-
-                    case RUNNING:
-                        return r1.priority > 0 ? r1 : new Runnability(Tag.RUNNABLE, 1);
-
-                    case NOT_RUNNABLE:
-                        return r2;
-
-                    case RUNNABLE:
-                        return r1.priority > r2.priority ? r1 : r2;
-                }
-                break;
-
-            case RUNNING:
-                switch (r2.tag) {
-                    case COMPLETED:
-                    case RUNNING:
-                        return r1;
-
-                    case NOT_RUNNABLE:
-                        return r2;
-
-                    case RUNNABLE:
-                        return r2.priority > 0 ? r2 : new Runnability(Tag.RUNNABLE, 1);
-                }
-                break;
-        }
-        throw new IllegalArgumentException("Could not aggregate: " + r1 + " and " + r2);
-    }
-
-    @Override
-    public String toString() {
-        return "{" + tag + ", " + priority + "}";
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java
index e69884a..892807b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java
@@ -45,7 +45,7 @@ public class JobsRESTAPIFunction implements IJSONOutputFunction {
                     break;
                 }
             case 0: {
-                GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs);
+                GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs.getJobManager());
                 ccs.getWorkQueue().scheduleAndSync(gjse);
                 result.set("result", gjse.getSummaries());
                 break;
@@ -59,7 +59,7 @@ public class JobsRESTAPIFunction implements IJSONOutputFunction {
                     ccs.getWorkQueue().scheduleAndSync(gjage);
                     result.set("result", gjage.getJSON());
                 } else if ("job-run".equalsIgnoreCase(arguments[1])) {
-                    GetJobRunJSONWork gjre = new GetJobRunJSONWork(ccs, jobId);
+                    GetJobRunJSONWork gjre = new GetJobRunJSONWork(ccs.getJobManager(), jobId);
                     ccs.getWorkQueue().scheduleAndSync(gjre);
                     result.set("result", gjre.getJSON());
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java
index 8994895..3b4918c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java
@@ -18,14 +18,14 @@
  */
 package org.apache.hyracks.control.cc.web;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.web.util.IJSONOutputFunction;
 import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
 import org.apache.hyracks.control.cc.work.GetNodeSummariesJSONWork;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class NodesRESTAPIFunction implements IJSONOutputFunction {
     private ClusterControllerService ccs;
 
@@ -40,12 +40,13 @@ public class NodesRESTAPIFunction implements IJSONOutputFunction {
         switch (arguments.length) {
             case 1: {
                 if ("".equals(arguments[0])) {
-                    GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs);
+                    GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs.getNodeManager());
                     ccs.getWorkQueue().scheduleAndSync(gnse);
                     result.set("result", gnse.getSummaries());
                 } else {
                     String nodeId = arguments[0];
-                    GetNodeDetailsJSONWork gnde = new GetNodeDetailsJSONWork(ccs, nodeId, true, true);
+                    GetNodeDetailsJSONWork gnde = new GetNodeDetailsJSONWork(ccs.getNodeManager(), ccs.getCCConfig(),
+                            nodeId, true, true);
                     ccs.getWorkQueue().scheduleAndSync(gnde);
                     result.set("result", gnde.getDetail());
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java
index b55e65d..8fe68c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.control.cc.web;
 
 import java.util.EnumSet;
-import java.util.logging.Logger;
 
 import javax.servlet.DispatcherType;
 
@@ -43,8 +42,6 @@ import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 
 public class WebServer {
-    private final static Logger LOGGER = Logger.getLogger(WebServer.class.getName());
-
     private final ClusterControllerService ccs;
     private final Server server;
     private final ServerConnector connector;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
index ad333fe..fa5dcd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
@@ -19,10 +19,9 @@
 
 package org.apache.hyracks.control.cc.work;
 
-import java.util.Map;
-
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
@@ -40,8 +39,8 @@ public abstract class AbstractHeartbeatWork extends SynchronizableWork {
 
     @Override
     public void doRun() {
-        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-        NodeControllerState state = nodeMap.get(nodeId);
+        INodeManager nodeManager = ccs.getNodeManager();
+        NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
         if (state != null) {
             state.notifyHeartbeat(hbData);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index 9134a91..3babf00 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.ActivityPlan;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.Task;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
@@ -50,7 +51,8 @@ public abstract class AbstractTaskLifecycleWork extends AbstractHeartbeatWork {
 
     @Override
     public final void runWork() {
-        JobRun run = ccs.getActiveRunMap().get(jobId);
+        IJobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.get(jobId);
         if (run != null) {
             TaskId tid = taId.getTaskId();
             Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterGraph().getActivityMap();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
index 741c641..6480674 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -20,15 +20,14 @@
 package org.apache.hyracks.control.cc.work;
 
 import java.net.URL;
+import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
 
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.work.IPCResponder;
@@ -72,12 +71,8 @@ public class CliDeployBinaryWork extends SynchronizableWork {
             /**
              * Deploy for the node controllers
              */
-            Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
-
-            Set<String> nodeIds = new TreeSet<String>();
-            for (String nc : nodeControllerStateMap.keySet()) {
-                nodeIds.add(nc);
-            }
+            INodeManager nodeManager = ccs.getNodeManager();
+            Collection<String> nodeIds = nodeManager.getAllNodeIds();
             final DeploymentRun dRun = new DeploymentRun(nodeIds);
 
             /** The following call prevents a user to deploy with the same deployment id simultaneously. */
@@ -86,7 +81,7 @@ public class CliDeployBinaryWork extends SynchronizableWork {
             /***
              * deploy binaries to each node controller
              */
-            for (NodeControllerState ncs : nodeControllerStateMap.values()) {
+            for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) {
                 ncs.getNodeController().deployBinary(deploymentId, binaryURLs);
             }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
index 5f97ce2..de28c32 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -19,14 +19,14 @@
 
 package org.apache.hyracks.control.cc.work;
 
-import java.util.Map;
+import java.util.Collection;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
 
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.work.IPCResponder;
@@ -68,12 +68,8 @@ public class CliUnDeployBinaryWork extends SynchronizableWork {
             /**
              * Undeploy for the node controllers
              */
-            Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
-
-            Set<String> nodeIds = new TreeSet<String>();
-            for (String nc : nodeControllerStateMap.keySet()) {
-                nodeIds.add(nc);
-            }
+            INodeManager nodeManager = ccs.getNodeManager();
+            Collection<String> nodeIds = nodeManager.getAllNodeIds();
             final DeploymentRun dRun = new DeploymentRun(nodeIds);
 
             /** The following call prevents a user from undeploying with the same deployment id simultaneously. */
@@ -82,7 +78,7 @@ public class CliUnDeployBinaryWork extends SynchronizableWork {
             /***
              * undeploy binaries from each node controller
              */
-            for (NodeControllerState ncs : nodeControllerStateMap.values()) {
+            for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) {
                 ncs.getNodeController().undeployBinary(deploymentId);
             }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
index e05dfbc..0b89f55 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
@@ -19,14 +19,13 @@
 
 package org.apache.hyracks.control.cc.work;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.Collection;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.shutdown.ShutdownRun;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -52,9 +51,8 @@ public class ClusterShutdownWork extends SynchronizableWork {
             if (ccs.getShutdownRun() != null) {
                 throw new IPCException("Shutdown already in progress");
             }
-            Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
-            Set<String> nodeIds = new TreeSet<>();
-            nodeIds.addAll(nodeControllerStateMap.keySet());
+            INodeManager nodeManager = ccs.getNodeManager();
+            Collection<String> nodeIds = nodeManager.getAllNodeIds();
             /**
              * set up our listener for the node ACKs
              */
@@ -64,7 +62,7 @@ public class ClusterShutdownWork extends SynchronizableWork {
             /**
              * Shutdown all the nodes...
              */
-            nodeControllerStateMap.forEach(this::shutdownNode);
+            nodeManager.apply(this::shutdownNode);
 
             ccs.getExecutor().execute(new Runnable() {
                 @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
index 91a5906..7709a2b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.control.cc.work;
 
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class GatherStateDumpsWork extends SynchronizableWork {
@@ -41,8 +42,9 @@ public class GatherStateDumpsWork extends SynchronizableWork {
     @Override
     public void doRun() throws Exception {
         ccs.addStateDumpRun(sdr.stateDumpId, sdr);
-        sdr.setNCs(new HashSet<>(ccs.getNodeMap().keySet()));
-        for (NodeControllerState ncs : ccs.getNodeMap().values()) {
+        INodeManager nodeManager = ccs.getNodeManager();
+        sdr.setNCs(nodeManager.getAllNodeIds());
+        for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) {
             ncs.getNodeController().dumpState(sdr.stateDumpId);
         }
     }
@@ -59,7 +61,7 @@ public class GatherStateDumpsWork extends SynchronizableWork {
 
         private final Map<String, String> ncStates;
 
-        private Set<String> ncIds;
+        private Collection<String> ncIds;
 
         private boolean complete;
 
@@ -70,7 +72,7 @@ public class GatherStateDumpsWork extends SynchronizableWork {
             complete = false;
         }
 
-        public void setNCs(Set<String> ncIds) {
+        public void setNCs(Collection<String> ncIds) {
             this.ncIds = ncIds;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
index 294ae97..6a4a1d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
@@ -18,14 +18,15 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class GetActivityClusterGraphJSONWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final JobId jobId;
@@ -38,15 +39,12 @@ public class GetActivityClusterGraphJSONWork extends SynchronizableWork {
 
     @Override
     protected void doRun() throws Exception {
-
+        IJobManager jobManager = ccs.getJobManager();
         ObjectMapper om = new ObjectMapper();
-        JobRun run = ccs.getActiveRunMap().get(jobId);
+        JobRun run = jobManager.get(jobId);
         if (run == null) {
-            run = ccs.getRunMapArchive().get(jobId);
-            if (run == null) {
-                json = om.createObjectNode();
-                return;
-            }
+            json = om.createObjectNode();
+            return;
         }
         json = run.getActivityClusterGraph().toJSON();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
index 31a829c..872fb9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
@@ -23,19 +23,20 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class GetIpAddressNodeNameMapWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
+    private final INodeManager nodeManager;
     private Map<InetAddress, Set<String>> map;
 
-    public GetIpAddressNodeNameMapWork(ClusterControllerService ccs, Map<InetAddress, Set<String>> map) {
-        this.ccs = ccs;
+    public GetIpAddressNodeNameMapWork(INodeManager nodeManager, Map<InetAddress, Set<String>> map) {
+        this.nodeManager = nodeManager;
         this.map = map;
     }
 
     @Override
     protected void doRun() throws Exception {
-        map.putAll(ccs.getIpAddressNodeNameMap());
+        map.putAll(nodeManager.getIpAddressNodeNameMap());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
index e072c21..8fe6470 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
@@ -20,18 +20,18 @@ package org.apache.hyracks.control.cc.work;
 
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class GetJobInfoWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
+    private final IJobManager jobManager;
     private final JobId jobId;
     private final IResultCallback<JobInfo> callback;
 
-    public GetJobInfoWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobInfo> callback) {
-        this.ccs = ccs;
+    public GetJobInfoWork(IJobManager jobManager, JobId jobId, IResultCallback<JobInfo> callback) {
+        this.jobManager = jobManager;
         this.jobId = jobId;
         this.callback = callback;
     }
@@ -39,10 +39,7 @@ public class GetJobInfoWork extends SynchronizableWork {
     @Override
     protected void doRun() throws Exception {
         try {
-            JobRun run = ccs.getActiveRunMap().get(jobId);
-            if (run == null) {
-                run = ccs.getRunMapArchive().get(jobId);
-            }
+            JobRun run = jobManager.get(jobId);
             JobInfo info = (run != null) ? new JobInfo(run.getJobId(), run.getStatus(), run.getOperatorLocations())
                     : null;
             callback.setValue(info);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
index aad6edf..3a7c449 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
@@ -18,34 +18,31 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class GetJobRunJSONWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
+    private final IJobManager jobManager;
     private final JobId jobId;
     private ObjectNode json;
 
-    public GetJobRunJSONWork(ClusterControllerService ccs, JobId jobId) {
-        this.ccs = ccs;
+    public GetJobRunJSONWork(IJobManager jobManager, JobId jobId) {
+        this.jobManager = jobManager;
         this.jobId = jobId;
     }
 
     @Override
     protected void doRun() throws Exception {
         ObjectMapper om = new ObjectMapper();
-        JobRun run = ccs.getActiveRunMap().get(jobId);
+        JobRun run = jobManager.get(jobId);
         if (run == null) {
-            run = ccs.getRunMapArchive().get(jobId);
-            if (run == null) {
-                json = om.createObjectNode();
-                return;
-            }
+            json = om.createObjectNode();
+            return;
         }
         json = run.toJSON();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
index d45a9cc..b5bf8b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
@@ -16,22 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.control.cc.work;
 
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class GetJobStatusWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
+    private final IJobManager jobManager;
     private final JobId jobId;
     private final IResultCallback<JobStatus> callback;
 
-    public GetJobStatusWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobStatus> callback) {
-        this.ccs = ccs;
+    public GetJobStatusWork(IJobManager jobManager, JobId jobId, IResultCallback<JobStatus> callback) {
+        this.jobManager = jobManager;
         this.jobId = jobId;
         this.callback = callback;
     }
@@ -39,10 +40,7 @@ public class GetJobStatusWork extends SynchronizableWork {
     @Override
     protected void doRun() throws Exception {
         try {
-            JobRun run = ccs.getActiveRunMap().get(jobId);
-            if (run == null) {
-                run = ccs.getRunMapArchive().get(jobId);
-            }
+            JobRun run = jobManager.get(jobId);
             JobStatus status = run == null ? null : run.getStatus();
             callback.setValue(status);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
index 1e5a3a5..9c680c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
@@ -16,31 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.control.cc.work;
 
 import java.util.Collection;
 
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.job.JobRun;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class GetJobSummariesJSONWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
+    private final IJobManager jobManager;
     private ArrayNode summaries;
 
-    public GetJobSummariesJSONWork(ClusterControllerService ccs) {
-        this.ccs = ccs;
+    public GetJobSummariesJSONWork(IJobManager jobManager) {
+        this.jobManager = jobManager;
     }
 
     @Override
     protected void doRun() throws Exception {
         ObjectMapper om = new ObjectMapper();
         summaries = om.createArrayNode();
-        populateJSON(ccs.getActiveRunMap().values());
-        populateJSON(ccs.getRunMapArchive().values());
+        populateJSON(jobManager.getRunningJobs());
+        populateJSON(jobManager.getPendingJobs());
+        populateJSON(jobManager.getArchivedJobs());
     }
 
     private void populateJSON(Collection<JobRun> jobRuns)  {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index a0150f2..c36b887 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -18,35 +18,25 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.NodeStatus;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class GetNodeControllersInfoWork extends AbstractWork {
-    private final ClusterControllerService ccs;
+    private final INodeManager nodeManager;
     private IResultCallback<Map<String, NodeControllerInfo>> callback;
 
-    public GetNodeControllersInfoWork(ClusterControllerService ccs,
+    public GetNodeControllersInfoWork(INodeManager nodeManager,
             IResultCallback<Map<String, NodeControllerInfo>> callback) {
-        this.ccs = ccs;
+        this.nodeManager = nodeManager;
         this.callback = callback;
     }
 
     @Override
     public void run() {
-        Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
-        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-        for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
-            NodeControllerState ncState = e.getValue();
-            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, ncState.getDataPort(),
-                    ncState.getDatasetPort(), ncState.getMessagingPort(), ncState.getNumCores()));
-        }
-        callback.setValue(result);
+        callback.setValue(nodeManager.getNodeControllerInfoMap());
     }
 }


Mime
View raw message