helix-commits mailing list archives

From: ka...@apache.org
Subject: [1/3] [HELIX-353] Write an independent task rebalancer
Date: Tue, 20 May 2014 20:07:19 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 4aa54eb42 -> f1df10587
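
For context: with this patch a job can carry an explicit list of TaskConfig entries instead of targeting partitions of an existing resource, and task factories receive a TaskCallbackContext rather than a raw config string. A minimal submission sketch, assuming a connected HelixManager named manager and a participant-side factory registered under the command "TaskOne" (both illustrative; the pattern mirrors testDifferentTasks below):

    // Build a one-job workflow whose tasks are listed explicitly rather than
    // derived from a target resource; "MyJob" names both the workflow and job.
    Workflow.Builder builder = new Workflow.Builder("MyJob");
    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
    taskConfigs.add(new TaskConfig("TaskOne", null)); // command name, no per-task config
    builder.addTaskConfigs("MyJob", taskConfigs);
    builder.addConfig("MyJob", JobConfig.COMMAND, "DummyCommand");
    new TaskDriver(manager).start(builder.build());   // submit for execution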


http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
new file mode 100644
index 0000000..b2928e6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -0,0 +1,170 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
+  private static final int n = 5;
+  private static final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+  private ClusterControllerManager _controller;
+  private Set<String> _invokedClasses = Sets.newHashSet();
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    // Setup cluster and instances
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < n; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set task callbacks
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("TaskOne", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new TaskOne(context);
+      }
+    });
+    taskFactoryReg.put("TaskTwo", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new TaskTwo(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Start an admin connection
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+  }
+
+  @BeforeMethod
+  public void beforeMethod() {
+    _invokedClasses.clear();
+  }
+
+  @Test
+  public void testDifferentTasks() throws Exception {
+    // Create a job with two different tasks
+    String jobName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", null);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null);
+    taskConfigs.add(taskConfig1);
+    taskConfigs.add(taskConfig2);
+    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+    Map<String, String> jobConfigMap = Maps.newHashMap();
+    jobConfigMap.put("Timeout", "1000");
+    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    _driver.start(workflowBuilder.build());
+
+    // Ensure the job completes
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+    // Ensure that each class was invoked
+    Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+    Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
+  }
+
+  private class TaskOne extends ReindexTask {
+    public TaskOne(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public TaskResult run() {
+      _invokedClasses.add(getClass().getName());
+      return super.run();
+    }
+  }
+
+  private class TaskTwo extends TaskOne {
+    public TaskTwo(TaskCallbackContext context) {
+      super(context);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 4839a9a..208480c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -32,10 +33,11 @@ import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
-import org.apache.helix.task.TaskContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskPartitionState;
@@ -54,11 +56,13 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class TestTaskRebalancer extends ZkIntegrationTestBase {
   private static final int n = 5;
   private static final int START_PORT = 12918;
   private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TIMEOUT_CONFIG = "Timeout";
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
@@ -90,8 +94,8 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put("Reindex", new TaskFactory() {
       @Override
-      public Task createNewTask(String config) {
-        return new ReindexTask(config);
+      public Task createNewTask(TaskCallbackContext context) {
+        return new ReindexTask(context);
       }
     });
 
@@ -150,29 +154,30 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
 
   @Test
   public void testExpiry() throws Exception {
-    String taskName = "Expiry";
+    String jobName = "Expiry";
     long expiry = 1000;
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
     Workflow flow =
         WorkflowGenerator
-            .generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName,
-                TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build();
+            .generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobName, commandConfig)
+            .setExpiry(expiry).build();
 
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
 
     // Running workflow should have config and context viewable through accessor
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(taskName);
+    PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(jobName);
     String workflowPropStoreKey =
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobName);
 
     // Ensure context and config exist
     Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
         AccessOption.PERSISTENT));
     Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
 
-    // Wait for task to finish and expire
-    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.COMPLETED);
+    // Wait for job to finish and expire
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
     Thread.sleep(expiry);
     _driver.invokeRebalance();
     Thread.sleep(expiry);
@@ -183,25 +188,26 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertEquals(accessor.getProperty(workflowCfgKey), null);
   }
 
-  private void basic(long taskCompletionTime) throws Exception {
+  private void basic(long jobCompletionTime) throws Exception {
     // We use a different resource name in each test method as a work around for a helix participant
     // bug where it does
     // not clear locally cached state when a resource partition is dropped. Once that is fixed we
     // should change these
     // tests to use the same resource name and implement a beforeMethod that deletes the task
     // resource.
-    final String taskResource = "basic" + taskCompletionTime;
+    final String jobResource = "basic" + jobCompletionTime;
+    Map<String, String> commandConfig =
+        ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(jobCompletionTime));
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(taskCompletionTime)).build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            commandConfig).build();
     _driver.start(flow);
 
-    // Wait for task completion
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+    // Wait for job completion
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // Ensure all partitions are completed individually
-    TaskContext ctx =
-        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < NUM_PARTITIONS; i++) {
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
@@ -210,29 +216,31 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
 
   @Test
   public void partitionSet() throws Exception {
-    final String taskResource = "partitionSet";
-    ImmutableList<Integer> targetPartitions = ImmutableList.of(1, 2, 3, 5, 8, 13);
+    final String jobResource = "partitionSet";
+    ImmutableList<String> targetPartitions =
+        ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
 
     // construct and submit our basic workflow
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(100), TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
-            String.valueOf(1), TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions))
-            .build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            commandConfig, JobConfig.MAX_ATTEMPTS_PER_TASK, String.valueOf(1),
+            JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build();
     _driver.start(flow);
 
-    // wait for task completeness/timeout
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+    // wait for job completeness/timeout
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // see if resulting context completed successfully for our partition set
-    String namespacedName = TaskUtil.getNamespacedTaskName(taskResource);
+    String namespacedName = TaskUtil.getNamespacedJobName(jobResource);
 
-    TaskContext ctx = TaskUtil.getTaskContext(_manager, namespacedName);
-    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, taskResource);
+    JobContext ctx = TaskUtil.getJobContext(_manager, namespacedName);
+    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, jobResource);
     Assert.assertNotNull(ctx);
     Assert.assertNotNull(workflowContext);
-    Assert.assertEquals(workflowContext.getTaskState(namespacedName), TaskState.COMPLETED);
-    for (int i : targetPartitions) {
+    Assert.assertEquals(workflowContext.getJobState(namespacedName), TaskState.COMPLETED);
+    for (String pName : targetPartitions) {
+      int i = ctx.getPartitionsByTarget().get(pName).get(0);
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
     }
@@ -242,33 +250,32 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
   public void testRepeatedWorkflow() throws Exception {
     String workflowName = "SomeWorkflow";
     Workflow flow =
-        WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build();
+        WorkflowGenerator.generateDefaultRepeatedJobWorkflowBuilder(workflowName).build();
     new TaskDriver(_manager).start(flow);
 
-    // Wait until the task completes
+    // Wait until the workflow completes
     TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
 
     // Assert completion for all tasks within two minutes
-    for (String task : flow.getTaskConfigs().keySet()) {
-      TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED);
+    for (String task : flow.getJobConfigs().keySet()) {
+      TestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
     }
   }
 
   @Test
   public void timeouts() throws Exception {
-    final String taskResource = "timeouts";
+    final String jobResource = "timeouts";
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(2),
-            TaskConfig.TIMEOUT_PER_PARTITION, String.valueOf(100)).build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
+            String.valueOf(2), JobConfig.TIMEOUT_PER_TASK, String.valueOf(100)).build();
     _driver.start(flow);
 
-    // Wait until the task reports failure.
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.FAILED);
+    // Wait until the job reports failure.
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
 
     // Check that all partitions timed out up to maxAttempts
-    TaskContext ctx =
-        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
     int maxAttempts = 0;
     for (int i = 0; i < NUM_PARTITIONS; i++) {
       TaskPartitionState state = ctx.getPartitionState(i);
@@ -284,8 +291,13 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     private final long _delay;
     private volatile boolean _canceled;
 
-    public ReindexTask(String cfg) {
-      _delay = Long.parseLong(cfg);
+    public ReindexTask(TaskCallbackContext context) {
+      JobConfig jobCfg = context.getJobConfig();
+      Map<String, String> cfg = jobCfg.getJobConfigMap();
+      if (cfg == null) {
+        cfg = Collections.emptyMap();
+      }
+      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
     }
 
     @Override
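
The participant-side contract changes with this patch: TaskFactory.createNewTask now takes a TaskCallbackContext, through which a task reads its JobConfig. A sketch of the new callback, assuming a hypothetical DelayTask implementation (the context accessors and the "Timeout"/200ms fallback mirror the ReindexTask constructor above):

    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
    taskFactoryReg.put("Reindex", new TaskFactory() {
      @Override
      public Task createNewTask(TaskCallbackContext context) {
        // Per-job key/value config replaces the old raw config string
        Map<String, String> cfg = context.getJobConfig().getJobConfigMap();
        long delay = (cfg != null && cfg.containsKey("Timeout"))
            ? Long.parseLong(cfg.get("Timeout")) : 200L;
        return new DelayTask(delay); // hypothetical Task implementation
      }
    });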

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index cd260ee..97b8c7e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -29,8 +30,9 @@ import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
@@ -45,13 +47,16 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
   private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
   private static final int n = 5;
   private static final int START_PORT = 12918;
   private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TIMEOUT_CONFIG = "Timeout";
   private static final String TGT_DB = "TestDB";
-  private static final String TASK_RESOURCE = "SomeTask";
+  private static final String JOB_RESOURCE = "SomeJob";
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
@@ -82,8 +87,8 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put("Reindex", new TaskFactory() {
       @Override
-      public Task createNewTask(String config) {
-        return new ReindexTask(config);
+      public Task createNewTask(TaskCallbackContext context) {
+        return new ReindexTask(context);
       }
     });
 
@@ -136,27 +141,28 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
   @Test
   public void stopAndResume() throws Exception {
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(100)).build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(JOB_RESOURCE,
+            commandConfig).build();
 
     LOG.info("Starting flow " + flow.getName());
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
 
-    LOG.info("Pausing task");
-    _driver.stop(TASK_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.STOPPED);
+    LOG.info("Pausing job");
+    _driver.stop(JOB_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
 
-    LOG.info("Resuming task");
-    _driver.resume(TASK_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.COMPLETED);
+    LOG.info("Resuming job");
+    _driver.resume(JOB_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
   }
 
   @Test
   public void stopAndResumeWorkflow() throws Exception {
     String workflow = "SomeWorkflow";
-    Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflow).build();
+    Workflow flow = WorkflowGenerator.generateDefaultRepeatedJobWorkflowBuilder(workflow).build();
 
     LOG.info("Starting flow " + workflow);
     _driver.start(flow);
@@ -175,8 +181,13 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     private final long _delay;
     private volatile boolean _canceled;
 
-    public ReindexTask(String cfg) {
-      _delay = Long.parseLong(cfg);
+    public ReindexTask(TaskCallbackContext context) {
+      JobConfig jobCfg = context.getJobConfig();
+      Map<String, String> cfg = jobCfg.getJobConfigMap();
+      if (cfg == null) {
+        cfg = Collections.emptyMap();
+      }
+      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
     }
 
     @Override
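
The TaskDriver control calls themselves are untouched by the rename; only the resources they operate on are now jobs. The stop/resume sequence exercised above, in brief (manager and flow assumed set up as in the test):

    TaskDriver driver = new TaskDriver(manager);
    driver.start(flow);        // workflow moves to IN_PROGRESS
    driver.stop("SomeJob");    // rebalancer parks the workflow at STOPPED
    driver.resume("SomeJob");  // continues from where it stopped, to COMPLETED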

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index 470d59d..520d7c0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -30,7 +30,7 @@ import org.testng.Assert;
  */
 public class TestUtil {
   /**
-   * Polls {@link org.apache.helix.task.TaskContext} for given task resource until a timeout is
+   * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
    * reached.
    * If the task has not reached target state by then, an error is thrown
    * @param workflowResource Resource to poll for completeness
@@ -51,15 +51,15 @@ public class TestUtil {
     Assert.assertEquals(ctx.getWorkflowState(), state);
   }
 
-  public static void pollForTaskState(HelixManager manager, String workflowResource,
-      String taskName, TaskState state) throws InterruptedException {
+  public static void pollForJobState(HelixManager manager, String workflowResource,
+      String jobName, TaskState state) throws InterruptedException {
     // Wait for completion.
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
     do {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    } while ((ctx == null || ctx.getTaskState(taskName) == null || ctx.getTaskState(taskName) != state)
+    } while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state)
         && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
 
     Assert.assertNotNull(ctx);
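
The renamed helper polls WorkflowContext.getJobState rather than getTaskState. Typical use, as in testRepeatedWorkflow (flow, workflowName, and manager assumed from the caller):

    // Wait for every job in the workflow to complete, within the 2-minute poll limit
    for (String job : flow.getJobConfigs().keySet()) {
      TestUtil.pollForJobState(manager, workflowName, job, TaskState.COMPLETED);
    }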

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 478e944..921a5f9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -1,66 +1,113 @@
 package org.apache.helix.integration.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Workflow;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * Convenience class for generating various test workflows
  */
 public class WorkflowGenerator {
+  private static final Logger LOG = Logger.getLogger(WorkflowGenerator.class);
+
   public static final String DEFAULT_TGT_DB = "TestDB";
-  private static final String TASK_NAME_1 = "SomeTask1";
-  private static final String TASK_NAME_2 = "SomeTask2";
+  public static final String JOB_NAME_1 = "SomeJob1";
+  public static final String JOB_NAME_2 = "SomeJob2";
 
-  private static final Map<String, String> DEFAULT_TASK_CONFIG;
+  public static final Map<String, String> DEFAULT_JOB_CONFIG;
   static {
     Map<String, String> tmpMap = new TreeMap<String, String>();
     tmpMap.put("TargetResource", DEFAULT_TGT_DB);
     tmpMap.put("TargetPartitionStates", "MASTER");
     tmpMap.put("Command", "Reindex");
-    tmpMap.put("CommandConfig", String.valueOf(2000));
     tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
-    DEFAULT_TASK_CONFIG = Collections.unmodifiableMap(tmpMap);
+    DEFAULT_JOB_CONFIG = Collections.unmodifiableMap(tmpMap);
+  }
+
+  public static final Map<String, String> DEFAULT_COMMAND_CONFIG;
+  static {
+    Map<String, String> tmpMap = new TreeMap<String, String>();
+    tmpMap.put("Timeout", String.valueOf(2000));
+    DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap);
   }
 
-  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(
-      String taskName, String... cfgs) {
+  public static Workflow.Builder generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(
+      String jobName, Map<String, String> commandConfig, String... cfgs) {
     if (cfgs.length % 2 != 0) {
       throw new IllegalArgumentException(
           "Additional configs should have even number of keys and values");
     }
-    Workflow.Builder bldr = generateDefaultSingleTaskWorkflowBuilder(taskName);
+    Workflow.Builder bldr = generateDefaultSingleJobWorkflowBuilder(jobName);
     for (int i = 0; i < cfgs.length; i += 2) {
-      bldr.addConfig(taskName, cfgs[i], cfgs[i + 1]);
+      bldr.addConfig(jobName, cfgs[i], cfgs[i + 1]);
     }
 
     return bldr;
   }
 
-  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilder(String taskName) {
-    return generateSingleTaskWorkflowBuilder(taskName, DEFAULT_TASK_CONFIG);
+  public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) {
+    return generateSingleJobWorkflowBuilder(jobName, DEFAULT_COMMAND_CONFIG, DEFAULT_JOB_CONFIG);
   }
 
-  public static Workflow.Builder generateSingleTaskWorkflowBuilder(String taskName,
-      Map<String, String> config) {
-    Workflow.Builder builder = new Workflow.Builder(taskName);
+  public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName,
+      Map<String, String> commandConfig, Map<String, String> config) {
+    Workflow.Builder builder = new Workflow.Builder(jobName);
     for (String key : config.keySet()) {
-      builder.addConfig(taskName, key, config.get(key));
+      builder.addConfig(jobName, key, config.get(key));
+    }
+    if (commandConfig != null) {
+      ObjectMapper mapper = new ObjectMapper();
+      try {
+        String serializedMap = mapper.writeValueAsString(commandConfig);
+        builder.addConfig(jobName, JobConfig.JOB_CONFIG_MAP, serializedMap);
+      } catch (IOException e) {
+        LOG.error("Error serializing " + commandConfig, e);
+      }
     }
     return builder;
   }
 
-  public static Workflow.Builder generateDefaultRepeatedTaskWorkflowBuilder(String workflowName) {
+  public static Workflow.Builder generateDefaultRepeatedJobWorkflowBuilder(String workflowName) {
     Workflow.Builder builder = new Workflow.Builder(workflowName);
-    builder.addParentChildDependency(TASK_NAME_1, TASK_NAME_2);
+    builder.addParentChildDependency(JOB_NAME_1, JOB_NAME_2);
 
-    for (String key : DEFAULT_TASK_CONFIG.keySet()) {
-      builder.addConfig(TASK_NAME_1, key, DEFAULT_TASK_CONFIG.get(key));
-      builder.addConfig(TASK_NAME_2, key, DEFAULT_TASK_CONFIG.get(key));
+    for (String key : DEFAULT_JOB_CONFIG.keySet()) {
+      builder.addConfig(JOB_NAME_1, key, DEFAULT_JOB_CONFIG.get(key));
+      builder.addConfig(JOB_NAME_2, key, DEFAULT_JOB_CONFIG.get(key));
+    }
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG);
+      builder.addConfig(JOB_NAME_1, JobConfig.JOB_CONFIG_MAP, serializedMap);
+      builder.addConfig(JOB_NAME_2, JobConfig.JOB_CONFIG_MAP, serializedMap);
+    } catch (IOException e) {
+      LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e);
     }
-
     return builder;
   }
 }
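
Callers now hand the generator a command-config map, which it JSON-serializes under JobConfig.JOB_CONFIG_MAP instead of passing the old CommandConfig string. For example, the updated testExpiry builds its workflow roughly like this (values taken from that test):

    Map<String, String> commandConfig = ImmutableMap.of("Timeout", String.valueOf(100));
    Workflow flow = WorkflowGenerator
        .generateDefaultSingleJobWorkflowBuilderWithExtraConfigs("Expiry", commandConfig)
        .setExpiry(1000).build();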

