helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject helix git commit: [HELIX-583] support deleting recurring job queue
Date Fri, 20 Mar 2015 08:52:53 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 49ceac0e9 -> a80ba8ebd


[HELIX-583] support deleting recurring job queue


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a80ba8eb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a80ba8eb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a80ba8eb

Branch: refs/heads/helix-0.6.x
Commit: a80ba8ebd9d7b2a7aded4339867c9f53a36fdfe8
Parents: 49ceac0
Author: zzhang <zzhang@apache.org>
Authored: Fri Mar 20 01:52:41 2015 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Fri Mar 20 01:52:41 2015 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/TaskRebalancer.java   | 36 ++++----
 .../java/org/apache/helix/task/TaskUtil.java    |  6 +-
 .../org/apache/helix/task/WorkflowConfig.java   | 10 +++
 .../task/TestTaskRebalancerStopResume.java      | 87 +++++++++++++++++++-
 .../apache/helix/integration/task/TestUtil.java | 24 +++++-
 5 files changed, 140 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 1c7a7a3..fe3f496 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -559,6 +559,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     if (SCHEDULED_TIMES.containsKey(id) || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
       return;
     }
+    LOG.info("Schedule rebalance with id: " + id + "and job: " + jobResource);
 
     // For workflows not yet scheduled, schedule them and record it
     RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
@@ -664,6 +665,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
    */
   private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg,
       String workflowResource) {
+    LOG.info("Cleaning up job: " + resourceName + " in workflow: " + workflowResource);
     HelixDataAccessor accessor = mgr.getHelixDataAccessor();
 
     // Remove any DAG references in workflow
@@ -684,7 +686,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         try {
           currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
         } catch (Exception e) {
-          LOG.equals("Could not update DAG for job " + resourceName);
+          LOG.equals("Could not update DAG for job: " + resourceName);
         }
         return currentData;
       }
@@ -695,28 +697,31 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Delete resource configs.
     PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
     if (!accessor.removeProperty(cfgKey)) {
-      throw new RuntimeException(
-          String
-              .format(
-                  "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                  resourceName, cfgKey));
+      throw new RuntimeException(String.format(
+          "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+          resourceName,
+          cfgKey));
     }
+
     // Delete property store information for this resource.
+    // For recurring workflow, it's OK if the node doesn't exist.
     String propStoreKey = getRebalancerPropStoreKey(resourceName);
-    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
-      throw new RuntimeException(
-          String
-              .format(
-                  "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                  resourceName, propStoreKey));
-    }
-    // Finally, delete the ideal state itself.
+    mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT);
+
+    // Delete the ideal state itself.
     PropertyKey isKey = getISPropertyKey(accessor, resourceName);
     if (!accessor.removeProperty(isKey)) {
       throw new RuntimeException(String.format(
           "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
           resourceName, isKey));
     }
+
+    // Delete dead external view
+    // because job is already completed, there is no more current state change
+    // thus dead external views removal will not be triggered
+    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
+    accessor.removeProperty(evKey);
+
     LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName));
 
     boolean lastInWorkflow = true;
@@ -727,11 +732,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
           || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
           || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
         lastInWorkflow = false;
+        break;
       }
     }
 
     // clean up workflow-level info if this was the last in workflow
