helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-519] Add integration tests to ensure that kill-switch for Helix tasks work as expected, rb=26212
Date Wed, 01 Oct 2014 18:08:54 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 19ac664d0 -> 2ebfe7d19


[HELIX-519] Add integration tests to ensure that the kill-switch for Helix tasks works as expected, rb=26212


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2ebfe7d1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2ebfe7d1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2ebfe7d1

Branch: refs/heads/helix-0.6.x
Commit: 2ebfe7d1963b4fa9790375713208da62d3410ce7
Parents: 19ac664
Author: zzhang <zzhang@apache.org>
Authored: Wed Oct 1 11:08:39 2014 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Wed Oct 1 11:08:39 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 28 +++----
 .../task/TestTaskRebalancerStopResume.java      | 85 ++++++++++++++++++++
 .../apache/helix/integration/task/TestUtil.java | 21 ++++-
 3 files changed, 118 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2ebfe7d1/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 31d785b..a9a3ac0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -127,13 +127,13 @@ public class TaskDriver {
         }
         break;
       case stop:
-        driver.setTaskTargetState(resource, TargetState.STOP);
+        driver.setWorkflowTargetState(resource, TargetState.STOP);
         break;
       case resume:
-        driver.setTaskTargetState(resource, TargetState.START);
+        driver.setWorkflowTargetState(resource, TargetState.START);
         break;
       case delete:
-        driver.setTaskTargetState(resource, TargetState.DELETE);
+        driver.setWorkflowTargetState(resource, TargetState.DELETE);
         break;
       case list:
         driver.list(resource);
@@ -360,36 +360,36 @@ public class TaskDriver {
 
   /** Public method to resume a workflow/queue */
   public void resume(String workflow) {
-    setTaskTargetState(workflow, TargetState.START);
+    setWorkflowTargetState(workflow, TargetState.START);
   }
 
   /** Public method to stop a workflow/queue */
   public void stop(String workflow) {
-    setTaskTargetState(workflow, TargetState.STOP);
+    setWorkflowTargetState(workflow, TargetState.STOP);
   }
 
   /** Public method to delete a workflow/queue */
   public void delete(String workflow) {
-    setTaskTargetState(workflow, TargetState.DELETE);
+    setWorkflowTargetState(workflow, TargetState.DELETE);
   }
 
-  /** Helper function to change target state for a given task */
-  private void setTaskTargetState(String jobResource, TargetState state) {
-    setSingleTaskTargetState(jobResource, state);
+  /** Helper function to change target state for a given workflow */
+  private void setWorkflowTargetState(String workflowName, TargetState state) {
+    setSingleWorkflowTargetState(workflowName, state);
 
     // For recurring schedules, child workflows must also be handled
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     List<String> resources = accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+    String prefix = workflowName + "_" + TaskConstants.SCHEDULED;
     for (String resource : resources) {
-      String prefix = resource + "_" + TaskConstants.SCHEDULED;
       if (resource.startsWith(prefix)) {
-        setSingleTaskTargetState(resource, state);
+        setSingleWorkflowTargetState(resource, state);
       }
     }
   }
 
-  /** Helper function to change target state for a given task */
-  private void setSingleTaskTargetState(String jobResource, final TargetState state) {
+  /** Helper function to change target state for a given workflow */
+  private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override
@@ -405,7 +405,7 @@ public class TaskDriver {
     List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
     updaters.add(updater);
     List<String> paths = Lists.newArrayList();
-    paths.add(accessor.keyBuilder().resourceConfig(jobResource).getPath());
+    paths.add(accessor.keyBuilder().resourceConfig(workflowName).getPath());
     accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
     invokeRebalance();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/2ebfe7d1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 4e0d92a..2ec795a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -22,15 +22,23 @@ package org.apache.helix.integration.task;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
@@ -38,7 +46,9 @@ import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
@@ -48,6 +58,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 
 public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
   private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
@@ -177,6 +188,80 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
   }
 
+  @Test
+  public void stopAndResumeNamedQueue() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = new JobQueue.Builder(queueName).build();
+    _driver.createQueue(queue);
+
+    // Enqueue jobs
+    Set<String> master = Sets.newHashSet("MASTER");
+    JobConfig.Builder job1 =
+        new JobConfig.Builder().setCommand("Reindex")
+            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+    String job1Name = "masterJob";
+    LOG.info("Enqueuing job: " + job1Name);
+    _driver.enqueueJob(queueName, job1Name, job1);
+
+    Set<String> slave = Sets.newHashSet("SLAVE");
+    JobConfig.Builder job2 =
+    new JobConfig.Builder().setCommand("Reindex")
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
+    String job2Name = "slaveJob";
+    LOG.info("Enqueuing job: " + job2Name);
+    _driver.enqueueJob(queueName, job2Name, job2);
+
+    String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS);
+
+    // stop job1
+    LOG.info("Pausing job-queue: " + queueName);
+    _driver.stop(queueName);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED);
+    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+
+    // Ensure job2 is not started
+    TimeUnit.MILLISECONDS.sleep(200);
+    String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
+    TestUtil.pollForEmptyJobState(_manager, queueName, job2Name);
+
+    LOG.info("Resuming job-queue: " + queueName);
+    _driver.resume(queueName);
+
+    // Ensure successful completion
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
+    JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
+
+    // Ensure correct ordering
+    long job1Finish = masterJobContext.getFinishTime();
+    long job2Start = slaveJobContext.getStartTime();
+    Assert.assertTrue(job2Start >= job1Finish);
+
+    // Flush queue and check cleanup
+    LOG.info("Flusing job-queue: " + queueName);
+    _driver.flushQueue(queueName);
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
+    JobDag dag = workflowCfg.getJobDag();
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
+  }
+
+
   public static class ReindexTask implements Task {
     private final long _delay;
     private volatile boolean _canceled;

http://git-wip-us.apache.org/repos/asf/helix/blob/2ebfe7d1/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index f599920..413b98a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration.task;
  */
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowContext;
@@ -29,6 +30,8 @@ import org.testng.Assert;
  * Static test utility methods.
  */
 public class TestUtil {
+  private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
+
   /**
   * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
    * reached.
@@ -45,7 +48,7 @@ public class TestUtil {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
     } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
-        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+        && System.currentTimeMillis() < st + _default_timeout);
 
     Assert.assertNotNull(ctx);
     Assert.assertEquals(ctx.getWorkflowState(), state);
@@ -60,8 +63,22 @@ public class TestUtil {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
     } while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state)
-        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+        && System.currentTimeMillis() < st + _default_timeout);
     Assert.assertNotNull(ctx);
+    Assert.assertEquals(ctx.getJobState(jobName), state);
   }
 
+  public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
+      final String jobName) throws Exception {
+    final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
+    boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName);
+        return ctx == null || ctx.getJobState(namespacedJobName) == null;
+      }
+    }, _default_timeout);
+    Assert.assertTrue(succeed);
+  }
 }


Mime
View raw message