helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [1/3] [HELIX-336] Add support for task framework, rb=16071
Date Fri, 06 Dec 2013 23:25:41 GMT
Updated Branches:
  refs/heads/master 69de0f209 -> 80fc2be5e


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZNRecord.java b/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
index 9ff4849..3976cd3 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
@@ -124,4 +124,26 @@ public class TestZNRecord {
     expectRecord.setMapField("mapKey2", expectMap2);
     Assert.assertEquals(record, expectRecord, "Should be equal.");
   }
+
+  @Test
+  public void testSubtract() {
+    ZNRecord record = new ZNRecord("test");
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("mapKey1", "mapValue1");
+    map.put("mapKey2", "mapValue2");
+    record.setMapField("key1", map);
+
+    ZNRecord delta = new ZNRecord("test");
+    Map<String, String> deltaMap = new HashMap<String, String>();
+    deltaMap.put("mapKey1", "mapValue1");
+    delta.setMapField("key1", deltaMap);
+
+    record.subtract(delta);
+
+    Assert.assertEquals(record.getMapFields().size(), 1);
+    Assert.assertNotNull(record.getMapField("key1"));
+    Assert.assertEquals(record.getMapField("key1").size(), 1);
+    Assert.assertNotNull(record.getMapField("key1").get("mapKey2"));
+    Assert.assertEquals(record.getMapField("key1").get("mapKey2"), "mapValue2");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 9188e61..25f049e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -77,8 +77,7 @@ public class ZkIntegrationTestBase {
   }
 
   protected String getShortClassName() {
-    String className = this.getClass().getName();
-    return className.substring(className.lastIndexOf('.') + 1);
+    return this.getClass().getSimpleName();
   }
 
   protected String getCurrentLeader(ZkClient zkClient, String clusterName) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
new file mode 100644
index 0000000..e9127a1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -0,0 +1,306 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.helix.*;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.*;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskRebalancer extends ZkIntegrationTestBase {
+  private static final int n = 5;
+  private static final int START_PORT = 12918;
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final int NUM_PARTITIONS = 20;
+  private static final int NUM_REPLICAS = 3;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+  private ClusterControllerManager _controller;
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < n; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set up target db
+    setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+        MASTER_SLAVE_STATE_MODEL);
+    setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("Reindex", new TaskFactory() {
+      @Override
+      public Task createNewTask(String config) {
+        return new ReindexTask(config);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    // _controller = null;
+    for (int i = 0; i < n; i++) {
+      _participants[i].syncStop();
+      // _participants[i] = null;
+    }
+
+    _manager.disconnect();
+  }
+
+  @Test
+  public void basic() throws Exception {
+    basic(100);
+  }
+
+  @Test
+  public void zeroTaskCompletionTime() throws Exception {
+    basic(0);
+  }
+
+  @Test
+  public void testExpiry() throws Exception {
+    String taskName = "Expiry";
+    long expiry = 1000;
+    Workflow flow =
+        WorkflowGenerator
+            .generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName,
+                TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build();
+
+    _driver.start(flow);
+    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.IN_PROGRESS);
+
+    // Running workflow should have config and context viewable through accessor
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(taskName);
+    String workflowPropStoreKey =
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName);
+
+    // Ensure context and config exist
+    Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
+        AccessOption.PERSISTENT));
+    Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
+
+    // Wait for task to finish and expire
+    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.COMPLETED);
+    Thread.sleep(expiry);
+    _driver.invokeRebalance();
+    Thread.sleep(expiry);
+
+    // Ensure workflow config and context were cleaned up by now
+    Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
+        AccessOption.PERSISTENT));
+    Assert.assertEquals(accessor.getProperty(workflowCfgKey), null);
+  }
+
+  private void basic(long taskCompletionTime) throws Exception {
+    // We use a different resource name in each test method as a work around for a helix participant
+    // bug where it does
+    // not clear locally cached state when a resource partition is dropped. Once that is fixed we
+    // should change these
+    // tests to use the same resource name and implement a beforeMethod that deletes the task
+    // resource.
+    final String taskResource = "basic" + taskCompletionTime;
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+            TaskConfig.COMMAND_CONFIG, String.valueOf(taskCompletionTime)).build();
+    _driver.start(flow);
+
+    // Wait for task completion
+    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+
+    // Ensure all partitions are completed individually
+    TaskContext ctx =
+        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
+      Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
+    }
+  }
+
+  @Test
+  public void partitionSet() throws Exception {
+    final String taskResource = "partitionSet";
+    ImmutableList<Integer> targetPartitions = ImmutableList.of(1, 2, 3, 5, 8, 13);
+
+    // construct and submit our basic workflow
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+            TaskConfig.COMMAND_CONFIG, String.valueOf(100), TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
+            String.valueOf(1), TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions))
+            .build();
+    _driver.start(flow);
+
+    // wait for task completeness/timeout
+    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+
+    // see if resulting context completed successfully for our partition set
+    String namespacedName = TaskUtil.getNamespacedTaskName(taskResource);
+
+    TaskContext ctx = TaskUtil.getTaskContext(_manager, namespacedName);
+    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, taskResource);
+    Assert.assertNotNull(ctx);
+    Assert.assertNotNull(workflowContext);
+    Assert.assertEquals(workflowContext.getTaskState(namespacedName), TaskState.COMPLETED);
+    for (int i : targetPartitions) {
+      Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
+      Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
+    }
+  }
+
+  @Test
+  public void testRepeatedWorkflow() throws Exception {
+    String workflowName = "SomeWorkflow";
+    Workflow flow =
+        WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build();
+    new TaskDriver(_manager).start(flow);
+
+    // Wait until the task completes
+    TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
+
+    // Assert completion for all tasks within two minutes
+    for (String task : flow.getTaskConfigs().keySet()) {
+      TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED);
+    }
+  }
+
+  @Test
+  public void timeouts() throws Exception {
+    final String taskResource = "timeouts";
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+            TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(2),
+            TaskConfig.TIMEOUT_PER_PARTITION, String.valueOf(100)).build();
+    _driver.start(flow);
+
+    // Wait until the task reports failure.
+    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.FAILED);
+
+    // Check that all partitions timed out up to maxAttempts
+    TaskContext ctx =
+        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    int maxAttempts = 0;
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      TaskPartitionState state = ctx.getPartitionState(i);
+      if (state != null) {
+        Assert.assertEquals(state, TaskPartitionState.TIMED_OUT);
+        maxAttempts = Math.max(maxAttempts, ctx.getPartitionNumAttempts(i));
+      }
+    }
+    Assert.assertEquals(maxAttempts, 2);
+  }
+
+  private static class ReindexTask implements Task {
+    private final long _delay;
+    private volatile boolean _canceled;
+
+    public ReindexTask(String cfg) {
+      _delay = Long.parseLong(cfg);
+    }
+
+    @Override
+    public TaskResult run() {
+      long expiry = System.currentTimeMillis() + _delay;
+      long timeLeft;
+      while (System.currentTimeMillis() < expiry) {
+        if (_canceled) {
+          timeLeft = expiry - System.currentTimeMillis();
+          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+              : timeLeft));
+        }
+        sleep(50);
+      }
+      timeLeft = expiry - System.currentTimeMillis();
+      return new TaskResult(TaskResult.Status.COMPLETED,
+          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+    }
+
+    @Override
+    public void cancel() {
+      _canceled = true;
+    }
+
+    private static void sleep(long d) {
+      try {
+        Thread.sleep(d);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
new file mode 100644
index 0000000..01d64f3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -0,0 +1,209 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.*;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
+  private static final int n = 5;
+  private static final int START_PORT = 12918;
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TGT_DB = "TestDB";
+  private static final String TASK_RESOURCE = "SomeTask";
+  private static final int NUM_PARTITIONS = 20;
+  private static final int NUM_REPLICAS = 3;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+  private ClusterControllerManager _controller;
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < n; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set up target db
+    setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
+    setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("Reindex", new TaskFactory() {
+      @Override
+      public Task createNewTask(String config) {
+        return new ReindexTask(config);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+
+    _driver = new TaskDriver(_manager);
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
+            ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      _participants[i].syncStop();
+    }
+    _manager.disconnect();
+  }
+
+  @Test
+  public void stopAndResume() throws Exception {
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
+            TaskConfig.COMMAND_CONFIG, String.valueOf(100)).build();
+
+    LOG.info("Starting flow " + flow.getName());
+    _driver.start(flow);
+    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.IN_PROGRESS);
+
+    LOG.info("Pausing task");
+    _driver.stop(TASK_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.STOPPED);
+
+    LOG.info("Resuming task");
+    _driver.resume(TASK_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.COMPLETED);
+  }
+
+  @Test
+  public void stopAndResumeWorkflow() throws Exception {
+    String workflow = "SomeWorkflow";
+    Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflow).build();
+
+    LOG.info("Starting flow " + workflow);
+    _driver.start(flow);
+    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS);
+
+    LOG.info("Pausing workflow");
+    _driver.stop(workflow);
+    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED);
+
+    LOG.info("Resuming workflow");
+    _driver.resume(workflow);
+    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
+  }
+
+  public static class ReindexTask implements Task {
+    private final long _delay;
+    private volatile boolean _canceled;
+
+    public ReindexTask(String cfg) {
+      _delay = Long.parseLong(cfg);
+    }
+
+    @Override
+    public TaskResult run() {
+      long expiry = System.currentTimeMillis() + _delay;
+      long timeLeft;
+      while (System.currentTimeMillis() < expiry) {
+        if (_canceled) {
+          timeLeft = expiry - System.currentTimeMillis();
+          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+              : timeLeft));
+        }
+        sleep(50);
+      }
+      timeLeft = expiry - System.currentTimeMillis();
+      return new TaskResult(TaskResult.Status.COMPLETED,
+          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+    }
+
+    @Override
+    public void cancel() {
+      _canceled = true;
+    }
+
+    private static void sleep(long d) {
+      try {
+        Thread.sleep(d);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
new file mode 100644
index 0000000..2cc6cb8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -0,0 +1,70 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.*;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+/**
+ * Static test utility methods.
+ */
+public class TestUtil {
+  private static final Logger LOG = Logger.getLogger(TestUtil.class);
+
+  /**
+   * Polls {@link org.apache.helix.task.TaskContext} for given task resource until a timeout is
+   * reached.
+   * If the task has not reached target state by then, an error is thrown
+   * @param workflowResource Resource to poll for completeness
+   * @throws InterruptedException
+   */
+  public static void pollForWorkflowState(HelixManager manager, String workflowResource,
+      TaskState state) throws InterruptedException {
+    // Wait for completion.
+    long st = System.currentTimeMillis();
+    WorkflowContext ctx;
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
+        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+
+    Assert.assertNotNull(ctx);
+    Assert.assertEquals(ctx.getWorkflowState(), state);
+  }
+
+  public static void pollForTaskState(HelixManager manager, String workflowResource,
+      String taskName, TaskState state) throws InterruptedException {
+    // Wait for completion.
+    long st = System.currentTimeMillis();
+    WorkflowContext ctx;
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    } while ((ctx == null || ctx.getTaskState(taskName) == null || ctx.getTaskState(taskName) != state)
+        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+
+    Assert.assertNotNull(ctx);
+    Assert.assertEquals(ctx.getWorkflowState(), state);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
new file mode 100644
index 0000000..0d7251a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -0,0 +1,85 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.task.Workflow;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Convenience class for generating various test workflows
+ */
+public class WorkflowGenerator {
+  public static final String DEFAULT_TGT_DB = "TestDB";
+  private static final String TASK_NAME_1 = "SomeTask1";
+  private static final String TASK_NAME_2 = "SomeTask2";
+
+  private static final Map<String, String> DEFAULT_TASK_CONFIG;
+  static {
+    Map<String, String> tmpMap = new TreeMap<String, String>();
+    tmpMap.put("TargetResource", DEFAULT_TGT_DB);
+    tmpMap.put("TargetPartitionStates", "MASTER");
+    tmpMap.put("Command", "Reindex");
+    tmpMap.put("CommandConfig", String.valueOf(2000));
+    tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
+    DEFAULT_TASK_CONFIG = Collections.unmodifiableMap(tmpMap);
+  }
+
+  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(
+      String taskName, String... cfgs) {
+    if (cfgs.length % 2 != 0) {
+      throw new IllegalArgumentException(
+          "Additional configs should have even number of keys and values");
+    }
+    Workflow.Builder bldr = generateDefaultSingleTaskWorkflowBuilder(taskName);
+    for (int i = 0; i < cfgs.length; i += 2) {
+      bldr.addConfig(taskName, cfgs[i], cfgs[i + 1]);
+    }
+
+    return bldr;
+  }
+
+  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilder(String taskName) {
+    return generateSingleTaskWorkflowBuilder(taskName, DEFAULT_TASK_CONFIG);
+  }
+
+  public static Workflow.Builder generateSingleTaskWorkflowBuilder(String taskName,
+      Map<String, String> config) {
+    Workflow.Builder builder = new Workflow.Builder(taskName);
+    for (String key : config.keySet()) {
+      builder.addConfig(taskName, key, config.get(key));
+    }
+    return builder;
+  }
+
+  public static Workflow.Builder generateDefaultRepeatedTaskWorkflowBuilder(String workflowName) {
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    builder.addParentChildDependency(TASK_NAME_1, TASK_NAME_2);
+
+    for (String key : DEFAULT_TASK_CONFIG.keySet()) {
+      builder.addConfig(TASK_NAME_1, key, DEFAULT_TASK_CONFIG.get(key));
+      builder.addConfig(TASK_NAME_2, key, DEFAULT_TASK_CONFIG.get(key));
+    }
+
+    return builder;
+  }
+}


Mime
View raw message