-    if (lastInWorkflow && cfg.isTerminable()) {
+    if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == TargetState.DELETE)) {
       // delete workflow config
       PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
       if (!accessor.removeProperty(workflowCfgKey)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 4f6afe0..a37dd6f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -345,7 +345,11 @@ public class TaskUtil {
   public static void invokeRebalance(HelixManager manager, String resource) {
     // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    accessor.updateProperty(accessor.keyBuilder().idealStates(resource), new IdealState(resource));
+    PropertyKey key = accessor.keyBuilder().idealStates(resource);
+    IdealState is = accessor.getProperty(key);
+    if (is != null) {
+      accessor.updateProperty(key, is);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 6bc5181..4129fea 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Provides a typed interface to workflow level configurations. Validates the configurations.
@@ -82,6 +83,10 @@ public class WorkflowConfig {
     return _scheduleConfig;
   }
 
+  public boolean isRecurring() {
+    return _scheduleConfig != null && _scheduleConfig.isRecurring();
+  }
+
   public Map<String, String> getResourceConfigMap() throws Exception {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
@@ -124,6 +129,11 @@ public class WorkflowConfig {
       return this;
     }
 
+    public Builder setExpiry(long v, TimeUnit unit) {
+      _expiry = unit.toMillis(v);
+      return this;
+    }
+
     public Builder setExpiry(long v) {
       _expiry = v;
       return this;

http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 9f72363..fd709d8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -29,22 +29,28 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.ScheduleConfig;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
@@ -56,6 +62,7 @@ import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.util.PathUtils;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -259,7 +266,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
 
   @Test
-  public void stopDeleteAndResumeNamedQueue() throws Exception {
+  public void stopDeleteJobAndResumeNamedQueue() throws Exception {
     String queueName = TestHelper.getTestMethodName();
 
     // Create a queue
@@ -307,8 +314,10 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, queueName,
-        String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
+    TestUtil.pollForJobState(_manager,
+                             queueName,
+                             String.format("%s_%s", queueName, currentJobNames.get(1)),
+                             TaskState.STOPPED);
     TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
@@ -372,7 +381,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
   }
 
   @Test
-  public void stopDeleteAndResumeRecurrentNamedQueue() throws Exception {
+  public void stopDeleteJobAndResumeRecurrentQueue() throws Exception {
     String queueName = TestHelper.getTestMethodName();
 
     // Create a queue
@@ -459,6 +468,76 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // verify the job is not there for the next recurrence of queue schedule
   }
 
+  @Test
+  public void stopAndDeleteQueue() throws Exception {
+    final String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    System.out.println("START " + queueName + " at " + new Date(System.currentTimeMillis()));
+    WorkflowConfig wfCfg
+        = new WorkflowConfig.Builder().setExpiry(2, TimeUnit.MINUTES)
+                                      .setScheduleConfig(ScheduleConfig.recurringFromNow(TimeUnit.MINUTES, 1)).build();
+    JobQueue qCfg = new JobQueue.Builder(queueName).fromMap(wfCfg.getResourceConfigMap()).build();
+    _driver.createQueue(qCfg);
+
+    // Enqueue 2 jobs
+    Set<String> master = Sets.newHashSet("MASTER");
+    JobConfig.Builder job1 =
+        new JobConfig.Builder().setCommand("Reindex")
+                               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+    String job1Name = "masterJob";
+    LOG.info("Enqueuing job1: " + job1Name);
+    _driver.enqueueJob(queueName, job1Name, job1);
+
+    Set<String> slave = Sets.newHashSet("SLAVE");
+    JobConfig.Builder job2 =
+        new JobConfig.Builder().setCommand("Reindex")
+                               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
+    String job2Name = "slaveJob";
+    LOG.info("Enqueuing job2: " + job2Name);
+    _driver.enqueueJob(queueName, job2Name, job2);
+
+    String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+
+    String namespacedJob2 = String.format("%s_%s", queueName,  job2Name);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+
+    // Stop and delete queue
+    _driver.stop(queueName);
+    _driver.delete(queueName);
+
+    // Wait until all status are cleaned up
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override public boolean verify() throws Exception {
+        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+        // check paths for resource-config, ideal-state, external-view, property-store
+        List<String> paths
+            = Lists.newArrayList(keyBuilder.resourceConfigs().getPath(),
+                                 keyBuilder.idealStates().getPath(),
+                                 keyBuilder.externalViews().getPath(),
+                                 PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, CLUSTER_NAME)
+                                     + TaskConstants.REBALANCER_CONTEXT_ROOT);
+
+        for (String path : paths) {
+          List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
+          for (String childName : childNames) {
+            if (childName.startsWith(queueName)) {
+              return false;
+            }
+          }
+        }
+
+        return true;
+      }
+    }, 30 * 1000);
+    Assert.assertTrue(result);
+
+    System.out.println("END " + queueName + " at " + new Date(System.currentTimeMillis()));
+  }
+
   private void verifyJobDeleted(String queueName, String jobName) throws Exception {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();

http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index 27e827a..43c5783 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -23,6 +23,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.testng.Assert;
 
@@ -56,13 +57,30 @@ public class TestUtil {
 
   public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
       TaskState state) throws InterruptedException {
-    // Wait for completion.
-    long st = System.currentTimeMillis();
+    // Get workflow config
+    WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource);
+    Assert.assertNotNull(wfCfg);
     WorkflowContext ctx;
+    if (wfCfg.isRecurring()) {
+      // if it's recurring, need to reconstruct workflow and job name
+      do {
+        Thread.sleep(100);
+        ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+      } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
+      Assert.assertNotNull(ctx);
+      Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow());
+      jobName = jobName.substring(workflowResource.length() + 1);
+      workflowResource = ctx.getLastScheduledSingleWorkflow();
+      jobName = String.format("%s_%s", workflowResource, jobName);
+    }
+
+    // Wait for state
+    long st = System.currentTimeMillis();
     do {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    } while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state)
+    }
+    while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state)
         && System.currentTimeMillis() < st + _default_timeout);
     Assert.assertNotNull(ctx);
     Assert.assertEquals(ctx.getJobState(jobName), state);


Mime
View raw message