helix-commits mailing list archives

From j...@apache.org
Subject helix git commit: [HELIX-654] Running task rebalance
Date Fri, 14 Jul 2017 00:58:59 GMT
Repository: helix
Updated Branches:
  refs/heads/master 7a151cd30 -> 8cbbf834e


[HELIX-654] Running task rebalance

Add a job config option, RebalanceRunningTask.

For generic tasks, if the feature is enabled, Helix drops any running
task whose current assignment differs from its previous assignment,
which cancels that running task on the participant. The task is then
re-assigned to its new instance.

For fixed-target tasks, a running task always follows its target
partition, so tasks are always re-assigned as needed.

Add test cases covering both the enabled and disabled settings of this
feature.
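
For reference, a minimal sketch of how a client could enable the new option when
submitting a generic job. The workflow name, job name, and task command below are
hypothetical; setRebalanceRunningTask is the builder method added by this commit,
while TaskDriver, JobConfig.Builder, and Workflow.Builder are existing Helix task APIs:

    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.TaskDriver;
    import org.apache.helix.task.Workflow;

    // Sketch: submit a generic job with running-task rebalance enabled.
    void submitJob(TaskDriver taskDriver) {
      JobConfig.Builder jobBuilder = new JobConfig.Builder()
          .setWorkflow("myWorkflow")          // hypothetical workflow name
          .setCommand("MyTaskCommand")        // hypothetical task command
          .setNumberOfTasks(10)
          .setRebalanceRunningTask(true);     // new option; defaults to false

      Workflow.Builder workflowBuilder = new Workflow.Builder("myWorkflow")
          .addJob("myJob", jobBuilder);       // hypothetical job name
      taskDriver.start(workflowBuilder.build());
    }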


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8cbbf834
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8cbbf834
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8cbbf834

Branch: refs/heads/master
Commit: 8cbbf834efa30b07c31067e1b48ac6332763b02e
Parents: 7a151cd
Author: Weihan Kong <wkong@linkedin.com>
Authored: Wed Apr 26 15:34:25 2017 -0700
Committer: Weihan Kong <wkong@linkedin.com>
Committed: Wed Jul 12 13:58:05 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   |  37 ++-
 .../org/apache/helix/task/JobRebalancer.java    |  68 +++-
 .../org/apache/helix/task/beans/JobBean.java    |   1 +
 .../apache/helix/integration/task/MockTask.java |   5 +
 .../helix/integration/task/TaskTestBase.java    |  14 +-
 .../helix/integration/task/TaskTestUtil.java    |  29 ++
 .../TestGenericTaskAssignmentCalculator.java    |   1 +
 .../task/TestIndependentTaskRebalancer.java     |  12 +-
 .../task/TestRebalanceRunningTask.java          | 320 +++++++++++++++++++
 .../integration/task/TestUserContentStore.java  |   1 +
 .../helix/task/TaskSynchronizedTestBase.java    |  52 ++-
 11 files changed, 483 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index b701623..12aa058 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -142,7 +142,12 @@ public class JobConfig extends ResourceConfig {
     /**
      * The expiration time for the job
      */
-    Expiry
+    Expiry,
+
+    /**
+     * Whether or not to enable running task rebalance
+     */
+    RebalanceRunningTask
   }
 
   //Default property values
@@ -157,6 +162,7 @@ public class JobConfig extends ResourceConfig {
   public static final int DEFAULT_NUMBER_OF_TASKS = 0;
   public static final long DEFAULT_JOB_EXECUTION_START_TIME = -1L;
   public static final long DEFAULT_Job_EXECUTION_DELAY_TIME = -1L;
+  public static final boolean DEFAULT_REBALANCE_RUNNING_TASK = false;
 
   public JobConfig(HelixProperty property) {
     super(property.getRecord());
@@ -171,7 +177,8 @@ public class JobConfig extends ResourceConfig {
         jobConfig.getTaskRetryDelay(), jobConfig.isDisableExternalView(),
         jobConfig.isIgnoreDependentJobFailure(), jobConfig.getTaskConfigMap(),
         jobConfig.getJobType(), jobConfig.getInstanceGroupTag(), jobConfig.getExecutionDelay(),
-        jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry());
+        jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry(),
+        jobConfig.isRebalanceRunningTask());
   }
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
@@ -180,7 +187,8 @@ public class JobConfig extends ResourceConfig {
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
       boolean disableExternalView, boolean ignoreDependentJobFailure,
       Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag,
-      long executionDelay, long executionStart, String jobId, long expiry) {
+      long executionDelay, long executionStart, String jobId, long expiry,
+      boolean rebalanceRunningTask) {
     super(jobId);
     putSimpleConfig(JobConfigProperty.WorkflowID.name(), workflow);
     putSimpleConfig(JobConfigProperty.JobID.name(), jobId);
@@ -239,6 +247,8 @@ public class JobConfig extends ResourceConfig {
     }
     putSimpleConfig(ResourceConfigProperty.MONITORING_DISABLED.toString(),
         String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE));
+    getRecord().setBooleanField(JobConfigProperty.RebalanceRunningTask.name(),
+        rebalanceRunningTask);
   }
 
   public String getWorkflow() {
@@ -354,6 +364,11 @@ public class JobConfig extends ResourceConfig {
     return getRecord().getLongField(JobConfigProperty.Expiry.name(), WorkflowConfig.DEFAULT_EXPIRY);
   }
 
+  public boolean isRebalanceRunningTask() {
+    return getRecord().getBooleanField(JobConfigProperty.RebalanceRunningTask.name(),
+        DEFAULT_REBALANCE_RUNNING_TASK);
+  }
+
   public static JobConfig fromHelixProperty(HelixProperty property)
       throws IllegalArgumentException {
     Map<String, String> configs = property.getRecord().getSimpleFields();
@@ -386,6 +401,7 @@ public class JobConfig extends ResourceConfig {
     private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW;
     private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
     private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
+    private boolean _rebalanceRunningTask = DEFAULT_REBALANCE_RUNNING_TASK;
 
     public JobConfig build() {
       if (_targetResource == null && _taskConfigMap.isEmpty()) {
@@ -404,7 +420,8 @@ public class JobConfig extends ResourceConfig {
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
           _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
-          _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry);
+          _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry,
+          _rebalanceRunningTask);
     }
 
     /**
@@ -480,6 +497,10 @@ public class JobConfig extends ResourceConfig {
       if (cfg.containsKey(JobConfigProperty.Expiry.name())) {
         b.setExpiry(Long.valueOf(cfg.get(JobConfigProperty.Expiry.name())));
       }
+      if (cfg.containsKey(JobConfigProperty.RebalanceRunningTask.name())) {
+        b.setRebalanceRunningTask(
+            Boolean.valueOf(cfg.get(JobConfigProperty.RebalanceRunningTask.name())));
+      }
       return b;
     }
 
@@ -604,6 +625,11 @@ public class JobConfig extends ResourceConfig {
       return this;
     }
 
+    public Builder setRebalanceRunningTask(boolean enabled) {
+      _rebalanceRunningTask = enabled;
+      return this;
+    }
+
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
         throw new IllegalArgumentException(
@@ -675,7 +701,8 @@ public class JobConfig extends ResourceConfig {
           .setDisableExternalView(jobBean.disableExternalView)
           .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure)
           .setNumberOfTasks(jobBean.numberOfTasks).setExecutionDelay(jobBean.executionDelay)
-          .setExecutionStart(jobBean.executionStart);
+          .setExecutionStart(jobBean.executionStart)
+          .setRebalanceRunningTask(jobBean.rebalanceRunningTask);
 
       if (jobBean.jobCommandConfigMap != null) {
         b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 612c89a..5f2bc57 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -266,6 +266,8 @@ public class JobRebalancer extends TaskRebalancer {
       Set<Integer> donePartitions = new TreeSet<Integer>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
+        TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
+            jobResource, pId, pName, instance, jobCtx);
 
         // Check for pending state transitions on this (partition, instance).
         Message pendingMessage =
@@ -288,17 +290,6 @@ public class JobRebalancer extends TaskRebalancer {
           continue;
         }
 
-        TaskPartitionState currState =
-            TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
-                pName), instance));
-        jobCtx.setPartitionState(pId, currState);
-
-        String taskMsg = currStateOutput.getInfo(jobResource, new Partition(
-            pName), instance);
-        if (taskMsg != null) {
-          jobCtx.setPartitionInfo(pId, taskMsg);
-        }
-
         // Process any requested state transitions.
         String requestedStateStr =
             currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
@@ -352,7 +343,7 @@ public class JobRebalancer extends TaskRebalancer {
           donePartitions.add(pId); // The task may be rescheduled on a different instance.
           LOG.debug(String.format(
               "Task partition %s has error state %s with msg %s. Marking as such in rebalancer
context.", pName,
-              currState, taskMsg));
+              currState, jobCtx.getPartitionInfo(pId)));
           markPartitionError(jobCtx, pId, currState, true);
          // The error policy is to fail the task as soon a single partition fails for a specified
           // maximum number of attempts or task is in ABORTED state.
@@ -423,6 +414,11 @@ public class JobRebalancer extends TaskRebalancer {
       Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
           .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
               workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
+
+      if (!isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask()) {
+        dropRebalancedRunningTasks(tgtPartitionAssignments, taskAssignments, paMap, jobCtx);
+      }
+
       for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
         String instance = entry.getKey();
         if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances
@@ -478,6 +474,44 @@ public class JobRebalancer extends TaskRebalancer {
     return ra;
   }
 
+  /**
+   * If the new assignment differs from the previous assignment, drop the old running task if it is no
+   * longer assigned to the same instance, but do not remove it from the excludeSet because the same task
+   * should not be assigned to the new instance right away.
+   */
+  private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment,
+      Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap,
+      JobContext jobContext) {
+    for (String instance : oldAssignment.keySet()) {
+      for (Integer pId : oldAssignment.get(instance)) {
+        if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING
+            && !newAssignment.get(instance).contains(pId)) {
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
+          jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
+        }
+      }
+    }
+  }
+
+  private TaskPartitionState updateJobContextAndGetTaskCurrentState(
+      CurrentStateOutput currentStateOutput, String jobResource, Integer pId, String pName,
+      String instance, JobContext jobCtx) {
+    String currentStateString = currentStateOutput.getCurrentState(jobResource, new Partition(
+        pName), instance);
+    if (currentStateString == null) {
+      // Task state is either DROPPED or INIT
+      return jobCtx.getPartitionState(pId);
+    }
+    TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    jobCtx.setPartitionState(pId, currentState);
+    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(
+        pName), instance);
+    if (taskMsg != null) {
+      jobCtx.setPartitionInfo(pId, taskMsg);
+    }
+    return currentState;
+  }
+
   private void markJobComplete(String jobName, JobContext jobContext,
       WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
     long currentTime = System.currentTimeMillis();
@@ -684,12 +718,12 @@ public class JobRebalancer extends TaskRebalancer {
   }
 
   private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) {
+    return isGenericTaskJob(jobConfig) ? _genericTaskAssignmentCal : _fixTaskAssignmentCal;
+  }
+
+  private boolean isGenericTaskJob(JobConfig jobConfig) {
     Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
-      return _genericTaskAssignmentCal;
-    } else {
-      return _fixTaskAssignmentCal;
-    }
+    return taskConfigMap != null && !taskConfigMap.isEmpty();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index b781a54..7b42ad2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -48,4 +48,5 @@ public class JobBean {
   public boolean disableExternalView = JobConfig.DEFAULT_DISABLE_EXTERNALVIEW;
   public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
   public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS;
+  public boolean rebalanceRunningTask = JobConfig.DEFAULT_REBALANCE_RUNNING_TASK;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index 0502f8e..dfe13ec 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -46,6 +46,8 @@ public class MockTask extends UserContentStore implements Task {
   private int _numOfSuccessBeforeFail;
   private String _errorMsg;
 
+  public static boolean _signalFail;
+
   public MockTask(TaskCallbackContext context) {
     Map<String, String> cfg = context.getJobConfig().getJobCommandConfigMap();
     if (cfg == null) {
@@ -87,6 +89,9 @@ public class MockTask extends UserContentStore implements Task {
         return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
             : timeLeft));
       }
+      if (_signalFail) {
+        return new TaskResult(TaskResult.Status.FAILED, "Signaled to fail.");
+      }
       sleep(50);
     }
     timeLeft = expiry - System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
index 0d0f763..dd5dbfa 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration.task;
  */
 
 import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.task.TaskSynchronizedTestBase;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
@@ -32,24 +33,13 @@ public class TaskTestBase extends TaskSynchronizedTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursive(namespace);
-    }
-
-    _setupTool = new ClusterSetup(ZK_ADDR);
-    _setupTool.addCluster(CLUSTER_NAME, true);
-    setupParticipants();
-    setupDBs();
-    startParticipants();
+    super.beforeClass();
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    createManagers();
-
     boolean result = ClusterStateVerifier.verifyByZkCallback(
         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index ce1d180..cdeebf4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -287,4 +287,33 @@ public class TaskTestUtil {
 
     return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
   }
+
+  /**
+   * Implement this class to periodically check whether a defined condition is true,
+   * if timeout, check the condition for the last time and return the result.
+   */
+  public static abstract class Poller {
+    private static final long DEFAULT_TIME_OUT = 1000*10;
+
+    public boolean poll() {
+      return poll(DEFAULT_TIME_OUT);
+    }
+
+    public boolean poll(long timeOut) {
+      long startTime = System.currentTimeMillis();
+      while (System.currentTimeMillis() < startTime + timeOut) {
+        if (check()) {
+          break;
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+      return check();
+    }
+
+    public abstract boolean check();
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
index f32afe8..cd6822a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -56,6 +56,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
+    _participants =  new MockParticipantManager[_numNodes];
     String namespace = "/" + CLUSTER_NAME;
     if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursive(namespace);

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 64b9073..b7a9beb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -19,17 +19,17 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
@@ -54,10 +54,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import org.testng.collections.Sets;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 public class TestIndependentTaskRebalancer extends TaskTestBase {
   private Set<String> _invokedClasses = Sets.newHashSet();
   private Map<String, Integer> _runCounts = Maps.newHashMap();
@@ -65,6 +61,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
+    _participants = new MockParticipantManager[_numNodes];
     String namespace = "/" + CLUSTER_NAME;
     if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursive(namespace);
@@ -177,7 +174,6 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
     _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
new file mode 100644
index 0000000..10d7cc4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
@@ -0,0 +1,320 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskSynchronizedTestBase;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase {
+
+  private ClusterControllerManager _controller;
+  private final String JOB = "test_job";
+  private String WORKFLOW;
+  private final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  private final int _initialNumNodes = 1;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numNodes = 2;
+    _numParitions = 2;
+    _numReplicas = 1; // only Master, no Slave
+    _numDbs = 1;
+
+    _participants =  new MockParticipantManager[_numNodes];
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    setupParticipants();
+    setupDBs();
+
+    createManagers();
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, CONTROLLER_PREFIX);
+    _controller.syncStart();
+  }
+
+  @BeforeMethod
+  public void beforeMethod() throws InterruptedException {
+    startParticipants(_initialNumNodes);
+    Thread.sleep(1000);
+  }
+
+  @AfterMethod
+  public void afterMethod() {
+    stopParticipants();
+    MockTask._signalFail = false;
+  }
+
+  private boolean checkTasksOnDifferentInstances() {
+    return new TaskTestUtil.Poller() {
+      @Override
+      public boolean check() {
+        try {
+          return getNumOfInstances() > 1;
+        } catch (NullPointerException e) {
+          return false;
+        }
+      }
+    }.poll();
+  }
+
+  private boolean checkTasksOnSameInstances() {
+    return new TaskTestUtil.Poller() {
+      @Override
+      public boolean check() {
+        try {
+          return getNumOfInstances() == 1;
+        } catch (NullPointerException e) {
+          return false;
+        }
+      }
+    }.poll();
+  }
+
+  private int getNumOfInstances() {
+    JobContext jobContext = _driver.getJobContext(TaskUtil.getNamespacedJobName(WORKFLOW, JOB));
+    Set<String> instances = new HashSet<String>();
+    for (int pId : jobContext.getPartitionSet()) {
+      instances.add(jobContext.getAssignedParticipant(pId));
+    }
+    return instances.size();
+  }
+
+  /**
+   * Task type: generic
+   * Rebalance running task: disabled
+   * Story: 1 node is down
+   */
+  @Test
+  public void testGenericTaskAndDisabledRebalanceAndNodeDown() throws InterruptedException {
+    WORKFLOW = TestHelper.getTestMethodName();
+    startParticipant(_initialNumNodes);
+
+    JobConfig.Builder jobBuilder = new JobConfig.Builder()
+        .setWorkflow(WORKFLOW)
+        .setNumberOfTasks(10) // should be enough for consistent hashing to place tasks on
+                              // different instances
+        .setNumConcurrentTasksPerInstance(100)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); //
task stuck
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW)
+        .addJob(JOB, jobBuilder);
+
+    _driver.start(workflowBuilder.build());
+
+    Assert.assertTrue(checkTasksOnDifferentInstances());
+    // Stop a participant, tasks rebalanced to the same instance
+    stopParticipant(_initialNumNodes);
+    Assert.assertTrue(checkTasksOnSameInstances());
+  }
+
+  /**
+   * Task type: generic
+   * Rebalance running task: disabled
+   * Story: new node added, then current task fails
+   */
+  @Test
+  public void testGenericTaskAndDisabledRebalanceAndNodeAddedAndTaskFail() throws InterruptedException {
+    WORKFLOW = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = new JobConfig.Builder()
+        .setWorkflow(WORKFLOW)
+        .setNumberOfTasks(10)
+        .setNumConcurrentTasksPerInstance(100)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setFailureThreshold(10)
+        .setMaxAttemptsPerTask(2)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); //
task stuck
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW)
+        .addJob(JOB, jobBuilder);
+
+    _driver.start(workflowBuilder.build());
+
+    // All tasks stuck on the same instance
+    Assert.assertTrue(checkTasksOnSameInstances());
+    // Add a new instance
+    startParticipant(_initialNumNodes);
+    Thread.sleep(3000);
+    // All tasks still stuck on the same instance, because RebalanceRunningTask is disabled
+    Assert.assertTrue(checkTasksOnSameInstances());
+    // Signal to fail all tasks
+    MockTask._signalFail = true;
+    // After fail, some task will be re-assigned to the new node.
+    // This doesn't require RebalanceRunningTask to be enabled
+    Assert.assertTrue(checkTasksOnDifferentInstances());
+  }
+
+  /**
+   * Task type: generic
+   * Rebalance running task: enabled
+   * Story: new node added
+   */
+  @Test
+  public void testGenericTaskAndEnabledRebalanceAndNodeAdded() throws InterruptedException {
+    WORKFLOW = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = new JobConfig.Builder()
+        .setWorkflow(WORKFLOW)
+        .setNumberOfTasks(10)
+        .setNumConcurrentTasksPerInstance(100)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setRebalanceRunningTask(true)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); //
task stuck
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW)
+        .addJob(JOB, jobBuilder);
+
+    _driver.start(workflowBuilder.build());
+
+    // All tasks stuck on the same instance
+    Assert.assertTrue(checkTasksOnSameInstances());
+    // Add a new instance, and some running tasks will be rebalanced to the new node
+    startParticipant(_initialNumNodes);
+    Assert.assertTrue(checkTasksOnDifferentInstances());
+  }
+
+  /**
+   * Task type: fixed target
+   * Rebalance running task: disabled
+   * Story: 1 node is down
+   */
+  @Test
+  public void testFixedTargetTaskAndDisabledRebalanceAndNodeDown() throws InterruptedException {
+    WORKFLOW = TestHelper.getTestMethodName();
+    startParticipant(_initialNumNodes);
+
+    JobConfig.Builder jobBuilder = new JobConfig.Builder()
+        .setWorkflow(WORKFLOW)
+        .setTargetResource(DATABASE)
+        .setNumConcurrentTasksPerInstance(100)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999"));
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW)
+        .addJob(JOB, jobBuilder);
+
+    _driver.start(workflowBuilder.build());
+
+    Assert.assertTrue(checkTasksOnDifferentInstances());
+    // Stop a participant and partitions will be moved to the same instance,
+    // and tasks rebalanced accordingly
+    stopParticipant(_initialNumNodes);
+    Assert.assertTrue(checkTasksOnSameInstances());
+  }
+
+  /**
+   * Task type: fixed target
+   * Rebalance running task: disabled
+   * Story: new node added
+   */
+  @Test
+  public void testFixedTargetTaskAndDisabledRebalanceAndNodeAdded() throws InterruptedException {
+    WORKFLOW = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = new JobConfig.Builder()
+        .setWorkflow(WORKFLOW)
+        .setTargetResource(DATABASE)
+        .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+        .setNumConcurrentTasksPerInstance(100)
+        .setFailureThreshold(2)
+        .setMaxAttemptsPerTask(2)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(
+            ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); // task stuck
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW).addJob(JOB, jobBuilder);
+
+    _driver.start(workflowBuilder.build());
+
+    // All tasks stuck on the same instance
+    Assert.assertTrue(checkTasksOnSameInstances());
+    // Add a new instance, partition is rebalanced
+    startParticipant(_initialNumNodes);
+    HelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
+            .setResources(Sets.newHashSet(DATABASE)).build();
+    Assert.assertTrue(clusterVerifier.verify(10*1000));
+    // Running tasks are also rebalanced, even though RebalanceRunningTask is disabled
+    Assert.assertTrue(checkTasksOnDifferentInstances());
+  }
+
+  /**
+   * Task type: fixed target
+   * Rebalance running task: enabled
+   * Story: new node added
+   */
+  @Test
+  public void testFixedTargetTaskAndEnabledRebalanceAndNodeAdded() throws InterruptedException {
+    WORKFLOW = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = new JobConfig.Builder()
+        .setWorkflow(WORKFLOW)
+        .setTargetResource(DATABASE)
+        .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+        .setNumConcurrentTasksPerInstance(100)
+        .setRebalanceRunningTask(true)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(
+            ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); // task stuck
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW).addJob(JOB, jobBuilder);
+
+    _driver.start(workflowBuilder.build());
+
+    // All tasks stuck on the same instance
+    Assert.assertTrue(checkTasksOnSameInstances());
+    // Add a new instance, partition is rebalanced
+    startParticipant(_initialNumNodes);
+    HelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
+            .setResources(Sets.newHashSet(DATABASE)).build();
+    Assert.assertTrue(clusterVerifier.verify(10*1000));
+    // Running tasks are also rebalanced
+    Assert.assertTrue(checkTasksOnDifferentInstances());
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
index eb90a34..4750332 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
@@ -54,6 +54,7 @@ public class TestUserContentStore extends TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
+    _participants =  new MockParticipantManager[_numNodes];
     String namespace = "/" + CLUSTER_NAME;
     if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursive(namespace);

http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
index 9e51976..1004c1f 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -55,10 +56,11 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
 
   protected final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
   protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  protected final MockParticipantManager[] _participants = new MockParticipantManager[_numNodes];
+  protected MockParticipantManager[] _participants;
 
   @BeforeClass
   public void beforeClass() throws Exception {
+    _participants =  new MockParticipantManager[_numNodes];
     String namespace = "/" + CLUSTER_NAME;
     if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursive(namespace);
@@ -76,10 +78,7 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
   @AfterClass
   public void afterClass() throws Exception {
     _manager.disconnect();
-
-    for (int i = 0; i < _numNodes; i++) {
-      _participants[i].syncStop();
-    }
+    stopParticipants();
   }
 
   protected void setupDBs() {
@@ -103,7 +102,8 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
         idealState.setInstanceGroupTag("TESTTAG0");
         _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState);
       } else {
-        _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions, MASTER_SLAVE_STATE_MODEL);
+        _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
+            _numParitions, MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
       }
       _setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numReplicas);
     }
@@ -120,23 +120,45 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
   }
 
   protected void startParticipants() {
+    startParticipants(_numNodes);
+  }
+
+  protected void startParticipants(int numNodes) {
+    for (int i = 0; i < numNodes; i++) {
+      startParticipant(i);
+    }
+  }
+
+  protected void startParticipant(int i) {
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
       @Override public Task createNewTask(TaskCallbackContext context) {
         return new MockTask(context);
       }
     });
+    String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+    _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+    // Register a Task state model factory.
+    StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+    stateMachine.registerStateModelFactory("Task",
+        new TaskStateModelFactory(_participants[i], taskFactoryReg));
+    _participants[i].syncStart();
+  }
 
-    // start dummy participants
+  protected void stopParticipants() {
     for (int i = 0; i < _numNodes; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task",
-          new TaskStateModelFactory(_participants[i], taskFactoryReg));
-      _participants[i].syncStart();
+      stopParticipant(i);
+    }
+  }
+
+  protected void stopParticipant(int i) {
+    if (_participants.length <= i) {
+      throw new HelixException(String.format("Can't stop participant %s, only %s participants"
+          + "were set up.", i, _participants.length));
+    }
+    if (_participants[i] != null && _participants[i].isConnected()) {
+      _participants[i].reset();
     }
   }
 

