helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] helix git commit: Helix per-participant concurrent task throttling
Date Fri, 23 Jun 2017 20:07:21 GMT
Repository: helix
Updated Branches:
  refs/heads/master fc59b1253 -> e35fe4fff


Helix per-participant concurrent task throttling

Add per participant concurrent task throttling.

1. Add a participant configuration item "MAX_CONCURRENT_TASK" for throttling setting.
   Add cluster configuration item "MAX_CONCURRENT_TASK_PER_INSTANCE" as the default throttling
settings.
   Newly assigned tasks + existing running/init tasks <= MAX_CONCURRENT_TASK. Otherwise, the new
assignment won't be included in the best possible state.
2. Tasks are assigned in the order of jobs' start time. Older jobs have higher priority than
other jobs and regular resources.
3. Add test case (TestTaskThrottling.java) for testing new throttling and priority.

Ticket:
https://issues.apache.org/jira/browse/HELIX-655

Test:
mvn test in helix-core


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/20685cf6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/20685cf6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/20685cf6

Branch: refs/heads/master
Commit: 20685cf6e1276aaa1bf6264c1fe6a3173081d22c
Parents: fc59b12
Author: Jiajun Wang <jjwang@linkedin.com>
Authored: Wed May 31 14:57:23 2017 -0700
Committer: Jiajun Wang <jjwang@linkedin.com>
Committed: Fri Jun 23 12:05:57 2017 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      | 147 ++++++++++-----
 .../controller/stages/ClusterDataCache.java     |  38 ++++
 .../controller/stages/CurrentStateOutput.java   |  49 ++++-
 .../org/apache/helix/model/ClusterConfig.java   |  14 +-
 .../org/apache/helix/model/InstanceConfig.java  |  16 +-
 .../org/apache/helix/task/JobRebalancer.java    |  45 +++--
 .../integration/task/TestTaskThrottling.java    | 183 +++++++++++++++++++
 7 files changed, 429 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/20685cf6/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 526f532..fbb7f86 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -19,8 +19,10 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Iterator;
 import java.util.Map;
 
+import java.util.PriorityQueue;
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
@@ -34,7 +36,11 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobRebalancer;
+import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskRebalancer;
+import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
@@ -60,6 +66,9 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
           + ". Requires CURRENT_STATE|RESOURCES|DataCache");
     }
 
