asterixdb-commits mailing list archives

From ti...@apache.org
Subject [5/7] asterixdb git commit: Implements concurrent query management support.
Date Wed, 25 Jan 2017 14:46:38 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
new file mode 100644
index 0000000..354019c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.cluster;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+
+public class NodeManager implements INodeManager {
+    private static final Logger LOGGER = Logger.getLogger(NodeManager.class.getName());
+
+    private final CCConfig ccConfig;
+    private final IResourceManager resourceManager;
+    private final Map<String, NodeControllerState> nodeRegistry;
+    private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
+
+    public NodeManager(CCConfig ccConfig, IResourceManager resourceManager) {
+        this.ccConfig = ccConfig;
+        this.resourceManager = resourceManager;
+        this.nodeRegistry = new LinkedHashMap<>();
+        this.ipAddressNodeNameMap = new HashMap<>();
+    }
+
+    @Override
+    public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
+        return Collections.unmodifiableMap(ipAddressNodeNameMap);
+    }
+
+    @Override
+    public Collection<String> getAllNodeIds() {
+        return Collections.unmodifiableSet(nodeRegistry.keySet());
+    }
+
+    @Override
+    public Collection<NodeControllerState> getAllNodeControllerStates() {
+        return Collections.unmodifiableCollection(nodeRegistry.values());
+    }
+
+    @Override
+    public NodeControllerState getNodeControllerState(String nodeId) {
+        return nodeRegistry.get(nodeId);
+    }
+
+    @Override
+    public void addNode(String nodeId, NodeControllerState ncState) throws HyracksException {
+        if (nodeId == null || ncState == null) {
+            throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+        }
+        // Updates the node registry.
+        if (nodeRegistry.containsKey(nodeId)) {
+            LOGGER.warning("Node with name " + nodeId + " has already registered.");
+            return;
+        }
+        nodeRegistry.put(nodeId, ncState);
+
+        // Updates the IP address to node names map.
+        try {
+            InetAddress ipAddress = getIpAddress(ncState);
+            Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+            if (nodes == null) {
+                nodes = new HashSet<>();
+                ipAddressNodeNameMap.put(ipAddress, nodes);
+            }
+            nodes.add(nodeId);
+        } catch (HyracksException e) {
+            // If address resolution fails, undo the registration and rethrow.
+            nodeRegistry.remove(nodeId);
+            throw e;
+        }
+
+        // Updates the cluster capacity.
+        resourceManager.update(nodeId, ncState.getCapacity());
+    }
+
+    @Override
+    public void removeNode(String nodeId) throws HyracksException {
+        NodeControllerState ncState = nodeRegistry.remove(nodeId);
+        removeNodeFromIpAddressMap(nodeId, ncState);
+
+        // Updates the cluster capacity.
+        resourceManager.update(nodeId, new NodeCapacity(0L, 0));
+    }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllerInfoMap() {
+        Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
+        for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+            NodeControllerState ncState = e.getValue();
+            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, ncState.getDataPort(),
+                    ncState.getDatasetPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores()));
+        }
+        return result;
+    }
+
+    @Override
+    public Pair<Collection<String>, Collection<JobId>> removeDeadNodes() throws HyracksException {
+        Set<String> deadNodes = new HashSet<>();
+        Set<JobId> affectedJobIds = new HashSet<>();
+        Iterator<Map.Entry<String, NodeControllerState>> nodeIterator = nodeRegistry.entrySet().iterator();
+        while (nodeIterator.hasNext()) {
+            Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
+            String nodeId = entry.getKey();
+            NodeControllerState state = entry.getValue();
+            if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+                deadNodes.add(nodeId);
+                affectedJobIds.addAll(state.getActiveJobIds());
+                // Removes the node from node map.
+                nodeIterator.remove();
+                // Removes the node from IP map.
+                removeNodeFromIpAddressMap(nodeId, state);
+                // Updates the cluster capacity.
+                resourceManager.update(nodeId, new NodeCapacity(0L, 0));
+                LOGGER.info(entry.getKey() + " considered dead");
+            }
+        }
+        return Pair.of(deadNodes, affectedJobIds);
+    }
+
+    @Override
+    public void apply(NodeFunction nodeFunction) {
+        nodeRegistry.forEach(nodeFunction::apply);
+    }
+
+    // Removes the entry of the node in <code>ipAddressNodeNameMap</code>.
+    private void removeNodeFromIpAddressMap(String nodeId, NodeControllerState ncState) throws HyracksException {
+        InetAddress ipAddress = getIpAddress(ncState);
+        Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+        if (nodes != null) {
+            nodes.remove(nodeId);
+            if (nodes.isEmpty()) {
+                // Removes the ip if no corresponding node exists.
+                ipAddressNodeNameMap.remove(ipAddress);
+            }
+        }
+    }
+
+    // Retrieves the IP address for a given node.
+    private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
+        String ipAddress = ncState.getNCConfig().dataIPAddress;
+        if (ncState.getNCConfig().dataPublicIPAddress != null) {
+            ipAddress = ncState.getNCConfig().dataPublicIPAddress;
+        }
+        try {
+            return InetAddress.getByName(ipAddress);
+        } catch (UnknownHostException e) {
+            throw HyracksException.create(ErrorCode.INVALID_NETWORK_ADDRESS, e, e.getMessage());
+        }
+    }
+
+}
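
The class above keeps three views consistent: the node registry itself, an
IP-address-to-node-names index, and the resource manager's picture of cluster
capacity; removeDeadNodes() additionally advances a heartbeat-lapse counter per
node and evicts nodes that exceed the configured maximum. The following is a
minimal standalone sketch of that bookkeeping pattern; all names in it are
illustrative and not part of the Hyracks API:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    class MembershipSketch {
        private final Map<String, Integer> lapses = new LinkedHashMap<>();    // nodeId -> missed heartbeats
        private final Map<String, Set<String>> hostToNodes = new HashMap<>(); // host -> node ids on that host
        private final Map<String, String> nodeToHost = new HashMap<>();

        void add(String nodeId, String host) {
            if (lapses.putIfAbsent(nodeId, 0) != null) {
                return; // already registered, mirroring addNode() above
            }
            hostToNodes.computeIfAbsent(host, k -> new HashSet<>()).add(nodeId);
            nodeToHost.put(nodeId, host);
        }

        // One sweep: bump each lapse counter (a real heartbeat would reset it to 0)
        // and evict nodes that reach the limit, as removeDeadNodes() does above.
        Set<String> sweep(int maxLapses) {
            Set<String> dead = new HashSet<>();
            Iterator<Map.Entry<String, Integer>> it = lapses.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Integer> e = it.next();
                e.setValue(e.getValue() + 1);
                if (e.getValue() < maxLapses) {
                    continue;
                }
                String nodeId = e.getKey();
                dead.add(nodeId);
                it.remove();
                String host = nodeToHost.remove(nodeId);
                Set<String> nodes = hostToNodes.get(host);
                if (nodes != null && nodes.remove(nodeId) && nodes.isEmpty()) {
                    hostToNodes.remove(host); // drop the host entry once empty
                }
            }
            return dead;
        }
    }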

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java
new file mode 100644
index 0000000..57b8c50
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.executor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import org.apache.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityCluster;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
+import org.apache.hyracks.control.cc.job.ActivityPlan;
+import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.control.cc.job.Task;
+import org.apache.hyracks.control.cc.job.TaskCluster;
+import org.apache.hyracks.control.cc.job.TaskClusterId;
+
+class ActivityClusterPlanner {
+    private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
+
+    private final JobExecutor executor;
+
+    private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
+    ActivityClusterPlanner(JobExecutor newJobExecutor) {
+        this.executor = newJobExecutor;
+        partitionProducingTaskClusterMap = new HashMap<>();
+    }
+
+    ActivityClusterPlan planActivityCluster(ActivityCluster ac) throws HyracksException {
+        JobRun jobRun = executor.getJobRun();
+        Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
+
+        Map<ActivityId, ActivityPlan> activityPlanMap = buildActivityPlanMap(ac, jobRun, pcMap);
+
+        assignConnectorPolicy(ac, activityPlanMap);
+
+        TaskCluster[] taskClusters = computeTaskClusters(ac, jobRun, activityPlanMap);
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Plan for " + ac);
+            LOGGER.info("Built " + taskClusters.length + " Task Clusters");
+            for (TaskCluster tc : taskClusters) {
+                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+            }
+        }
+
+        return new ActivityClusterPlan(taskClusters, activityPlanMap);
+    }
+
+    private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
+            Map<ActivityId, ActivityPartitionDetails> pcMap) {
+        Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<>();
+        Set<ActivityId> depAnIds = new HashSet<>();
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
+            depAnIds.clear();
+            getDependencyActivityIds(depAnIds, anId, ac);
+            ActivityPartitionDetails apd = pcMap.get(anId);
+            Task[] tasks = new Task[apd.getPartitionCount()];
+            ActivityPlan activityPlan = new ActivityPlan(apd);
+            for (int i = 0; i < tasks.length; ++i) {
+                TaskId tid = new TaskId(anId, i);
+                tasks[i] = new Task(tid, activityPlan);
+                for (ActivityId danId : depAnIds) {
+                    ActivityCluster dAC = ac.getActivityClusterGraph().getActivityMap().get(danId);
+                    ActivityClusterPlan dACP = jobRun.getActivityClusterPlanMap().get(dAC.getId());
+                    assert dACP != null : "Illegal state: dependent AC planned before its dependency AC; "
+                            + "no ActivityClusterPlan for ActivityId " + danId;
+                    Task[] dATasks = dACP.getActivityPlanMap().get(danId).getTasks();
+                    assert dATasks != null : "Illegal state: dependent AC planned before its dependency AC; "
+                            + "no tasks planned for ActivityId " + danId;
+                    assert dATasks.length == tasks.length : "Dependency activity partitioned differently from "
+                            + "dependent: " + dATasks.length + " != " + tasks.length;
+                    Task dTask = dATasks[i];
+                    TaskId dTaskId = dTask.getTaskId();
+                    tasks[i].getDependencies().add(dTaskId);
+                    dTask.getDependents().add(tid);
+                }
+            }
+            activityPlan.setTasks(tasks);
+            activityPlanMap.put(anId, activityPlan);
+        }
+        return activityPlanMap;
+    }
+
+    private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
+            Map<ActivityId, ActivityPlan> activityPlanMap) {
+        Set<ActivityId> activities = ac.getActivityMap().keySet();
+        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
+                activityPlanMap, activities);
+
+        TaskCluster[] taskClusters = ac.getActivityClusterGraph().isUseConnectorPolicyForScheduling()
+                ? buildConnectorPolicyAwareTaskClusters(ac, activityPlanMap, taskConnectivity)
+                : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
+
+        for (TaskCluster tc : taskClusters) {
+            Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
+            for (Task ts : tc.getTasks()) {
+                TaskId tid = ts.getTaskId();
+                List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
+                if (cInfoList != null) {
+                    for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
+                        Task targetTS = activityPlanMap.get(p.getLeft().getActivityId()).getTasks()[p.getLeft()
+                                .getPartition()];
+                        TaskCluster targetTC = targetTS.getTaskCluster();
+                        if (targetTC != tc) {
+                            ConnectorDescriptorId cdId = p.getRight();
+                            PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(), p.getLeft()
+                                    .getPartition());
+                            tc.getProducedPartitions().add(pid);
+                            targetTC.getRequiredPartitions().add(pid);
+                            partitionProducingTaskClusterMap.put(pid, tc);
+                        }
+                    }
+                }
+                for (TaskId dTid : ts.getDependencies()) {
+                    TaskCluster dTC = getTaskCluster(dTid);
+                    dTC.getDependentTaskClusters().add(tc);
+                    tcDependencyTaskClusters.add(dTC);
+                }
+            }
+        }
+        return taskClusters;
+    }
+
+    private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
+            Map<ActivityId, ActivityPlan> activityPlanMap) {
+        List<Task> taskStates = new ArrayList<>();
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
+            ActivityPlan ap = activityPlanMap.get(anId);
+            Task[] tasks = ap.getTasks();
+            taskStates.addAll(Arrays.asList(tasks));
+        }
+        TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), 0), ac, taskStates.toArray(new Task[taskStates
+                .size()]));
+        for (Task t : tc.getTasks()) {
+            t.setTaskCluster(tc);
+        }
+        return new TaskCluster[] { tc };
+    }
+
+    private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
+            Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
+        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<>();
+        ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
+        BitSet targetBitmap = new BitSet();
+        for (ActivityId ac1 : activities) {
+            ActivityCluster ac = acg.getActivityMap().get(ac1);
+            Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
+            int nProducers = ac1TaskStates.length;
+            List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(ac1);
+            if (outputConns == null) {
+                continue;
+            }
+            for (IConnectorDescriptor c : outputConns) {
+                ConnectorDescriptorId cdId = c.getConnectorId();
+                ActivityId ac2 = ac.getConsumerActivity(cdId);
+                Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
+                int nConsumers = ac2TaskStates.length;
+                if (c.allProducersToAllConsumers()) {
+                    List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = new ArrayList<>();
+                    for (int j = 0; j < nConsumers; j++) {
+                        TaskId targetTID = ac2TaskStates[j].getTaskId();
+                        cInfoList.add(Pair.of(targetTID, cdId));
+                    }
+                    for (int i = 0; i < nProducers; ++i) {
+                        taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                    }
+                    continue;
+                }
+                for (int i = 0; i < nProducers; ++i) {
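+                    // The connector fills targetBitmap with the consumer partitions
+                    // that producer partition i sends to.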
+                    c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                    List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity
+                            .get(ac1TaskStates[i].getTaskId());
+                    if (cInfoList == null) {
+                        cInfoList = new ArrayList<>();
+                        taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                    }
+                    for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+                        TaskId targetTID = ac2TaskStates[j].getTaskId();
+                        cInfoList.add(Pair.of(targetTID, cdId));
+                    }
+                }
+            }
+        }
+        return taskConnectivity;
+    }
+
+    private TaskCluster[] buildConnectorPolicyAwareTaskClusters(ActivityCluster ac,
+            Map<ActivityId, ActivityPlan> activityPlanMap,
+            Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
+        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<>();
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
+            ActivityPlan ap = activityPlanMap.get(anId);
+            Task[] tasks = ap.getTasks();
+            for (Task t : tasks) {
+                Set<TaskId> cluster = new HashSet<>();
+                TaskId tid = t.getTaskId();
+                cluster.add(tid);
+                taskClusterMap.put(tid, cluster);
+            }
+        }
+
+        JobRun jobRun = executor.getJobRun();
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
+        for (Map.Entry<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> e : taskConnectivity.entrySet()) {
+            Set<TaskId> cluster = taskClusterMap.get(e.getKey());
+            for (Pair<TaskId, ConnectorDescriptorId> p : e.getValue()) {
+                IConnectorPolicy cPolicy = connectorPolicies.get(p.getRight());
+                if (cPolicy.requiresProducerConsumerCoscheduling()) {
+                    cluster.add(p.getLeft());
+                }
+            }
+        }
+
+        /*
+         * We compute the transitive closure of this (producer-consumer) relation to find the largest set of
+         * tasks that need to be co-scheduled.
+         */
+        int counter = 0;
+        TaskId[] ordinalList = new TaskId[taskClusterMap.size()];
+        Map<TaskId, Integer> ordinalMap = new HashMap<>();
+        for (TaskId tid : taskClusterMap.keySet()) {
+            ordinalList[counter] = tid;
+            ordinalMap.put(tid, counter);
+            ++counter;
+        }
+
+        int n = ordinalList.length;
+        BitSet[] paths = new BitSet[n];
+        for (Map.Entry<TaskId, Set<TaskId>> e : taskClusterMap.entrySet()) {
+            int i = ordinalMap.get(e.getKey());
+            BitSet bsi = paths[i];
+            if (bsi == null) {
+                bsi = new BitSet(n);
+                paths[i] = bsi;
+            }
+            for (TaskId ttid : e.getValue()) {
+                int j = ordinalMap.get(ttid);
+                paths[i].set(j);
+                BitSet bsj = paths[j];
+                if (bsj == null) {
+                    bsj = new BitSet(n);
+                    paths[j] = bsj;
+                }
+                bsj.set(i);
+            }
+        }
+        for (int k = 0; k < n; ++k) {
+            for (int i = paths[k].nextSetBit(0); i >= 0; i = paths[k].nextSetBit(i + 1)) {
+                for (int j = paths[i].nextClearBit(0); j < n && j >= 0; j = paths[i].nextClearBit(j + 1)) {
+                    paths[i].set(j, paths[k].get(j));
+                    paths[j].set(i, paths[i].get(j));
+                }
+            }
+        }
+        BitSet pending = new BitSet(n);
+        pending.set(0, n);
+        List<List<TaskId>> clusters = new ArrayList<>();
+        for (int i = pending.nextSetBit(0); i >= 0; i = pending.nextSetBit(i)) {
+            List<TaskId> cluster = new ArrayList<>();
+            for (int j = paths[i].nextSetBit(0); j >= 0; j = paths[i].nextSetBit(j + 1)) {
+                cluster.add(ordinalList[j]);
+                pending.clear(j);
+            }
+            clusters.add(cluster);
+        }
+
+        List<TaskCluster> tcSet = new ArrayList<>();
+        counter = 0;
+        for (List<TaskId> cluster : clusters) {
+            List<Task> taskStates = new ArrayList<>();
+            for (TaskId tid : cluster) {
+                taskStates.add(activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()]);
+            }
+            TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), counter++), ac,
+                    taskStates.toArray(new Task[taskStates.size()]));
+            tcSet.add(tc);
+            for (TaskId tid : cluster) {
+                activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()].setTaskCluster(tc);
+            }
+        }
+        return tcSet.toArray(new TaskCluster[tcSet.size()]);
+    }
+
+    private TaskCluster getTaskCluster(TaskId tid) {
+        JobRun run = executor.getJobRun();
+        ActivityCluster ac = run.getActivityClusterGraph().getActivityMap().get(tid.getActivityId());
+        ActivityClusterPlan acp = run.getActivityClusterPlanMap().get(ac.getId());
+        Task[] tasks = acp.getActivityPlanMap().get(tid.getActivityId()).getTasks();
+        Task task = tasks[tid.getPartition()];
+        assert task.getTaskId().equals(tid);
+        return task.getTaskCluster();
+    }
+
+    private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId, ActivityCluster ac) {
+        Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(anId);
+        if (blockers != null) {
+            depAnIds.addAll(blockers);
+        }
+    }
+
+    private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
+        Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<>();
+        Set<ActivityId> activities = ac.getActivityMap().keySet();
+        BitSet targetBitmap = new BitSet();
+        for (ActivityId a1 : activities) {
+            Task[] ac1TaskStates = taskMap.get(a1).getTasks();
+            int nProducers = ac1TaskStates.length;
+            List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(a1);
+            if (outputConns == null) {
+                continue;
+            }
+            for (IConnectorDescriptor c : outputConns) {
+                ConnectorDescriptorId cdId = c.getConnectorId();
+                ActivityId a2 = ac.getConsumerActivity(cdId);
+                Task[] ac2TaskStates = taskMap.get(a2).getTasks();
+                int nConsumers = ac2TaskStates.length;
+
+                int[] fanouts = new int[nProducers];
+                if (c.allProducersToAllConsumers()) {
+                    for (int i = 0; i < nProducers; ++i) {
+                        fanouts[i] = nConsumers;
+                    }
+                } else {
+                    for (int i = 0; i < nProducers; ++i) {
+                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                        fanouts[i] = targetBitmap.cardinality();
+                    }
+                }
+                IConnectorPolicy cp = assignConnectorPolicy(ac, c, nProducers, nConsumers, fanouts);
+                cPolicyMap.put(cdId, cp);
+            }
+        }
+        executor.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
+    }
+
+    private IConnectorPolicy assignConnectorPolicy(ActivityCluster ac, IConnectorDescriptor c, int nProducers,
+            int nConsumers, int[] fanouts) {
+        IConnectorPolicyAssignmentPolicy cpap = ac.getConnectorPolicyAssignmentPolicy();
+        if (cpap != null) {
+            return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
+        }
+        cpap = ac.getActivityClusterGraph().getConnectorPolicyAssignmentPolicy();
+        if (cpap != null) {
+            return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
+        }
+        return new PipeliningConnectorPolicy();
+    }
+
+    private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
+            throws HyracksException {
+        PartitionConstraintSolver solver = executor.getSolver();
+        Set<LValueConstraintExpression> lValues = new HashSet<>();
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
+            lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
+        }
+        solver.solve(lValues);
+        Map<OperatorDescriptorId, Integer> nPartMap = new HashMap<>();
+        for (LValueConstraintExpression lv : lValues) {
+            Object value = solver.getValue(lv);
+            if (value == null) {
+                throw new HyracksException("No value found for " + lv);
+            }
+            if (!(value instanceof Number)) {
+                throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
+                        + value + ")");
+            }
+            int nParts = ((Number) value).intValue();
+            if (nParts <= 0) {
+                throw new HyracksException("Unsatisfiable number of partitions for " + lv + ": " + nParts);
+            }
+            nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), nParts);
+        }
+        Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<>();
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
+            int nParts = nPartMap.get(anId.getOperatorDescriptorId());
+            int[] nInputPartitions = null;
+            List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(anId);
+            if (inputs != null) {
+                nInputPartitions = new int[inputs.size()];
+                for (int i = 0; i < nInputPartitions.length; ++i) {
+                    ConnectorDescriptorId cdId = inputs.get(i).getConnectorId();
+                    ActivityId aid = ac.getProducerActivity(cdId);
+                    Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
+                    nInputPartitions[i] = nPartInt;
+                }
+            }
+            int[] nOutputPartitions = null;
+            List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(anId);
+            if (outputs != null) {
+                nOutputPartitions = new int[outputs.size()];
+                for (int i = 0; i < nOutputPartitions.length; ++i) {
+                    ConnectorDescriptorId cdId = outputs.get(i).getConnectorId();
+                    ActivityId aid = ac.getConsumerActivity(cdId);
+                    Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
+                    nOutputPartitions[i] = nPartInt;
+                }
+            }
+            ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
+            activityPartsMap.put(anId, apd);
+        }
+        return activityPartsMap;
+    }
+
+    Map<PartitionId, TaskCluster> getPartitionProducingTaskClusterMap() {
+        return partitionProducingTaskClusterMap;
+    }
+}
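
The trickiest part of buildConnectorPolicyAwareTaskClusters is the BitSet
manipulation: it closes the symmetric co-scheduling relation and then groups
tasks into connected components. Below is a self-contained sketch of the same
closure-and-grouping step, written as a plain fixpoint for clarity; it assumes
each row includes the task itself (which the code above guarantees), and the
class and method names are illustrative:

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.List;

    class CoScheduleSketch {
        // rows[i] = co-scheduling neighbors of task i (symmetric, includes i itself).
        static List<List<Integer>> clusters(BitSet[] rows) {
            int n = rows.length;
            // Close the relation: OR neighbor rows until a fixpoint is reached.
            boolean changed;
            do {
                changed = false;
                for (int i = 0; i < n; ++i) {
                    for (int j = rows[i].nextSetBit(0); j >= 0; j = rows[i].nextSetBit(j + 1)) {
                        int before = rows[i].cardinality();
                        rows[i].or(rows[j]);
                        changed |= rows[i].cardinality() > before;
                    }
                }
            } while (changed);
            // Group: every task reachable from i forms one co-scheduled cluster.
            BitSet pending = new BitSet(n);
            pending.set(0, n);
            List<List<Integer>> result = new ArrayList<>();
            for (int i = pending.nextSetBit(0); i >= 0; i = pending.nextSetBit(i)) {
                List<Integer> cluster = new ArrayList<>();
                for (int j = rows[i].nextSetBit(0); j >= 0; j = rows[i].nextSetBit(j + 1)) {
                    cluster.add(j);
                    pending.clear(j);
                }
                result.add(cluster);
            }
            return result;
        }
    }

With three tasks where 0-1 and 1-2 must be co-scheduled, the closure yields the
single cluster {0, 1, 2}, the "largest set of tasks that need to be
co-scheduled" referred to in the comment above.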

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityPartitionDetails.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityPartitionDetails.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityPartitionDetails.java
new file mode 100644
index 0000000..60f6e88
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityPartitionDetails.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.executor;
+
+import java.util.Arrays;
+
+public class ActivityPartitionDetails {
+    private final int nPartitions;
+
+    private final int[] nInputPartitions;
+
+    private final int[] nOutputPartitions;
+
+    public ActivityPartitionDetails(int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
+        this.nPartitions = nPartitions;
+        this.nInputPartitions = nInputPartitions;
+        this.nOutputPartitions = nOutputPartitions;
+    }
+
+    public int getPartitionCount() {
+        return nPartitions;
+    }
+
+    public int[] getInputPartitionCounts() {
+        return nInputPartitions;
+    }
+
+    public int[] getOutputPartitionCounts() {
+        return nOutputPartitions;
+    }
+
+    @Override
+    public String toString() {
+        return nPartitions + ":" + (nInputPartitions == null ? "[]" : Arrays.toString(nInputPartitions)) + ":"
+                + (nOutputPartitions == null ? "[]" : Arrays.toString(nOutputPartitions));
+    }
+}
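
As a quick reading aid for the logs, the toString() format above is
partitionCount:inputCounts:outputCounts. For example (hypothetical values):

    ActivityPartitionDetails apd =
            new ActivityPartitionDetails(4, new int[] { 2 }, new int[] { 4 });
    System.out.println(apd); // prints: 4:[2]:[4]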

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
new file mode 100644
index 0000000..3eece52
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.executor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityCluster;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
+import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.control.cc.job.Task;
+import org.apache.hyracks.control.cc.job.TaskAttempt;
+import org.apache.hyracks.control.cc.job.TaskCluster;
+import org.apache.hyracks.control.cc.job.TaskClusterAttempt;
+import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
+import org.apache.hyracks.control.cc.work.JobCleanupWork;
+import org.apache.hyracks.control.common.job.PartitionState;
+import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+
+public class JobExecutor {
+    private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName());
+
+    private final ClusterControllerService ccs;
+
+    private final JobRun jobRun;
+
+    private final PartitionConstraintSolver solver;
+
+    private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
+    private final Set<TaskCluster> inProgressTaskClusters;
+
+    private final Random random;
+
+    public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) {
+        this.ccs = ccs;
+        this.jobRun = jobRun;
+        solver = new PartitionConstraintSolver();
+        partitionProducingTaskClusterMap = new HashMap<>();
+        inProgressTaskClusters = new HashSet<>();
+        solver.addConstraints(constraints);
+        random = new Random();
+    }
+
+    public JobRun getJobRun() {
+        return jobRun;
+    }
+
+    public PartitionConstraintSolver getSolver() {
+        return solver;
+    }
+
+    public void startJob() throws HyracksException {
+        startRunnableActivityClusters();
+        ccs.getApplicationContext().notifyJobStart(jobRun.getJobId());
+    }
+
+    private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
+            throws HyracksException {
+        for (ActivityCluster root : roots) {
+            findRunnableTaskClusterRoots(frontier, root);
+        }
+    }
+
+    private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, ActivityCluster candidate)
+            throws HyracksException {
+        boolean depsComplete = true;
+        for (ActivityCluster depAC : candidate.getDependencies()) {
+            if (!isPlanned(depAC)) {
+                depsComplete = false;
+                findRunnableTaskClusterRoots(frontier, depAC);
+            } else {
+                boolean tcRootsComplete = true;
+                for (TaskCluster tc : getActivityClusterPlan(depAC).getTaskClusters()) {
+                    if (!tc.getProducedPartitions().isEmpty()) {
+                        continue;
+                    }
+                    TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+                    if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+                        tcRootsComplete = false;
+                        break;
+                    }
+                }
+                if (!tcRootsComplete) {
+                    depsComplete = false;
+                    findRunnableTaskClusterRoots(frontier, depAC);
+                }
+            }
+        }
+        if (!depsComplete) {
+            return;
+        }
+        if (!isPlanned(candidate)) {
+            ActivityClusterPlanner acp = new ActivityClusterPlanner(this);
+            ActivityClusterPlan acPlan = acp.planActivityCluster(candidate);
+            jobRun.getActivityClusterPlanMap().put(candidate.getId(), acPlan);
+            partitionProducingTaskClusterMap.putAll(acp.getPartitionProducingTaskClusterMap());
+        }
+        for (TaskCluster tc : getActivityClusterPlan(candidate).getTaskClusters()) {
+            if (!tc.getProducedPartitions().isEmpty()) {
+                continue;
+            }
+            TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+            if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+                frontier.add(tc);
+            }
+        }
+    }
+
+    private ActivityClusterPlan getActivityClusterPlan(ActivityCluster ac) {
+        return jobRun.getActivityClusterPlanMap().get(ac.getId());
+    }
+
+    private boolean isPlanned(ActivityCluster ac) {
+        return jobRun.getActivityClusterPlanMap().get(ac.getId()) != null;
+    }
+
+    private void startRunnableActivityClusters() throws HyracksException {
+        Set<TaskCluster> taskClusterRoots = new HashSet<>();
+        findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap()
+                .values());
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
+                    + inProgressTaskClusters);
+        }
+        if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
+            ccs.getWorkQueue()
+                    .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED, null));
+            return;
+        }
+        startRunnableTaskClusters(taskClusterRoots);
+    }
+
+    private void startRunnableTaskClusters(Set<TaskCluster> tcRoots) throws HyracksException {
+        Map<TaskCluster, Runnability> runnabilityMap = new HashMap<>();
+        for (TaskCluster tc : tcRoots) {
+            assignRunnabilityRank(tc, runnabilityMap);
+        }
+
+        PriorityQueue<RankedRunnableTaskCluster> queue = new PriorityQueue<>();
+        for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) {
+            TaskCluster tc = e.getKey();
+            Runnability runnability = e.getValue();
+            if (runnability.getTag() != Runnability.Tag.RUNNABLE) {
+                continue;
+            }
+            int priority = runnability.getPriority();
+            if (priority >= 0 && priority < Integer.MAX_VALUE) {
+                queue.add(new RankedRunnableTaskCluster(priority, tc));
+            }
+        }
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Ranked TCs: " + queue);
+        }
+
+        Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<>();
+        for (RankedRunnableTaskCluster rrtc : queue) {
+            TaskCluster tc = rrtc.getTaskCluster();
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Found runnable TC: " + tc);
+                List<TaskClusterAttempt> attempts = tc.getAttempts();
+                LOGGER.fine("Attempts so far:" + attempts.size());
+                for (TaskClusterAttempt tcAttempt : attempts) {
+                    LOGGER.fine("Status: " + tcAttempt.getStatus());
+                }
+            }
+            assignTaskLocations(tc, taskAttemptMap);
+        }
+
+        if (taskAttemptMap.isEmpty()) {
+            return;
+        }
+
+        startTasks(taskAttemptMap);
+    }
+
+    /*
+     * Runnability rank has the following semantics:
+     * Runnability(Runnable TaskCluster depending only on completed TaskClusters) = {RUNNABLE, 0}
+     * Runnability(Runnable TaskCluster) = max(Rank(TaskClusters it depends on)) + 1
+     * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _}
+     */
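+    // For example: a TC whose required partitions are all COMMITTED is {RUNNABLE, 0};
+    // one that pipelines from a producer ranked {RUNNABLE, p} is {RUNNABLE, p + 1};
+    // and one whose producer must finish first (consumerWaitsForProducerToFinish)
+    // stays NOT_RUNNABLE until that producer completes.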
+    private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Computing runnability: " + goal);
+        }
+        if (runnabilityMap.containsKey(goal)) {
+            return runnabilityMap.get(goal);
+        }
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
+        if (lastAttempt != null) {
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Last Attempt Status: " + lastAttempt.getStatus());
+            }
+            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+                Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
+                runnabilityMap.put(goal, runnability);
+                return runnability;
+            }
+            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+                Runnability runnability = new Runnability(Runnability.Tag.RUNNING, Integer.MIN_VALUE);
+                runnabilityMap.put(goal, runnability);
+                return runnability;
+            }
+        }
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap();
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
+        for (PartitionId pid : goal.getRequiredPartitions()) {
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Inspecting required partition: " + pid);
+            }
+            Runnability runnability;
+            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+            PartitionState maxState = pmm.getMaximumAvailableState(pid);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Policy: " + cPolicy + " maxState: " + maxState);
+            }
+            if (PartitionState.COMMITTED.equals(maxState)) {
+                runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
+            } else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
+                runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
+            } else {
+                runnability = assignRunnabilityRank(partitionProducingTaskClusterMap.get(pid), runnabilityMap);
+                switch (runnability.getTag()) {
+                    case RUNNABLE:
+                        if (cPolicy.consumerWaitsForProducerToFinish()) {
+                            runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+                        } else {
+                            runnability = new Runnability(Runnability.Tag.RUNNABLE, runnability.getPriority() + 1);
+                        }
+                        break;
+
+                    case NOT_RUNNABLE:
+                        break;
+
+                    case RUNNING:
+                        if (cPolicy.consumerWaitsForProducerToFinish()) {
+                            runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+                        } else {
+                            runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+            aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
+            if (aggregateRunnability.getTag() == Runnability.Tag.NOT_RUNNABLE) {
+                // already not runnable -- cannot get better. bail.
+                break;
+            }
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("aggregateRunnability: " + aggregateRunnability);
+            }
+        }
+        runnabilityMap.put(goal, aggregateRunnability);
+        return aggregateRunnability;
+    }
+
+    private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
+            throws HyracksException {
+        ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
+        Task[] tasks = tc.getTasks();
+        List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
+        int attempts = tcAttempts.size();
+        TaskClusterAttempt tcAttempt = new TaskClusterAttempt(tc, attempts);
+        Map<TaskId, TaskAttempt> taskAttempts = new HashMap<>();
+        Map<TaskId, LValueConstraintExpression> locationMap = new HashMap<>();
+        for (int i = 0; i < tasks.length; ++i) {
+            Task ts = tasks[i];
+            TaskId tid = ts.getTaskId();
+            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
+                    tid.getPartition()), attempts), ts);
+            taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
+            locationMap.put(tid,
+                    new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
+            taskAttempts.put(tid, taskAttempt);
+        }
+        tcAttempt.setTaskAttempts(taskAttempts);
+        solver.solve(locationMap.values());
+        for (int i = 0; i < tasks.length; ++i) {
+            Task ts = tasks[i];
+            TaskId tid = ts.getTaskId();
+            TaskAttempt taskAttempt = taskAttempts.get(tid);
+            String nodeId = assignLocation(acg, locationMap, tid, taskAttempt);
+            taskAttempt.setNodeId(nodeId);
+            taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
+            taskAttempt.setStartTime(System.currentTimeMillis());
+            List<TaskAttemptDescriptor> tads = taskAttemptMap.get(nodeId);
+            if (tads == null) {
+                tads = new ArrayList<>();
+                taskAttemptMap.put(nodeId, tads);
+            }
+            OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
+            jobRun.registerOperatorLocation(opId, tid.getPartition(), nodeId);
+            ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
+            TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
+                    apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
+            tads.add(tad);
+        }
+        tcAttempt.initializePendingTaskCounter();
+        tcAttempts.add(tcAttempt);
+
+        /*
+         * To reduce master/slave message traffic, we set the NetworkAddress[][]
+         * partitionLocations on each TaskAttemptDescriptor; each row corresponds to
+         * an incoming connector descriptor and each column to an input channel of
+         * that connector.
+         */
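+        // Illustrative shape: a task with two input connectors fed by 2 and 3 sender
+        // partitions gets partitionLocations = { { a0, a1 }, { b0, b1, b2 } }, where
+        // each entry is the data port of the node running the corresponding sender.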
+        INodeManager nodeManager = ccs.getNodeManager();
+        for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
+            List<TaskAttemptDescriptor> tads = e.getValue();
+            for (TaskAttemptDescriptor tad : tads) {
+                TaskAttemptId taid = tad.getTaskAttemptId();
+                int attempt = taid.getAttempt();
+                TaskId tid = taid.getTaskId();
+                ActivityId aid = tid.getActivityId();
+                List<IConnectorDescriptor> inConnectors = acg.getActivityInputs(aid);
+                int[] inPartitionCounts = tad.getInputPartitionCounts();
+                if (inPartitionCounts == null) {
+                    continue;
+                }
+                NetworkAddress[][] partitionLocations = new NetworkAddress[inPartitionCounts.length][];
+                for (int i = 0; i < inPartitionCounts.length; ++i) {
+                    ConnectorDescriptorId cdId = inConnectors.get(i).getConnectorId();
+                    IConnectorPolicy policy = jobRun.getConnectorPolicyMap().get(cdId);
+                    /*
+                     * Carry sender location information into a task, except when this
+                     * is a re-attempt and the send side is materialized and blocking.
+                     */
+                    if (attempt > 0 && policy.materializeOnSendSide() && policy.consumerWaitsForProducerToFinish()) {
+                        continue;
+                    }
+                    ActivityId producerAid = acg.getProducerActivity(cdId);
+                    partitionLocations[i] = new NetworkAddress[inPartitionCounts[i]];
+                    for (int j = 0; j < inPartitionCounts[i]; ++j) {
+                        TaskId producerTaskId = new TaskId(producerAid, j);
+                        String nodeId = findTaskLocation(producerTaskId);
+                        partitionLocations[i][j] = nodeManager.getNodeControllerState(nodeId).getDataPort();
+                    }
+                }
+                tad.setInputPartitionLocations(partitionLocations);
+            }
+        }
+
+        tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
+        tcAttempt.setStartTime(System.currentTimeMillis());
+        inProgressTaskClusters.add(tc);
+    }
+
+    private String assignLocation(ActivityClusterGraph acg, Map<TaskId, LValueConstraintExpression> locationMap,
+            TaskId tid, TaskAttempt taskAttempt) throws HyracksException {
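+        // Placement falls back in steps: co-locate with an already-placed blocker
+        // task if possible; otherwise use the constraint solver's binding (a node id
+        // or an array of candidates); if the solver has no binding, pick a random
+        // live node.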
+        ActivityId aid = tid.getActivityId();
+        ActivityCluster ac = acg.getActivityMap().get(aid);
+        Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(aid);
+        String nodeId = null;
+        if (blockers != null) {
+            for (ActivityId blocker : blockers) {
+                nodeId = findTaskLocation(new TaskId(blocker, tid.getPartition()));
+                if (nodeId != null) {
+                    break;
+                }
+            }
+        }
+        INodeManager nodeManager = ccs.getNodeManager();
+        Collection<String> liveNodes = nodeManager.getAllNodeIds();
+        if (nodeId == null) {
+            LValueConstraintExpression pLocationExpr = locationMap.get(tid);
+            Object location = solver.getValue(pLocationExpr);
+            if (location == null) {
+                // pick any
+                nodeId = liveNodes.toArray(new String[liveNodes.size()])[random.nextInt(liveNodes.size())];
+            } else if (location instanceof String) {
+                nodeId = (String) location;
+            } else if (location instanceof String[]) {
+                for (String choice : (String[]) location) {
+                    if (liveNodes.contains(choice)) {
+                        nodeId = choice;
+                        break;
+                    }
+                }
+                if (nodeId == null) {
+                    throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
+                }
+            } else {
+                throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
+                        + location.getClass() + ")");
+            }
+        }
+        if (nodeId == null) {
+            throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
+        }
+        if (!liveNodes.contains(nodeId)) {
+            throw new HyracksException("Node " + nodeId + " not live");
+        }
+        return nodeId;
+    }
+
+    private String findTaskLocation(TaskId tid) {
+        ActivityId aid = tid.getActivityId();
+        ActivityCluster ac = jobRun.getActivityClusterGraph().getActivityMap().get(aid);
+        Task[] tasks = getActivityClusterPlan(ac).getActivityPlanMap().get(aid).getTasks();
+        List<TaskClusterAttempt> tcAttempts = tasks[tid.getPartition()].getTaskCluster().getAttempts();
+        if (tcAttempts == null || tcAttempts.isEmpty()) {
+            return null;
+        }
+        TaskClusterAttempt lastTCA = tcAttempts.get(tcAttempts.size() - 1);
+        TaskAttempt ta = lastTCA.getTaskAttempts().get(tid);
+        return ta == null ? null : ta.getNodeId();
+    }
+
+    private static TaskClusterAttempt findLastTaskClusterAttempt(TaskCluster tc) {
+        List<TaskClusterAttempt> attempts = tc.getAttempts();
+        if (!attempts.isEmpty()) {
+            return attempts.get(attempts.size() - 1);
+        }
+        return null;
+    }
+
+    private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
+        final DeploymentId deploymentId = jobRun.getDeploymentId();
+        final JobId jobId = jobRun.getJobId();
+        final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
+        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<>(
+                jobRun.getConnectorPolicyMap());
+        INodeManager nodeManager = ccs.getNodeManager();
+        try {
+            byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+            for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
+                String nodeId = entry.getKey();
+                final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
+                final NodeControllerState node = nodeManager.getNodeControllerState(nodeId);
+                if (node != null) {
+                    node.getActiveJobIds().add(jobRun.getJobId());
+                    boolean changed = jobRun.getParticipatingNodeIds().add(nodeId);
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
+                    }
+                    byte[] jagBytes = changed ? acgBytes : null;
+                    node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
+                            connectorPolicies, jobRun.getFlags());
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    private void abortJob(List<Exception> exceptions) {
+        Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<>(inProgressTaskClusters);
+        for (TaskCluster tc : inProgressTaskClustersCopy) {
+            abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
+        }
+        assert inProgressTaskClusters.isEmpty();
+        ccs.getWorkQueue()
+                .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions));
+    }
+
+    private void abortTaskCluster(TaskClusterAttempt tcAttempt,
+            TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
+        LOGGER.fine("Aborting task cluster: " + tcAttempt.getAttempt());
+        Set<TaskAttemptId> abortTaskIds = new HashSet<>();
+        Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>();
+        for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) {
+            TaskAttemptId taId = ta.getTaskAttemptId();
+            TaskAttempt.TaskStatus status = ta.getStatus();
+            abortTaskIds.add(taId);
+            LOGGER.fine("Checking " + taId + ": " + ta.getStatus());
+            if (status == TaskAttempt.TaskStatus.RUNNING || status == TaskAttempt.TaskStatus.COMPLETED) {
+                ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
+                ta.setEndTime(System.currentTimeMillis());
+                if (status == TaskAttempt.TaskStatus.RUNNING) {
+                    abortTaskAttemptMap.computeIfAbsent(ta.getNodeId(), k -> new ArrayList<>()).add(taId);
+                }
+            }
+        }
+        final JobId jobId = jobRun.getJobId();
+        LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+        INodeManager nodeManager = ccs.getNodeManager();
+        for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) {
+            final NodeControllerState node = nodeManager.getNodeControllerState(entry.getKey());
+            final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
+            if (node != null) {
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
+                }
+                try {
+                    node.getNodeController().abortTasks(jobId, abortTaskAttempts);
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, e.getMessage(), e);
+                }
+            }
+        }
+        TaskCluster tc = tcAttempt.getTaskCluster();
+        inProgressTaskClusters.remove(tc);
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
+        pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+
+        tcAttempt.setStatus(failedOrAbortedStatus);
+        tcAttempt.setEndTime(System.currentTimeMillis());
+    }
+
+    private void abortDoomedTaskClusters() throws HyracksException {
+        Set<TaskCluster> doomedTaskClusters = new HashSet<>();
+        for (TaskCluster tc : inProgressTaskClusters) {
+            // Start search at TCs that produce no outputs (sinks)
+            if (tc.getProducedPartitions().isEmpty()) {
+                findDoomedTaskClusters(tc, doomedTaskClusters);
+            }
+        }
+
+        for (TaskCluster tc : doomedTaskClusters) {
+            TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+            if (tca != null) {
+                abortTaskCluster(tca, TaskClusterAttempt.TaskClusterStatus.ABORTED);
+            }
+        }
+    }
+
+    private boolean findDoomedTaskClusters(TaskCluster tc, Set<TaskCluster> doomedTaskClusters) {
+        if (doomedTaskClusters.contains(tc)) {
+            return true;
+        }
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+        if (lastAttempt != null) {
+            switch (lastAttempt.getStatus()) {
+                case ABORTED:
+                case FAILED:
+                case COMPLETED:
+                    return false;
+                default:
+                    break;
+            }
+        }
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap();
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        boolean doomed = false;
+        for (TaskCluster depTC : tc.getDependencyTaskClusters()) {
+            if (findDoomedTaskClusters(depTC, doomedTaskClusters)) {
+                doomed = true;
+            }
+        }
+        for (PartitionId pid : tc.getRequiredPartitions()) {
+            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+            PartitionState maxState = pmm.getMaximumAvailableState(pid);
+            if ((maxState == null
+                    || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED))
+                    && findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) {
+                doomed = true;
+            }
+        }
+        if (doomed) {
+            doomedTaskClusters.add(tc);
+        }
+        return doomed;
+    }
+
+    public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
+        TaskAttemptId taId = ta.getTaskAttemptId();
+        TaskCluster tc = ta.getTask().getTaskCluster();
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+        if (lastAttempt == null || taId.getAttempt() != lastAttempt.getAttempt()) {
+            LOGGER.warning(
+                    "Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
+            return;
+        }
+        TaskAttempt.TaskStatus taStatus = ta.getStatus();
+        if (taStatus != TaskAttempt.TaskStatus.RUNNING) {
+            LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
+            return;
+        }
+        ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
+        ta.setEndTime(System.currentTimeMillis());
+        if (lastAttempt.decrementPendingTasksCounter() == 0) {
+            lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+            lastAttempt.setEndTime(System.currentTimeMillis());
+            inProgressTaskClusters.remove(tc);
+            startRunnableActivityClusters();
+        }
+    }
+
+    /**
+     * Indicates that a single task attempt has encountered a failure.
+     * @param ta Failed Task Attempt
+     * @param exceptions exceptions thrown during the failure
+     */
+    public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
+        try {
+            LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
+            TaskAttemptId taId = ta.getTaskAttemptId();
+            TaskCluster tc = ta.getTask().getTaskCluster();
+            TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+            if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
+                LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
+                ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
+                abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
+                abortDoomedTaskClusters();
+                if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
+                    abortJob(exceptions);
+                    return;
+                }
+                startRunnableActivityClusters();
+            } else {
+                LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = "
+                        + lastAttempt);
+            }
+        } catch (Exception e) {
+            abortJob(Collections.singletonList(e));
+        }
+    }
+
+    /**
+     * Indicates that the provided set of nodes have left the cluster.
+     *
+     * @param deadNodes
+     *            - Set of failed nodes
+     */
+    public void notifyNodeFailures(Collection<String> deadNodes) {
+        try {
+            jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+            jobRun.getParticipatingNodeIds().removeAll(deadNodes);
+            jobRun.getCleanupPendingNodeIds().removeAll(deadNodes);
+            if (jobRun.getPendingStatus() != null && jobRun.getCleanupPendingNodeIds().isEmpty()) {
+                IJobManager jobManager = ccs.getJobManager();
+                jobManager.finalComplete(jobRun);
+                return;
+            }
+            for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
+                if (!isPlanned(ac)) {
+                    continue;
+                }
+                TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
+                if (taskClusters == null) {
+                    continue;
+                }
+                for (TaskCluster tc : taskClusters) {
+                    TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+                    if (lastTaskClusterAttempt == null || !(lastTaskClusterAttempt
+                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
+                            || lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+                        continue;
+                    }
+                    boolean abort = false;
+                    for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
+                        assert ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED
+                                || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING;
+                        if (deadNodes.contains(ta.getNodeId())) {
+                            ta.setStatus(TaskAttempt.TaskStatus.FAILED,
+                                    Collections.singletonList(new Exception("Node " + ta.getNodeId() + " failed")));
+                            ta.setEndTime(System.currentTimeMillis());
+                            abort = true;
+                        }
+                    }
+                    if (abort) {
+                        abortTaskCluster(lastTaskClusterAttempt, TaskClusterAttempt.TaskClusterStatus.ABORTED);
+                    }
+                }
+                abortDoomedTaskClusters();
+            }
+            startRunnableActivityClusters();
+        } catch (Exception e) {
+            abortJob(Collections.singletonList(e));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/PartitionConstraintSolver.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/PartitionConstraintSolver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/PartitionConstraintSolver.java
new file mode 100644
index 0000000..baafb34
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/PartitionConstraintSolver.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.executor;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+
+public class PartitionConstraintSolver {
+    private final Map<LValueConstraintExpression, Set<ConstraintExpression>> constraints;
+
+    public PartitionConstraintSolver() {
+        constraints = new HashMap<>();
+    }
+
+    public void addConstraints(Collection<Constraint> constraintCollection) {
+        for (Constraint c : constraintCollection) {
+            addConstraint(c);
+        }
+    }
+
+    public void addConstraint(Constraint c) {
+        constraints.computeIfAbsent(c.getLValue(), k -> new HashSet<>()).add(c.getRValue());
+    }
+
+    public void solve(Collection<LValueConstraintExpression> targetSet) {
+        Set<LValueConstraintExpression> inProcess = new HashSet<>();
+        for (LValueConstraintExpression lv : targetSet) {
+            solveLValue(lv, inProcess);
+        }
+    }
+
+    private Solution solve(ConstraintExpression ce, Set<LValueConstraintExpression> inProcess) {
+        switch (ce.getTag()) {
+            case CONSTANT:
+                return new Solution(((ConstantExpression) ce).getValue(), Solution.Status.FOUND);
+            case PARTITION_COUNT:
+            case PARTITION_LOCATION:
+                return solveLValue((LValueConstraintExpression) ce, inProcess);
+            default:
+                return null;
+        }
+    }
+
+    private Solution solveLValue(LValueConstraintExpression lv, Set<LValueConstraintExpression> inProcess) {
+        if (inProcess.contains(lv)) {
+            return new Solution(null, Solution.Status.CYCLE);
+        }
+        Set<ConstraintExpression> rValues = constraints.get(lv);
+        if (rValues == null) {
+            return new Solution(null, Solution.Status.NOT_BOUND);
+        }
+        // Add lv only after the null check so an unbound lvalue is not left in the in-process set,
+        // which would make later traversals report a spurious CYCLE.
+        Solution result = null;
+        inProcess.add(lv);
+        for (ConstraintExpression ce : rValues) {
+            Solution solution = solve(ce, inProcess);
+            if (solution != null && solution.status == Solution.Status.FOUND) {
+                result = solution;
+                break;
+            }
+        }
+        if (result != null) {
+            rValues.clear();
+            rValues.add(new ConstantExpression(result.value));
+        }
+        inProcess.remove(lv);
+        return result;
+    }
+
+    public Object getValue(LValueConstraintExpression lValue) {
+        Set<ConstraintExpression> rValues = constraints.get(lValue);
+        if (rValues == null || rValues.size() != 1) {
+            return null;
+        }
+        for (ConstraintExpression ce : rValues) {
+            if (ce.getTag() == ConstraintExpression.ExpressionTag.CONSTANT) {
+                return ((ConstantExpression) ce).getValue();
+            }
+        }
+        return null;
+    }
+
+    private static class Solution {
+        enum Status {
+            FOUND,
+            CYCLE,
+            NOT_BOUND,
+        }
+
+        final Object value;
+        final Status status;
+
+        public Solution(Object value, Status status) {
+            this.value = value;
+            this.status = status;
+        }
+    }
+}
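
The solver's contract, read from the code above: each lvalue maps to a set of candidate rvalue expressions; solve() memoizes the first constant binding it can derive, collapsing the rvalue set to that constant, and getValue() answers only when exactly one constant binding remains. A minimal usage sketch, assuming a PartitionCountExpression lvalue and the Constraint(lValue, rValue) constructor from org.apache.hyracks.api.constraints, neither of which appears in this diff:

    PartitionConstraintSolver solver = new PartitionConstraintSolver();
    // Hypothetical lvalue: the partition count of some operator descriptor.
    LValueConstraintExpression lv = new PartitionCountExpression(operatorId);
    solver.addConstraint(new Constraint(lv, new ConstantExpression(4)));
    solver.solve(Collections.singletonList(lv)); // rewrites the binding to a single constant
    Object count = solver.getValue(lv);          // -> 4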

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/RankedRunnableTaskCluster.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/RankedRunnableTaskCluster.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/RankedRunnableTaskCluster.java
new file mode 100644
index 0000000..a300c0b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/RankedRunnableTaskCluster.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.executor;
+
+import org.apache.hyracks.control.cc.job.TaskCluster;
+
+public class RankedRunnableTaskCluster implements Comparable<RankedRunnableTaskCluster> {
+    private final int rank;
+    private final TaskCluster taskCluster;
+
+    public RankedRunnableTaskCluster(int rank, TaskCluster taskCluster) {
+        this.rank = rank;
+        this.taskCluster = taskCluster;
+    }
+
+    public int getRank() {
+        return rank;
+    }
+
+    public TaskCluster getTaskCluster() {
+        return taskCluster;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + rank + ":" + taskCluster + "]";
+    }
+
+    @Override
+    public int compareTo(RankedRunnableTaskCluster o) {
+        // Integer.compare avoids the overflow that plain subtraction can hit for extreme ranks.
+        return Integer.compare(rank, o.rank);
+    }
+
+    @Override
+    public int hashCode() {
+        return rank;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof RankedRunnableTaskCluster)) {
+            return false;
+        }
+        RankedRunnableTaskCluster target = (RankedRunnableTaskCluster) o;
+        return rank == target.rank;
+    }
+}
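
Because compareTo orders instances by rank alone, and equals/hashCode agree with that ordering, ranked task clusters can be fed straight into a priority queue, with lower ranks surfacing first. A sketch, with tcA and tcB standing in for hypothetical TaskCluster instances:

    PriorityQueue<RankedRunnableTaskCluster> queue = new PriorityQueue<>();
    queue.add(new RankedRunnableTaskCluster(2, tcA));
    queue.add(new RankedRunnableTaskCluster(0, tcB));
    RankedRunnableTaskCluster next = queue.poll(); // the rank-0 entry (tcB) comes out first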

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/Runnability.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/Runnability.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/Runnability.java
new file mode 100644
index 0000000..6d4b729
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/Runnability.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.executor;
+
+public final class Runnability {
+    private final Tag tag;
+
+    private final int priority;
+
+    public Runnability(Tag tag, int priority) {
+        this.tag = tag;
+        this.priority = priority;
+    }
+
+    public Tag getTag() {
+        return tag;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public enum Tag {
+        COMPLETED,
+        NOT_RUNNABLE,
+        RUNNABLE,
+        RUNNING,
+    }
+
+    public static Runnability getWorstCase(Runnability r1, Runnability r2) {
+        switch (r1.tag) {
+            case COMPLETED:
+                switch (r2.tag) {
+                    case COMPLETED:
+                    case NOT_RUNNABLE:
+                    case RUNNABLE:
+                    case RUNNING:
+                        return r2;
+                }
+                break;
+
+            case NOT_RUNNABLE:
+                switch (r2.tag) {
+                    case COMPLETED:
+                    case NOT_RUNNABLE:
+                    case RUNNABLE:
+                    case RUNNING:
+                        return r1;
+                }
+                break;
+
+            case RUNNABLE:
+                switch (r2.tag) {
+                    case COMPLETED:
+                        return r1;
+
+                    case RUNNING:
+                        return r1.priority > 0 ? r1 : new Runnability(Tag.RUNNABLE, 1);
+
+                    case NOT_RUNNABLE:
+                        return r2;
+
+                    case RUNNABLE:
+                        return r1.priority > r2.priority ? r1 : r2;
+                }
+                break;
+
+            case RUNNING:
+                switch (r2.tag) {
+                    case COMPLETED:
+                    case RUNNING:
+                        return r1;
+
+                    case NOT_RUNNABLE:
+                        return r2;
+
+                    case RUNNABLE:
+                        return r2.priority > 0 ? r2 : new Runnability(Tag.RUNNABLE, 1);
+                }
+                break;
+        }
+        throw new IllegalArgumentException("Could not aggregate: " + r1 + " and " + r2);
+    }
+
+    @Override
+    public String toString() {
+        return "{" + tag + ", " + priority + "}";
+    }
+}
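
getWorstCase aggregates two runnability states pessimistically: NOT_RUNNABLE dominates everything, COMPLETED defers to the other side, and pairing RUNNABLE with RUNNING forces a positive priority, since the runnable side must wait for at least one running stage. A few worked cases, following the switch above:

    Runnability done = new Runnability(Runnability.Tag.COMPLETED, 0);
    Runnability ready = new Runnability(Runnability.Tag.RUNNABLE, 3);
    Runnability live = new Runnability(Runnability.Tag.RUNNING, 0);
    Runnability.getWorstCase(done, ready); // -> {RUNNABLE, 3}: COMPLETED defers
    Runnability.getWorstCase(ready, live); // -> {RUNNABLE, 3}: priority already positive
    Runnability.getWorstCase(new Runnability(Runnability.Tag.RUNNABLE, 0), live); // -> {RUNNABLE, 1}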

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/ActivityPlan.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/ActivityPlan.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/ActivityPlan.java
index d4df923..86a8df2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/ActivityPlan.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/ActivityPlan.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.control.cc.job;
 
-import org.apache.hyracks.control.cc.scheduler.ActivityPartitionDetails;
+import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails;
 
 public class ActivityPlan {
     private final ActivityPartitionDetails apd;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
new file mode 100644
index 0000000..8f621df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.job;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+
+/**
+ * This interface abstracts the job lifecycle management and job scheduling for a cluster.
+ */
+public interface IJobManager {
+
+    /**
+     * Enters a new job into the job manager.
+     * It's up to the implementation to decide whether to execute the job immediately, queue it for
+     * later execution, or reject it immediately.
+     *
+     * @param jobRun,
+     *            the descriptor of a job.
+     * @throws HyracksException
+     *             when the job cannot even be accepted by the job manager, for example, when the pending
+     *             job queue is too large or the capacity requirement exceeds the capacity of the cluster.
+     */
+    void add(JobRun jobRun) throws HyracksException;
+
+    /**
+     * This method is called when the master process decides to complete a job.
+     * The implementation of this method should instruct all involved worker processes to clean up
+     * the state of each individual parallel partition.
+     *
+     * If there are no involved worker processes, the method is responsible for calling
+     * <code>finalComplete</code> directly, for example, when all worker processes died during the job execution.
+     *
+     * @param jobRun,
+     *            the descriptor of a job.
+     * @param status,
+     *            the final status of the job.
+     * @param exceptions,
+     *            a list of exceptions that are caught during job execution.
+     * @throws HyracksException
+     *             if anything goes wrong during the final job completion. No partial job states should be left.
+     */
+    void prepareComplete(JobRun jobRun, JobStatus status, List<Exception> exceptions) throws HyracksException;
+
+    /**
+     * This method gets called when all worker processes have notified the master that their individual parallel
+     * partition is completed and their corresponding states are cleaned up.
+     * The implementation of this method only needs to clean up the state of the job within the master
+     * process.
+     *
+     * @param jobRun,
+     *            the descriptor of a job.
+     * @throws HyracksException
+     *             if anything goes wrong during the final job completion. No partial job states should be left.
+     */
+    void finalComplete(JobRun jobRun) throws HyracksException;
+
+    /**
+     * Retrieves a job from a given job id.
+     *
+     * @param jobId,
+     *            the id of the job.
+     * @return the job run, which is the descriptor of the job, or null if the job cannot be found.
+     */
+    JobRun get(JobId jobId);
+
+    /**
+     * Retrieves the run history of a job from a given job id.
+     *
+     * @param jobId,
+     *            the job id.
+     * @return the exceptions recorded for the job's run, if the job has been run but not yet discarded.
+     */
+    List<Exception> getRunHistory(JobId jobId);
+
+    /**
+     * @return all jobs that are currently running.
+     */
+    Collection<JobRun> getRunningJobs();
+
+    /**
+     * @return all jobs that are currently waiting in the job queue.
+     */
+    Collection<JobRun> getPendingJobs();
+
+    /**
+     * @return all jobs that are completed or terminated, but not yet discarded.
+     */
+    Collection<JobRun> getArchivedJobs();
+
+}
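
The completion protocol this interface describes is two-phase: prepareComplete fans cleanup out to the worker processes, and finalComplete runs once all of them have reported back, or immediately when none are involved. A hypothetical driver, assuming some concrete IJobManager implementation behind getJobManager() and that JobStatus.TERMINATED denotes normal completion:

    IJobManager jobManager = getJobManager(); // hypothetical accessor for the concrete implementation
    jobManager.add(jobRun);                   // may run immediately, queue, or throw if rejected
    // When the master decides the job is done:
    jobManager.prepareComplete(jobRun, JobStatus.TERMINATED, Collections.emptyList());
    // Later, once every worker partition has reported its state cleaned up:
    jobManager.finalComplete(jobRun);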

