From: jxue@apache.org
To: commits@helix.apache.org
Subject: helix git commit: [HELIX-654] Running task rebalance
Date: Fri, 14 Jul 2017 00:58:59 +0000 (UTC)

Repository: helix
Updated Branches:
  refs/heads/master 7a151cd30 -> 8cbbf834e

[HELIX-654] Running task rebalance

Add a job config option, RebalanceRunningTask. For generic tasks, when the feature is enabled, Helix drops any running task whose new assignment differs from the previous one; this cancels the running task on its current participant, and the task is then re-assigned to a new instance. For fixed-target tasks, a running task always follows its target partition, so such tasks are re-assigned as needed regardless of this setting. Test cases are added for both the enabled and disabled configurations.
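As a quick illustration (not part of the commit), here is a minimal sketch of how a client could enable the new option when defining a generic job, using the JobConfig.Builder#setRebalanceRunningTask method this patch adds. The workflow/job names, the task command, and the TaskDriver submission step are assumptions for the example, mirroring the usage in the tests below:

    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.TaskDriver;
    import org.apache.helix.task.Workflow;

    public class RebalanceRunningTaskExample {
      public static void submit(TaskDriver driver) {
        // Generic (non-targeted) job: tasks come from a task config map or setNumberOfTasks.
        JobConfig.Builder jobBuilder = new JobConfig.Builder()
            .setWorkflow("MyWorkflow")        // hypothetical workflow name
            .setNumberOfTasks(10)
            .setCommand("Reindex")            // hypothetical command registered with a TaskFactory
            .setRebalanceRunningTask(true);   // new flag added by this patch; defaults to false

        Workflow.Builder workflowBuilder = new Workflow.Builder("MyWorkflow")
            .addJob("MyJob", jobBuilder);     // hypothetical job name

        // Submit the workflow. With the flag enabled, a running task that the
        // assignment calculator moves to another instance is dropped (cancelled
        // on its current participant) and re-scheduled on the new target.
        driver.start(workflowBuilder.build());
      }
    }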
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8cbbf834 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8cbbf834 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8cbbf834 Branch: refs/heads/master Commit: 8cbbf834efa30b07c31067e1b48ac6332763b02e Parents: 7a151cd Author: Weihan Kong Authored: Wed Apr 26 15:34:25 2017 -0700 Committer: Weihan Kong Committed: Wed Jul 12 13:58:05 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobConfig.java | 37 ++- .../org/apache/helix/task/JobRebalancer.java | 68 +++- .../org/apache/helix/task/beans/JobBean.java | 1 + .../apache/helix/integration/task/MockTask.java | 5 + .../helix/integration/task/TaskTestBase.java | 14 +- .../helix/integration/task/TaskTestUtil.java | 29 ++ .../TestGenericTaskAssignmentCalculator.java | 1 + .../task/TestIndependentTaskRebalancer.java | 12 +- .../task/TestRebalanceRunningTask.java | 320 +++++++++++++++++++ .../integration/task/TestUserContentStore.java | 1 + .../helix/task/TaskSynchronizedTestBase.java | 52 ++- 11 files changed, 483 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index b701623..12aa058 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -142,7 +142,12 @@ public class JobConfig extends ResourceConfig { /** * The expiration time for the job */ - Expiry + Expiry, + + /** + * Whether or not enable running task rebalance + */ + RebalanceRunningTask } //Default property values @@ -157,6 +162,7 @@ public class JobConfig extends ResourceConfig { public static final int DEFAULT_NUMBER_OF_TASKS = 0; public static final long DEFAULT_JOB_EXECUTION_START_TIME = -1L; public static final long DEFAULT_Job_EXECUTION_DELAY_TIME = -1L; + public static final boolean DEFAULT_REBALANCE_RUNNING_TASK = false; public JobConfig(HelixProperty property) { super(property.getRecord()); @@ -171,7 +177,8 @@ public class JobConfig extends ResourceConfig { jobConfig.getTaskRetryDelay(), jobConfig.isDisableExternalView(), jobConfig.isIgnoreDependentJobFailure(), jobConfig.getTaskConfigMap(), jobConfig.getJobType(), jobConfig.getInstanceGroupTag(), jobConfig.getExecutionDelay(), - jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry()); + jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry(), + jobConfig.isRebalanceRunningTask()); } private JobConfig(String workflow, String targetResource, List targetPartitions, @@ -180,7 +187,8 @@ public class JobConfig extends ResourceConfig { int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay, boolean disableExternalView, boolean ignoreDependentJobFailure, Map taskConfigMap, String jobType, String instanceGroupTag, - long executionDelay, long executionStart, String jobId, long expiry) { + long executionDelay, long executionStart, String jobId, long expiry, + boolean rebalanceRunningTask) { super(jobId); putSimpleConfig(JobConfigProperty.WorkflowID.name(), workflow); putSimpleConfig(JobConfigProperty.JobID.name(), jobId); @@ -239,6 +247,8 @@ public class 
JobConfig extends ResourceConfig { } putSimpleConfig(ResourceConfigProperty.MONITORING_DISABLED.toString(), String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE)); + getRecord().setBooleanField(JobConfigProperty.RebalanceRunningTask.name(), + rebalanceRunningTask); } public String getWorkflow() { @@ -354,6 +364,11 @@ public class JobConfig extends ResourceConfig { return getRecord().getLongField(JobConfigProperty.Expiry.name(), WorkflowConfig.DEFAULT_EXPIRY); } + public boolean isRebalanceRunningTask() { + return getRecord().getBooleanField(JobConfigProperty.RebalanceRunningTask.name(), + DEFAULT_REBALANCE_RUNNING_TASK); + } + public static JobConfig fromHelixProperty(HelixProperty property) throws IllegalArgumentException { Map configs = property.getRecord().getSimpleFields(); @@ -386,6 +401,7 @@ public class JobConfig extends ResourceConfig { private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW; private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE; private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS; + private boolean _rebalanceRunningTask = DEFAULT_REBALANCE_RUNNING_TASK; public JobConfig build() { if (_targetResource == null && _taskConfigMap.isEmpty()) { @@ -404,7 +420,8 @@ public class JobConfig extends ResourceConfig { _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance, _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay, _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType, - _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry); + _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry, + _rebalanceRunningTask); } /** @@ -480,6 +497,10 @@ public class JobConfig extends ResourceConfig { if (cfg.containsKey(JobConfigProperty.Expiry.name())) { b.setExpiry(Long.valueOf(cfg.get(JobConfigProperty.Expiry.name()))); } + if (cfg.containsKey(JobConfigProperty.RebalanceRunningTask.name())) { + b.setRebalanceRunningTask( + Boolean.valueOf(cfg.get(JobConfigProperty.RebalanceRunningTask.name()))); + } return b; } @@ -604,6 +625,11 @@ public class JobConfig extends ResourceConfig { return this; } + public Builder setRebalanceRunningTask(boolean enabled) { + _rebalanceRunningTask = enabled; + return this; + } + private void validate() { if (_taskConfigMap.isEmpty() && _targetResource == null) { throw new IllegalArgumentException( @@ -675,7 +701,8 @@ public class JobConfig extends ResourceConfig { .setDisableExternalView(jobBean.disableExternalView) .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure) .setNumberOfTasks(jobBean.numberOfTasks).setExecutionDelay(jobBean.executionDelay) - .setExecutionStart(jobBean.executionStart); + .setExecutionStart(jobBean.executionStart) + .setRebalanceRunningTask(jobBean.rebalanceRunningTask); if (jobBean.jobCommandConfigMap != null) { b.setJobCommandConfigMap(jobBean.jobCommandConfigMap); http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 612c89a..5f2bc57 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -266,6 +266,8 @@ public class JobRebalancer extends TaskRebalancer { Set donePartitions = new 
TreeSet(); for (int pId : pSet) { final String pName = pName(jobResource, pId); + TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput, + jobResource, pId, pName, instance, jobCtx); // Check for pending state transitions on this (partition, instance). Message pendingMessage = @@ -288,17 +290,6 @@ public class JobRebalancer extends TaskRebalancer { continue; } - TaskPartitionState currState = - TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition( - pName), instance)); - jobCtx.setPartitionState(pId, currState); - - String taskMsg = currStateOutput.getInfo(jobResource, new Partition( - pName), instance); - if (taskMsg != null) { - jobCtx.setPartitionInfo(pId, taskMsg); - } - // Process any requested state transitions. String requestedStateStr = currStateOutput.getRequestedState(jobResource, new Partition(pName), instance); @@ -352,7 +343,7 @@ public class JobRebalancer extends TaskRebalancer { donePartitions.add(pId); // The task may be rescheduled on a different instance. LOG.debug(String.format( "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", pName, - currState, taskMsg)); + currState, jobCtx.getPartitionInfo(pId))); markPartitionError(jobCtx, pId, currState, true); // The error policy is to fail the task as soon a single partition fails for a specified // maximum number of attempts or task is in ABORTED state. @@ -423,6 +414,11 @@ public class JobRebalancer extends TaskRebalancer { Map> tgtPartitionAssignments = taskAssignmentCal .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx, workflowConfig, workflowCtx, allPartitions, cache.getIdealStates()); + + if (!isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask()) { + dropRebalancedRunningTasks(tgtPartitionAssignments, taskAssignments, paMap, jobCtx); + } + for (Map.Entry> entry : taskAssignments.entrySet()) { String instance = entry.getKey(); if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances @@ -478,6 +474,44 @@ public class JobRebalancer extends TaskRebalancer { return ra; } + /** + * If assignment is different from previous assignment, drop the old running task if it's no + * longer assigned to the same instance, but not removing it from excludeSet because the same task + * should not be assigned to the new instance right way. 
+ */ + private void dropRebalancedRunningTasks(Map> newAssignment, + Map> oldAssignment, Map paMap, + JobContext jobContext) { + for (String instance : oldAssignment.keySet()) { + for (Integer pId : oldAssignment.get(instance)) { + if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING + && !newAssignment.get(instance).contains(pId)) { + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name())); + jobContext.setPartitionState(pId, TaskPartitionState.DROPPED); + } + } + } + } + + private TaskPartitionState updateJobContextAndGetTaskCurrentState( + CurrentStateOutput currentStateOutput, String jobResource, Integer pId, String pName, + String instance, JobContext jobCtx) { + String currentStateString = currentStateOutput.getCurrentState(jobResource, new Partition( + pName), instance); + if (currentStateString == null) { + // Task state is either DROPPED or INIT + return jobCtx.getPartitionState(pId); + } + TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString); + jobCtx.setPartitionState(pId, currentState); + String taskMsg = currentStateOutput.getInfo(jobResource, new Partition( + pName), instance); + if (taskMsg != null) { + jobCtx.setPartitionInfo(pId, taskMsg); + } + return currentState; + } + private void markJobComplete(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, WorkflowContext workflowContext) { long currentTime = System.currentTimeMillis(); @@ -684,12 +718,12 @@ public class JobRebalancer extends TaskRebalancer { } private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) { + return isGenericTaskJob(jobConfig) ? _genericTaskAssignmentCal : _fixTaskAssignmentCal; + } + + private boolean isGenericTaskJob(JobConfig jobConfig) { Map taskConfigMap = jobConfig.getTaskConfigMap(); - if (taskConfigMap != null && !taskConfigMap.isEmpty()) { - return _genericTaskAssignmentCal; - } else { - return _fixTaskAssignmentCal; - } + return taskConfigMap != null && !taskConfigMap.isEmpty(); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index b781a54..7b42ad2 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -48,4 +48,5 @@ public class JobBean { public boolean disableExternalView = JobConfig.DEFAULT_DISABLE_EXTERNALVIEW; public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE; public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS; + public boolean rebalanceRunningTask = JobConfig.DEFAULT_REBALANCE_RUNNING_TASK; } http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java index 0502f8e..dfe13ec 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java @@ -46,6 +46,8 @@ public class MockTask extends UserContentStore implements Task { private int 
_numOfSuccessBeforeFail; private String _errorMsg; + public static boolean _signalFail; + public MockTask(TaskCallbackContext context) { Map cfg = context.getJobConfig().getJobCommandConfigMap(); if (cfg == null) { @@ -87,6 +89,9 @@ public class MockTask extends UserContentStore implements Task { return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0 : timeLeft)); } + if (_signalFail) { + return new TaskResult(TaskResult.Status.FAILED, "Signaled to fail."); + } sleep(50); } timeLeft = expiry - System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java index 0d0f763..dd5dbfa 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java @@ -20,6 +20,7 @@ package org.apache.helix.integration.task; */ import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.task.TaskSynchronizedTestBase; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier; @@ -32,24 +33,13 @@ public class TaskTestBase extends TaskSynchronizedTestBase { @BeforeClass public void beforeClass() throws Exception { - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursive(namespace); - } - - _setupTool = new ClusterSetup(ZK_ADDR); - _setupTool.addCluster(CLUSTER_NAME, true); - setupParticipants(); - setupDBs(); - startParticipants(); + super.beforeClass(); // start controller String controllerName = CONTROLLER_PREFIX + "_0"; _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); _controller.syncStart(); - createManagers(); - boolean result = ClusterStateVerifier.verifyByZkCallback( new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); Assert.assertTrue(result); http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java index ce1d180..cdeebf4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java @@ -287,4 +287,33 @@ public class TaskTestUtil { return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); } + + /** + * Implement this class to periodically check whether a defined condition is true, + * if timeout, check the condition for the last time and return the result. 
+ */ + public static abstract class Poller { + private static final long DEFAULT_TIME_OUT = 1000*10; + + public boolean poll() { + return poll(DEFAULT_TIME_OUT); + } + + public boolean poll(long timeOut) { + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() < startTime + timeOut) { + if (check()) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + return check(); + } + + public abstract boolean check(); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java index f32afe8..cd6822a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java @@ -56,6 +56,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase { @BeforeClass public void beforeClass() throws Exception { + _participants = new MockParticipantManager[_numNodes]; String namespace = "/" + CLUSTER_NAME; if (_gZkClient.exists(namespace)) { _gZkClient.deleteRecursive(namespace); http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index 64b9073..b7a9beb 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -19,17 +19,17 @@ package org.apache.helix.integration.task; * under the License. 
*/ +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; - -import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.TestHelper; -import org.apache.helix.integration.ZkIntegrationTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.participant.StateMachineEngine; @@ -54,10 +54,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import org.testng.collections.Sets; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - public class TestIndependentTaskRebalancer extends TaskTestBase { private Set _invokedClasses = Sets.newHashSet(); private Map _runCounts = Maps.newHashMap(); @@ -65,6 +61,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase { @BeforeClass public void beforeClass() throws Exception { + _participants = new MockParticipantManager[_numNodes]; String namespace = "/" + CLUSTER_NAME; if (_gZkClient.exists(namespace)) { _gZkClient.deleteRecursive(namespace); @@ -177,7 +174,6 @@ public class TestIndependentTaskRebalancer extends TaskTestBase { _driver.start(workflowBuilder.build()); // Ensure the job completes - _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS); _driver.pollForWorkflowState(jobName, TaskState.COMPLETED); // Ensure that each class was invoked http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java new file mode 100644 index 0000000..10d7cc4 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java @@ -0,0 +1,320 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.MasterSlaveSMD; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskSynchronizedTestBase; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase { + + private ClusterControllerManager _controller; + private final String JOB = "test_job"; + private String WORKFLOW; + private final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB; + private final int _initialNumNodes = 1; + + @BeforeClass + public void beforeClass() throws Exception { + _numNodes = 2; + _numParitions = 2; + _numReplicas = 1; // only Master, no Slave + _numDbs = 1; + + _participants = new MockParticipantManager[_numNodes]; + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + + _setupTool = new ClusterSetup(ZK_ADDR); + _setupTool.addCluster(CLUSTER_NAME, true); + setupParticipants(); + setupDBs(); + + createManagers(); + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, CONTROLLER_PREFIX); + _controller.syncStart(); + } + + @BeforeMethod + public void beforeMethod() throws InterruptedException { + startParticipants(_initialNumNodes); + Thread.sleep(1000); + } + + @AfterMethod + public void afterMethod() { + stopParticipants(); + MockTask._signalFail = false; + } + + private boolean checkTasksOnDifferentInstances() { + return new TaskTestUtil.Poller() { + @Override + public boolean check() { + try { + return getNumOfInstances() > 1; + } catch (NullPointerException e) { + return false; + } + } + }.poll(); + } + + private boolean checkTasksOnSameInstances() { + return new TaskTestUtil.Poller() { + @Override + public boolean check() { + try { + return getNumOfInstances() == 1; + } catch (NullPointerException e) { + return false; + } + } + }.poll(); + } + + private int getNumOfInstances() { + JobContext jobContext = _driver.getJobContext(TaskUtil.getNamespacedJobName(WORKFLOW, JOB)); + Set instances = new HashSet(); + for (int pId : jobContext.getPartitionSet()) { + instances.add(jobContext.getAssignedParticipant(pId)); + } + return instances.size(); + } + + /** + * Task type: generic + * Rebalance raunning task: disabled + * Story: 1 node is down + */ + @Test + public void testGenericTaskAndDisabledRebalanceAndNodeDown() throws InterruptedException { + WORKFLOW = TestHelper.getTestMethodName(); + 
startParticipant(_initialNumNodes); + + JobConfig.Builder jobBuilder = new JobConfig.Builder() + .setWorkflow(WORKFLOW) + .setNumberOfTasks(10) // should be enough for consistent hashing to place tasks on + // different instances + .setNumConcurrentTasksPerInstance(100) + .setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); // task stuck + + Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW) + .addJob(JOB, jobBuilder); + + _driver.start(workflowBuilder.build()); + + Assert.assertTrue(checkTasksOnDifferentInstances()); + // Stop a participant, tasks rebalanced to the same instance + stopParticipant(_initialNumNodes); + Assert.assertTrue(checkTasksOnSameInstances()); + } + + /** + * Task type: generic + * Rebalance raunning task: disabled + * Story: new node added, then current task fails + */ + @Test + public void testGenericTaskAndDisabledRebalanceAndNodeAddedAndTaskFail() throws InterruptedException { + WORKFLOW = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = new JobConfig.Builder() + .setWorkflow(WORKFLOW) + .setNumberOfTasks(10) + .setNumConcurrentTasksPerInstance(100) + .setCommand(MockTask.TASK_COMMAND) + .setFailureThreshold(10) + .setMaxAttemptsPerTask(2) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); // task stuck + + Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW) + .addJob(JOB, jobBuilder); + + _driver.start(workflowBuilder.build()); + + // All tasks stuck on the same instance + Assert.assertTrue(checkTasksOnSameInstances()); + // Add a new instance + startParticipant(_initialNumNodes); + Thread.sleep(3000); + // All tasks still stuck on the same instance, because RebalanceRunningTask is disabled + Assert.assertTrue(checkTasksOnSameInstances()); + // Signal to fail all tasks + MockTask._signalFail = true; + // After fail, some task will be re-assigned to the new node. 
+ // This doesn't require RebalanceRunningTask to be enabled + Assert.assertTrue(checkTasksOnDifferentInstances()); + } + + /** + * Task type: generic + * Rebalance raunning task: enabled + * Story: new node added + */ + @Test + public void testGenericTaskAndEnabledRebalanceAndNodeAdded() throws InterruptedException { + WORKFLOW = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = new JobConfig.Builder() + .setWorkflow(WORKFLOW) + .setNumberOfTasks(10) + .setNumConcurrentTasksPerInstance(100) + .setCommand(MockTask.TASK_COMMAND) + .setRebalanceRunningTask(true) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); // task stuck + + Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW) + .addJob(JOB, jobBuilder); + + _driver.start(workflowBuilder.build()); + + // All tasks stuck on the same instance + Assert.assertTrue(checkTasksOnSameInstances()); + // Add a new instance, and some running tasks will be rebalanced to the new node + startParticipant(_initialNumNodes); + Assert.assertTrue(checkTasksOnDifferentInstances()); + } + + /** + * Task type: fixed target + * Rebalance raunning task: disabled + * Story: 1 node is down + */ + @Test + public void testFixedTargetTaskAndDisabledRebalanceAndNodeDown() throws InterruptedException { + WORKFLOW = TestHelper.getTestMethodName(); + startParticipant(_initialNumNodes); + + JobConfig.Builder jobBuilder = new JobConfig.Builder() + .setWorkflow(WORKFLOW) + .setTargetResource(DATABASE) + .setNumConcurrentTasksPerInstance(100) + .setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); + + Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW) + .addJob(JOB, jobBuilder); + + _driver.start(workflowBuilder.build()); + + Assert.assertTrue(checkTasksOnDifferentInstances()); + // Stop a participant and partitions will be moved to the same instance, + // and tasks rebalanced accordingly + stopParticipant(_initialNumNodes); + Assert.assertTrue(checkTasksOnSameInstances()); + } + + /** + * Task type: fixed target + * Rebalance raunning task: disabled + * Story: new node added + */ + @Test + public void testFixedTargetTaskAndDisabledRebalanceAndNodeAdded() throws InterruptedException { + WORKFLOW = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = new JobConfig.Builder() + .setWorkflow(WORKFLOW) + .setTargetResource(DATABASE) + .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name())) + .setNumConcurrentTasksPerInstance(100) + .setFailureThreshold(2) + .setMaxAttemptsPerTask(2) + .setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap( + ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); // task stuck + + Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW).addJob(JOB, jobBuilder); + + _driver.start(workflowBuilder.build()); + + // All tasks stuck on the same instance + Assert.assertTrue(checkTasksOnSameInstances()); + // Add a new instance, partition is rebalanced + startParticipant(_initialNumNodes); + HelixClusterVerifier clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient) + .setResources(Sets.newHashSet(DATABASE)).build(); + Assert.assertTrue(clusterVerifier.verify(10*1000)); + // Running tasks are also rebalanced, even though RebalanceRunningTask is disabled + Assert.assertTrue(checkTasksOnDifferentInstances()); + } + + /** + * Task type: fixed target + * Rebalance raunning task: enabled + * Story: new node added + */ + @Test + 
public void testFixedTargetTaskAndEnabledRebalanceAndNodeAdded() throws InterruptedException { + WORKFLOW = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = new JobConfig.Builder() + .setWorkflow(WORKFLOW) + .setTargetResource(DATABASE) + .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name())) + .setNumConcurrentTasksPerInstance(100) + .setRebalanceRunningTask(true) + .setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap( + ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); // task stuck + + Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW).addJob(JOB, jobBuilder); + + _driver.start(workflowBuilder.build()); + + // All tasks stuck on the same instance + Assert.assertTrue(checkTasksOnSameInstances()); + // Add a new instance, partition is rebalanced + startParticipant(_initialNumNodes); + HelixClusterVerifier clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient) + .setResources(Sets.newHashSet(DATABASE)).build(); + Assert.assertTrue(clusterVerifier.verify(10*1000)); + // Running tasks are also rebalanced + Assert.assertTrue(checkTasksOnDifferentInstances()); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java index eb90a34..4750332 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java @@ -54,6 +54,7 @@ public class TestUserContentStore extends TaskTestBase { @BeforeClass public void beforeClass() throws Exception { + _participants = new MockParticipantManager[_numNodes]; String namespace = "/" + CLUSTER_NAME; if (_gZkClient.exists(namespace)) { _gZkClient.deleteRecursive(namespace); http://git-wip-us.apache.org/repos/asf/helix/blob/8cbbf834/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java index 9e51976..1004c1f 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; @@ -55,10 +56,11 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase { protected final String MASTER_SLAVE_STATE_MODEL = "MasterSlave"; protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName(); - protected final MockParticipantManager[] _participants = new MockParticipantManager[_numNodes]; + protected MockParticipantManager[] _participants; @BeforeClass public void beforeClass() throws Exception { + _participants = new MockParticipantManager[_numNodes]; String namespace = "/" + CLUSTER_NAME; if (_gZkClient.exists(namespace)) { _gZkClient.deleteRecursive(namespace); @@ 
-76,10 +78,7 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase { @AfterClass public void afterClass() throws Exception { _manager.disconnect(); - - for (int i = 0; i < _numNodes; i++) { - _participants[i].syncStop(); - } + stopParticipants(); } protected void setupDBs() { @@ -103,7 +102,8 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase { idealState.setInstanceGroupTag("TESTTAG0"); _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState); } else { - _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions, MASTER_SLAVE_STATE_MODEL); + _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, + _numParitions, MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name()); } _setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numReplicas); } @@ -120,23 +120,45 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase { } protected void startParticipants() { + startParticipants(_numNodes); + } + + protected void startParticipants(int numNodes) { + for (int i = 0; i < numNodes; i++) { + startParticipant(i); + } + } + + protected void startParticipant(int i) { Map taskFactoryReg = new HashMap(); taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() { @Override public Task createNewTask(TaskCallbackContext context) { return new MockTask(context); } }); + String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", + new TaskStateModelFactory(_participants[i], taskFactoryReg)); + _participants[i].syncStart(); + } - // start dummy participants + protected void stopParticipants() { for (int i = 0; i < _numNodes; i++) { - String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); - _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); - - // Register a Task state model factory. - StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); - stateMachine.registerStateModelFactory("Task", - new TaskStateModelFactory(_participants[i], taskFactoryReg)); - _participants[i].syncStart(); + stopParticipant(i); + } + } + + protected void stopParticipant(int i) { + if (_participants.length <= i) { + throw new HelixException(String.format("Can't stop participant %s, only %s participants" + + "were set up.", i, _participants.length)); + } + if (_participants[i] != null && _participants[i].isConnected()) { + _participants[i].reset(); } }
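To make the rebalancing behavior concrete, the following is a self-contained sketch (not Helix code) of the decision rule implemented by dropRebalancedRunningTasks above: given the previous and newly computed assignments, any partition whose RUNNING task is no longer assigned to its current instance is marked DROPPED and re-scheduled later. The instance and partition identifiers are made up for illustration; the real implementation operates on the JobContext and the paMap rather than plain maps:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustrative sketch of which RUNNING task partitions would be dropped when
    // the new assignment moves them off their current instance (the case handled
    // by dropRebalancedRunningTasks for generic jobs with RebalanceRunningTask enabled).
    public class DropDecisionSketch {
      public static void main(String[] args) {
        // Previous assignment: instance -> partition ids currently RUNNING there.
        Map<String, Set<Integer>> oldAssignment = new HashMap<>();
        oldAssignment.put("node_0", new HashSet<>(Arrays.asList(0, 1, 2, 3)));

        // New target assignment after a second instance joins the cluster.
        Map<String, Set<Integer>> newAssignment = new HashMap<>();
        newAssignment.put("node_0", new HashSet<>(Arrays.asList(0, 1)));
        newAssignment.put("node_1", new HashSet<>(Arrays.asList(2, 3)));

        // A running task stays put only if its old instance still owns it;
        // otherwise it is dropped and later re-assigned elsewhere.
        for (Map.Entry<String, Set<Integer>> entry : oldAssignment.entrySet()) {
          String instance = entry.getKey();
          Set<Integer> stillAssigned = newAssignment.getOrDefault(instance, new HashSet<>());
          for (Integer pId : entry.getValue()) {
            if (!stillAssigned.contains(pId)) {
              System.out.println("Partition " + pId + " would be DROPPED on " + instance);
            }
          }
        }
      }
    }

Note that, per the javadoc in the patch, a dropped partition is deliberately not removed from the exclude set for the current pipeline run, so it is not re-assigned to the new instance in the same pass.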