+    // Reset current INIT/RUNNING tasks on participants for throttling
+    cache.resetActiveTaskCount(currentStateOutput);
+
     BestPossibleStateOutput bestPossibleStateOutput =
         compute(event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
@@ -81,70 +90,85 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
   private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource>
resourceMap,
       CurrentStateOutput currentStateOutput) {
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+
+    BestPossibleStateOutput output = new BestPossibleStateOutput();
+
+    PriorityQueue<ResourcePriority> resourcePriorityQueue = new PriorityQueue<ResourcePriority>();
+    TaskDriver taskDriver = null;
+    HelixManager helixManager = event.getAttribute("helixmanager");
+    if (helixManager != null) {
+      taskDriver = new TaskDriver(helixManager);
+    }
+    for (Resource resource : resourceMap.values()) {
+      resourcePriorityQueue.add(new ResourcePriority(resource, cache.getIdealState(resource.getResourceName()),
+          taskDriver));
+    }
+
+    Iterator<ResourcePriority> itr = resourcePriorityQueue.iterator();
+    while (itr.hasNext()) {
+      computeResourceBestPossibleState(event, cache, currentStateOutput, itr.next().getResource(),
output);
+    }
+
+    return output;
+  }
+
+  private void computeResourceBestPossibleState(ClusterEvent event, ClusterDataCache cache,
+      CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output)
{
     // for each ideal state
     // read the state model def
     // for each resource
     // get the preference list
     // for each instanceName check if its alive then assign a state
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
 
-    BestPossibleStateOutput output = new BestPossibleStateOutput();
+    String resourceName = resource.getResourceName();
+    logger.debug("Processing resource:" + resourceName);
+    // Ideal state may be gone. In that case we need to get the state model name
+    // from the current state
+    IdealState idealState = cache.getIdealState(resourceName);
 
-    for (String resourceName : resourceMap.keySet()) {
-      logger.debug("Processing resource:" + resourceName);
+    if (idealState == null) {
+      // if ideal state is deleted, use an empty one
+      logger.info("resource:" + resourceName + " does not exist anymore");
+      idealState = new IdealState(resourceName);
+      idealState.setStateModelDefRef(resource.getStateModelDefRef());
+    }
 
-      Resource resource = resourceMap.get(resourceName);
-      // Ideal state may be gone. In that case we need to get the state model name
-      // from the current state
-      IdealState idealState = cache.getIdealState(resourceName);
+    Rebalancer rebalancer = getRebalancer(idealState, resourceName);
+    MappingCalculator mappingCalculator = getMappingCalculator(rebalancer, resourceName);
 
-      if (idealState == null) {
-        // if ideal state is deleted, use an empty one
-        logger.info("resource:" + resourceName + " does not exist anymore");
-        idealState = new IdealState(resourceName);
-        idealState.setStateModelDefRef(resource.getStateModelDefRef());
-      }
+    if (rebalancer == null || mappingCalculator == null) {
+      logger.error("Error computing assignment for resource " + resourceName
+          + ". no rebalancer found. rebalancer: " + rebalancer + " mappingCaculator: "
+          + mappingCalculator);
+    }
 
-      Rebalancer rebalancer = getRebalancer(idealState, resourceName);
-      MappingCalculator mappingCalculator = getMappingCalculator(rebalancer, resourceName);
+    if (rebalancer != null && mappingCalculator != null) {
 
-      if (rebalancer == null || mappingCalculator == null) {
-        logger.error("Error computing assignment for resource " + resourceName
-            + ". no rebalancer found. rebalancer: " + rebalancer + " mappingCaculator: "
-            + mappingCalculator);
+      if (rebalancer instanceof TaskRebalancer) {
+        TaskRebalancer taskRebalancer = TaskRebalancer.class.cast(rebalancer);
+        taskRebalancer.setClusterStatusMonitor(
+            (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor"));
       }
 
-      if (rebalancer != null && mappingCalculator != null) {
-
-        if (rebalancer instanceof TaskRebalancer) {
-          TaskRebalancer taskRebalancer = TaskRebalancer.class.cast(rebalancer);
-          taskRebalancer.setClusterStatusMonitor(
-              (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor"));
-        }
+      try {
+        HelixManager manager = event.getAttribute("helixmanager");
+        rebalancer.init(manager);
+        idealState = rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput,
cache);
+        output.setPreferenceLists(resourceName, idealState.getPreferenceLists());
 
-        try {
-          HelixManager manager = event.getAttribute("helixmanager");
-          rebalancer.init(manager);
-          idealState =
-              rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput,
cache);
-          output.setPreferenceLists(resourceName, idealState.getPreferenceLists());
-
-          // Use the internal MappingCalculator interface to compute the final assignment
-          // The next release will support rebalancers that compute the mapping from start
to finish
-          ResourceAssignment partitionStateAssignment =
-              mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource,
-                  currentStateOutput);
-          for (Partition partition : resource.getPartitions()) {
-            Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
-            output.setState(resourceName, partition, newStateMap);
-          }
-        } catch (Exception e) {
-          logger
-              .error("Error computing assignment for resource " + resourceName + ". Skipping.",
e);
+        // Use the internal MappingCalculator interface to compute the final assignment
+        // The next release will support rebalancers that compute the mapping from start
to finish
+        ResourceAssignment partitionStateAssignment =
+            mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource,
currentStateOutput);
+        for (Partition partition : resource.getPartitions()) {
+          Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
+          output.setState(resourceName, partition, newStateMap);
         }
+      } catch (Exception e) {
+        logger.error("Error computing assignment for resource " + resourceName + ". Skipping.",
e);
       }
     }
-    return output;
   }
 
   private Rebalancer getRebalancer(IdealState idealState, String resourceName) {
@@ -207,4 +231,33 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
     return mappingCalculator;
   }
+
+  class ResourcePriority implements Comparable<ResourcePriority> {
+    final Resource _resource;
+    // By default, non-job resources and new jobs are assigned lowest priority
+    Long _priority = Long.MAX_VALUE;
+
+    Resource getResource() {
+      return _resource;
+    }
+
+    public ResourcePriority(Resource resource, IdealState idealState, TaskDriver taskDriver)
{
+      _resource = resource;
+
+      if (taskDriver != null && idealState != null
+          && idealState.getRebalancerClassName() != null
+          && idealState.getRebalancerClassName().equals(JobRebalancer.class.getName()))
{
+        // Update priority for job resources, note that older jobs will be processed earlier
+        JobContext jobContext = taskDriver.getJobContext(resource.getResourceName());
+        if (jobContext != null && jobContext.getStartTime() != WorkflowContext.UNSTARTED)
{
+          _priority = jobContext.getStartTime();
+        }
+      }
+    }
+
+    @Override
+    public int compareTo(ResourcePriority otherJob) {
+      return _priority.compareTo(otherJob._priority);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/20685cf6/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index e72354f..89d483b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,6 +43,8 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskPartitionState;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Lists;
@@ -74,6 +77,8 @@ public class ClusterDataCache {
   // maintain a cache of participant messages across pipeline runs
   Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
 
+  Map<String, Integer> _participantActiveTaskCount = new HashMap<String, Integer>();
+
   boolean _init = true;
 
   boolean _updateInstanceOfflineTime = true;
@@ -555,6 +560,39 @@ public class ClusterDataCache {
     return null;
   }
 
+  public Integer getParticipantActiveTaskCount(String instance) {
+    return _participantActiveTaskCount.get(instance);
+  }
+
+  public void setParticipantActiveTaskCount(String instance, int taskCount) {
+    _participantActiveTaskCount.put(instance, taskCount);
+  }
+
+  /**
+   * Reset RUNNING/INIT tasks count based on current state output
+   */
+  public void resetActiveTaskCount(CurrentStateOutput currentStateOutput) {
+    // init participant map
+    for (String liveInstance : getLiveInstances().keySet()) {
+      _participantActiveTaskCount.put(liveInstance, 0);
+    }
+    // Active task == init and running tasks
+    fillActiveTaskCount(currentStateOutput.getPartitionCountWithPendingState(TaskConstants.STATE_MODEL_NAME,
+        TaskPartitionState.INIT.name()), _participantActiveTaskCount);
+    fillActiveTaskCount(currentStateOutput.getPartitionCountWithPendingState(TaskConstants.STATE_MODEL_NAME,
+        TaskPartitionState.RUNNING.name()), _participantActiveTaskCount);
+    fillActiveTaskCount(currentStateOutput.getPartitionCountWithCurrentState(TaskConstants.STATE_MODEL_NAME,
+        TaskPartitionState.INIT.name()), _participantActiveTaskCount);
+    fillActiveTaskCount(currentStateOutput.getPartitionCountWithCurrentState(TaskConstants.STATE_MODEL_NAME,
+        TaskPartitionState.RUNNING.name()), _participantActiveTaskCount);
+  }
+
+  private void fillActiveTaskCount(Map<String, Integer> additionPartitionMap, Map<String,
Integer> partitionMap) {
+    for (String participant : additionPartitionMap.keySet()) {
+      partitionMap.put(participant, partitionMap.get(participant) + additionPartitionMap.get(participant));
+    }
+  }
+
   /**
    * Indicate that a full read should be done on the next refresh
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/20685cf6/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 5633140..646c98f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -260,13 +260,60 @@ public class CurrentStateOutput {
     return partitionSet;
   }
 
+  /**
+   * Get the partitions count for each participant with the pending state and given resource
state model
+   * @param resourceStateModel specified resource state model to look up
+   * @param state specified pending resource state to look up
+   * @return set of participants to partitions mapping
+   */
+  public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel,
String state) {
+    return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingStateMap);
+  }
+
+  /**
+   * Get the partitions count for each participant in the current state and with given resource
state model
+   * @param resourceStateModel specified resource state model to look up
+   * @param state specified current resource state to look up
+   * @return set of participants to partitions mapping
+   */
+  public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel,
String state) {
+    return getPartitionCountWithState(resourceStateModel, state, (Map) _currentStateMap);
+  }
+
+  private Map<String, Integer> getPartitionCountWithState(String resourceStateModel,
String state,
+      Map<String, Map<Partition, Map<String, Object>>> stateMap) {
+    Map<String, Integer> currentPartitionCount = new HashMap<String, Integer>();
+    for (String resource : stateMap.keySet()) {
+      String stateModel = _resourceStateModelMap.get(resource);
+      if ((stateModel != null && stateModel.equals(resourceStateModel)) || (stateModel
== null
+          && resourceStateModel == null)) {
+        for (Partition partition : stateMap.get(resource).keySet()) {
+          Map<String, Object> partitionMessage = stateMap.get(resource).get(partition);
+          for (Map.Entry<String, Object> participantMap : partitionMessage.entrySet())
{
+            String participant = participantMap.getKey();
+            if (!currentPartitionCount.containsKey(participant)) {
+              currentPartitionCount.put(participant, 0);
+            }
+            String currState = participantMap.getValue().toString();
+            if (participantMap.getValue() instanceof Message) {
+              currState = ((Message) participantMap.getValue()).getToState();
+            }
+            if ((currState != null && currState.equals(state)) || (currState == null
&& state == null)) {
+              currentPartitionCount.put(participant, currentPartitionCount.get(participant)
+ 1);
+            }
+          }
+        }
+      }
+    }
+    return currentPartitionCount;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("current state= ").append(_currentStateMap);
     sb.append(", pending state= ").append(_pendingStateMap);
     return sb.toString();
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/20685cf6/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 3796f36..6aaa2b7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -36,8 +36,10 @@ public class ClusterConfig extends HelixProperty {
     FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places
the replicas from same partition.
     DELAY_REBALANCE_DISABLED,  // enabled the delayed rebalaning in case node goes offline.
     DELAY_REBALANCE_TIME,     // delayed time in ms that the delay time Helix should hold
until rebalancing.
-    BATCH_STATE_TRANSITION_MAX_THREADS
+    BATCH_STATE_TRANSITION_MAX_THREADS,
+    MAX_CONCURRENT_TASK_PER_INSTANCE
   }
+  private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40;
 
   /**
    * Instantiate for a specific cluster
@@ -118,6 +120,16 @@ public class ClusterConfig extends HelixProperty {
     return _record.getIntField(ClusterConfigProperty.BATCH_STATE_TRANSITION_MAX_THREADS.name(),
-1);
   }
 
  /**
   * Get maximum allowed running task count on all instances in this cluster.
   * Instance level configuration will override cluster configuration.
   * Returns DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE (40) when the field is not set.
   * @return the maximum task count
   */
  public int getMaxConcurrentTaskPerInstance() {
    return _record.getIntField(ClusterConfigProperty.MAX_CONCURRENT_TASK_PER_INSTANCE.name(),
        DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE);
  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof ClusterConfig) {

http://git-wip-us.apache.org/repos/asf/helix/blob/20685cf6/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index ce1d47e..6002591 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -47,9 +47,11 @@ public class InstanceConfig extends HelixProperty {
     HELIX_DISABLED_PARTITION,
     TAG_LIST,
     INSTANCE_WEIGHT,
-    DOMAIN
+    DOMAIN,
+    MAX_CONCURRENT_TASK
   }
   public static final int WEIGHT_NOT_SET = -1;
+  public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
 
   private static final Logger _logger = Logger.getLogger(InstanceConfig.class.getName());
 
@@ -408,6 +410,18 @@ public class InstanceConfig extends HelixProperty {
     }
   }
 
  /**
   * Get maximum allowed running task count on this instance.
   * @return the maximum task count, or MAX_CONCURRENT_TASK_NOT_SET (-1) when not configured
   *         (callers then fall back to the cluster-level setting)
   */
  public int getMaxConcurrentTask() {
    return _record.getIntField(InstanceConfigProperty.MAX_CONCURRENT_TASK.name(), MAX_CONCURRENT_TASK_NOT_SET);
  }

  /**
   * Set maximum allowed running task count on this instance.
   * @param maxConcurrentTask the maximum task count
   */
  public void setMaxConcurrentTask(int maxConcurrentTask) {
    _record.setIntField(InstanceConfigProperty.MAX_CONCURRENT_TASK.name(), maxConcurrentTask);
  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof InstanceConfig) {

http://git-wip-us.apache.org/repos/asf/helix/blob/20685cf6/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index dc96351..d891d29 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -37,7 +37,9 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
@@ -53,9 +55,9 @@ import com.google.common.collect.Sets;
  */
 public class JobRebalancer extends TaskRebalancer {
   private static final Logger LOG = Logger.getLogger(JobRebalancer.class);
-  private static TaskAssignmentCalculator fixTaskAssignmentCal =
+  private static TaskAssignmentCalculator _fixTaskAssignmentCal =
       new FixedTargetTaskAssignmentCalculator();
-  private static TaskAssignmentCalculator genericTaskAssignmentCal =
+  private static TaskAssignmentCalculator _genericTaskAssignmentCal =
       new GenericTaskAssignmentCalculator();
 
   private static final String PREV_RA_NODE = "PreviousResourceAssignment";
@@ -424,12 +426,26 @@ public class JobRebalancer extends TaskRebalancer {
             .contains(instance)) {
           continue;
         }
+        // 1. throttled by job configuration
         // Contains the set of task partitions currently assigned to the instance.
         Set<Integer> pSet = entry.getValue();
-        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+        int jobCfgLimitation = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+        // 2. throttled by participant capacity
+        int participantCapacity = cache.getInstanceConfigMap().get(instance).getMaxConcurrentTask();
+        if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) {
+          participantCapacity = cache.getClusterConfig().getMaxConcurrentTaskPerInstance();
+        }
+        int participantLimitation = participantCapacity - cache.getParticipantActiveTaskCount(instance);
+        // New tasks to be assigned
+        int numToAssign = Math.min(jobCfgLimitation, participantLimitation);
+        LOG.debug(String.format(
+            "Throttle tasks to be assigned to instance %s using limitation: Job Concurrent
Task(%d), "
+                + "Participant Max Task(%d). Remaining capacity %d.", instance, jobCfgLimitation,
participantCapacity,
+            numToAssign));
         if (numToAssign > 0) {
+          Set<Integer> throttledSet = new HashSet<Integer>();
           List<Integer> nextPartitions =
-              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
+              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, throttledSet,
numToAssign);
           for (Integer pId : nextPartitions) {
             String pName = pName(jobResource, pId);
             paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
@@ -440,6 +456,10 @@ public class JobRebalancer extends TaskRebalancer {
             LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
pName,
                 TaskPartitionState.RUNNING, instance));
           }
+          cache.setParticipantActiveTaskCount(instance, cache.getParticipantActiveTaskCount(instance)
+ nextPartitions.size());
+          if (!throttledSet.isEmpty()) {
+            LOG.debug(throttledSet.size() + "tasks are ready but throttled when assigned
to participant.");
+          }
         }
       }
     }
@@ -579,18 +599,17 @@ public class JobRebalancer extends TaskRebalancer {
   }
 
  /**
   * Pick up to n partitions from the candidates, skipping excluded ones.
   * Eligible candidates beyond the n limit are collected into {@code throttled} so the
   * caller can report how many tasks were ready but held back by throttling — this is why
   * the loop does not break once the result is full.
   */
  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
      Set<Integer> excluded, Set<Integer> throttled, int n) {
    List<Integer> result = new ArrayList<Integer>();
    for (Integer pId : candidatePartitions) {
      if (!excluded.contains(pId)) {
        if (result.size() < n) {
          result.add(pId);
        } else {
          throttled.add(pId);
        }
      }
    }
    return result;
  }
 
@@ -664,9 +683,9 @@ public class JobRebalancer extends TaskRebalancer {
   private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) {
     Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
     if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
-      return genericTaskAssignmentCal;
+      return _genericTaskAssignmentCal;
     } else {
-      return fixTaskAssignmentCal;
+      return _fixTaskAssignmentCal;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/20685cf6/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
new file mode 100644
index 0000000..452bc7b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
@@ -0,0 +1,183 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestTaskThrottling extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    _numNodes = 2;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testTaskThrottle() throws InterruptedException {
+    int numTasks = 30 * _numNodes;
+    int perNodeTaskLimitation = 5;
+
+    JobConfig.Builder jobConfig = generateLongRunJobConfig(numTasks);
+
+    // 1. Job executed in the participants with no limitation
+    String jobName1 = "Job1";
+    Workflow flow = WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName1, jobConfig).build();
+    _driver.start(flow);
+    _driver.pollForJobState(flow.getName(), TaskUtil.getNamespacedJobName(flow.getName(),
jobName1),
+        TaskState.IN_PROGRESS);
+    // Wait for tasks to be picked up
+    Thread.sleep(1500);
+
+    Assert.assertEquals(countRunningPartition(flow, jobName1), numTasks);
+
+    _driver.stop(flow.getName());
+    _driver.pollForWorkflowState(flow.getName(), TaskState.STOPPED);
+
+    // 2. Job executed in the participants with max task limitation
+
+    // Configuring cluster
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
+    Map<String, String> properties = new HashMap<String, String>();
+    properties.put(ClusterConfig.ClusterConfigProperty.MAX_CONCURRENT_TASK_PER_INSTANCE.name(),
+        new Integer(perNodeTaskLimitation).toString());
+    _setupTool.getClusterManagementTool().setConfig(scope, properties);
+
+    String jobName2 = "Job2";
+    flow = WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName2, jobConfig).build();
+    _driver.start(flow);
+    _driver.pollForJobState(flow.getName(), TaskUtil.getNamespacedJobName(flow.getName(),
jobName2),
+        TaskState.IN_PROGRESS);
+    // Wait for tasks to be picked up
+    Thread.sleep(1500);
+
+    Assert.assertEquals(countRunningPartition(flow, jobName2), _numNodes * perNodeTaskLimitation);
+
+    _driver.stop(flow.getName());
+    _driver.pollForWorkflowState(flow.getName(), TaskState.STOPPED);
+
+    // 3. Ensure job can finish normally
+    jobConfig.setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "10"));
+    String jobName3 = "Job3";
+    flow = WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName3, jobConfig).build();
+    _driver.start(flow);
+    _driver.pollForJobState(flow.getName(), TaskUtil.getNamespacedJobName(flow.getName(),
jobName3),
+        TaskState.COMPLETED);
+  }
+
+  @Test(dependsOnMethods = {"testTaskThrottle"})
+  public void testJobPriority() throws InterruptedException {
+    int numTasks = 30 * _numNodes;
+    int perNodeTaskLimitation = 5;
+
+    JobConfig.Builder jobConfig = generateLongRunJobConfig(numTasks);
+
+    // Configuring participants
+    setParticipantsCapacity(perNodeTaskLimitation);
+
+    // schedule job1
+    String jobName1 = "PriorityJob1";
+    Workflow flow1 = WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName1, jobConfig).build();
+    _driver.start(flow1);
+    _driver.pollForJobState(flow1.getName(), TaskUtil.getNamespacedJobName(flow1.getName(),
jobName1),
+        TaskState.IN_PROGRESS);
+    // Wait for tasks to be picked up
+    Thread.sleep(1500);
+    Assert.assertEquals(countRunningPartition(flow1, jobName1), _numNodes * perNodeTaskLimitation);
+
+    // schedule job2
+    String jobName2 = "PriorityJob2";
+    Workflow flow2 = WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName2, jobConfig).build();
+    _driver.start(flow2);
+    _driver.pollForJobState(flow2.getName(), TaskUtil.getNamespacedJobName(flow2.getName(),
jobName2),
+        TaskState.IN_PROGRESS);
+    // Wait for tasks to be picked up
+    Thread.sleep(1500);
+    Assert.assertEquals(countRunningPartition(flow2, jobName2), 0);
+
+    // Increasing participants capacity
+    perNodeTaskLimitation = 2 * perNodeTaskLimitation;
+    setParticipantsCapacity(perNodeTaskLimitation);
+
+    Thread.sleep(1500);
+    // Additional capacity should all be used by job1
+    Assert.assertEquals(countRunningPartition(flow1, jobName1), _numNodes * perNodeTaskLimitation);
+    Assert.assertEquals(countRunningPartition(flow2, jobName2), 0);
+
+    _driver.stop(flow1.getName());
+    _driver.pollForWorkflowState(flow1.getName(), TaskState.STOPPED);
+    _driver.stop(flow2.getName());
+    _driver.pollForWorkflowState(flow2.getName(), TaskState.STOPPED);
+  }
+
+  private int countRunningPartition(Workflow flow, String jobName) {
+    int runningPartition = 0;
+    JobContext jobContext = _driver.getJobContext(TaskUtil.getNamespacedJobName(flow.getName(),
jobName));
+    for (int partition : jobContext.getPartitionSet()) {
+      if (jobContext.getPartitionState(partition) != null && jobContext.getPartitionState(partition)
+          .equals(TaskPartitionState.RUNNING)) {
+        runningPartition++;
+      }
+    }
+    return runningPartition;
+  }
+
+  private JobConfig.Builder generateLongRunJobConfig(int numTasks) {
+    JobConfig.Builder jobConfig = new JobConfig.Builder();
+    List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>();
+    for (int j = 0; j < numTasks; j++) {
+      taskConfigs.add(new TaskConfig.Builder().setTaskId("task_" + j).setCommand(MockTask.TASK_COMMAND).build());
+    }
+    jobConfig.addTaskConfigs(taskConfigs)
+        .setNumConcurrentTasksPerInstance(numTasks)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "120000"));
+    return jobConfig;
+  }
+
+  private void setParticipantsCapacity(int perNodeTaskLimitation) {
+    for (int i = 0; i < _numNodes; i++) {
+      InstanceConfig instanceConfig = _setupTool.getClusterManagementTool()
+          .getInstanceConfig(CLUSTER_NAME, PARTICIPANT_PREFIX + "_" + (_startPort + i));
+      instanceConfig.setMaxConcurrentTask(perNodeTaskLimitation);
+      _setupTool.getClusterManagementTool()
+          .setInstanceConfig(CLUSTER_NAME, PARTICIPANT_PREFIX + "_" + (_startPort + i), instanceConfig);
+    }
+  }
+}
\ No newline at end of file


Mime
View raw message