helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject helix git commit: Support deleting job from a job queue. The queue has to be stopped before deleting jobs.
Date Sat, 07 Mar 2015 18:28:37 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 03d25dd5b -> fd39189bb


Support deleting job from a job queue. The queue has to be stopped before deleting jobs.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fd39189b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fd39189b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fd39189b

Branch: refs/heads/helix-0.6.x
Commit: fd39189bb9959c42ce8ce7934fdeec0c12ff3147
Parents: 03d25dd
Author: Lei Xia <lxia@linkedin.com>
Authored: Thu Mar 5 18:15:32 2015 -0800
Committer: Lei Xia <lxia@linkedin.com>
Committed: Thu Mar 5 18:15:32 2015 -0800

----------------------------------------------------------------------
 .../helix/webapp/resources/JobResource.java     |  29 +++++
 .../apache/helix/webapp/AdminTestHelper.java    |   7 ++
 .../webapp/resources/TestJobQueuesResource.java |  33 +++--
 .../main/java/org/apache/helix/task/JobDag.java |  30 +++++
 .../java/org/apache/helix/task/TaskDriver.java  |  99 +++++++++++++++
 .../task/TestTaskRebalancerStopResume.java      | 123 +++++++++++++++++--
 6 files changed, 302 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
index 0193c4c..cdcde35 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
@@ -30,9 +30,11 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskUtil;
 import org.apache.log4j.Logger;
 import org.restlet.data.MediaType;
+import org.restlet.data.Status;
 import org.restlet.representation.Representation;
 import org.restlet.representation.StringRepresentation;
 import org.restlet.representation.Variant;
@@ -78,6 +80,33 @@ public class JobResource extends ServerResource {
     return presentation;
   }
 
+
+  @Override
+  public Representation delete() {
+    StringRepresentation representation = null;
+
+    String clusterName = ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+    String jobQueueName = ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE);
+    String jobName = ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB);
+
+    ZkClient zkClient =
+        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+
+    TaskDriver driver = new TaskDriver(zkClient, clusterName);
+
+    try {
+      driver.deleteJob(jobQueueName, jobName);
+      getResponse().setStatus(Status.SUCCESS_NO_CONTENT);
+    } catch (Exception e) {
+      String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+      representation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+      LOG.error("Fail to delete job: " + jobName, e);
+    }
+
+    return representation;
+  }
+
   StringRepresentation getHostedEntitiesRepresentation(String clusterName, String jobQueueName,
       String jobName) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
index 5b371cc..c07cc86 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
@@ -100,6 +100,13 @@ public class AdminTestHelper {
     return record;
   }
 
+  public static void delete(Client client, String url) throws IOException {
+    Reference resourceRef = new Reference(url);
+    Request request = new Request(Method.DELETE, resourceRef);
+    Response response = client.handle(request);
+    Assert.assertEquals(response.getStatus(), Status.SUCCESS_NO_CONTENT);
+  }
+
   public static ZNRecord post(Client client, String url, String body)
       throws IOException {
     Reference resourceRef = new Reference(url);

http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
index cc922ad..9c2306a 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
@@ -130,14 +130,22 @@ public class TestJobQueuesResource extends AdminTestBase {
 
     WorkflowBean wfBean = new WorkflowBean();
     wfBean.name = queueName;
-    JobBean jBean = new JobBean();
-    jBean.name = "myJob1";
-    jBean.command = "DummyTask";
-    jBean.targetResource = WorkflowGenerator.DEFAULT_TGT_DB;
-    jBean.targetPartitionStates = Lists.newArrayList("MASTER");
-    wfBean.jobs = Lists.newArrayList(jBean);
+
+    JobBean jBean1 = new JobBean();
+    jBean1.name = "myJob1";
+    jBean1.command = "DummyTask";
+    jBean1.targetResource = WorkflowGenerator.DEFAULT_TGT_DB;
+    jBean1.targetPartitionStates = Lists.newArrayList("MASTER");
+
+    JobBean jBean2 = new JobBean();
+    jBean2.name = "myJob2";
+    jBean2.command = "DummyTask";
+    jBean2.targetResource = WorkflowGenerator.DEFAULT_TGT_DB;
+    jBean2.targetPartitionStates = Lists.newArrayList("SLAVE");
+
+    wfBean.jobs = Lists.newArrayList(jBean1, jBean2);
     String jobYamlConfig = new Yaml().dump(wfBean);
-    LOG.info("Enqueuing a job: " + jobQueueYamlConfig);
+    LOG.info("Enqueuing jobs: " + jobQueueYamlConfig);
 
     Map<String, String> paraMap = new HashMap<String, String>();
     paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.start.toString());
@@ -152,7 +160,7 @@ public class TestJobQueuesResource extends AdminTestBase {
     // Get job
     resourceUrl =
         "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName
-            + "/" + jBean.name;
+            + "/" + jBean1.name;
     getRet = AdminTestHelper.get(_gClient, resourceUrl);
     LOG.info("Got job: " + getRet);
 
@@ -164,7 +172,16 @@ public class TestJobQueuesResource extends AdminTestBase {
     postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
     LOG.info("Stopped job-queue, ret: " + postRet);
 
+    // Delete a job
+    resourceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName
+            + "/" + jBean2.name;
+    AdminTestHelper.delete(_gClient, resourceUrl);
+    LOG.info("Delete a job: ");
+
     // Resume job queue
+    resourceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName;
     paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.resume.toString());
     postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap));
     postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);

http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 18a721e..3564f19 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -64,10 +64,40 @@ public class JobDag {
     _allNodes.add(child);
   }
 
+  public void removeParentToChild(String parent, String child) {
+    if (_parentsToChildren.containsKey(parent)) {
+      Set<String> children = _parentsToChildren.get(parent);
+      children.remove(child);
+      if (children.isEmpty()) {
+        _parentsToChildren.remove(parent);
+      }
+    }
+
+    if (_childrenToParents.containsKey(child)) {
+      Set<String> parents =  _childrenToParents.get(child);
+      parents.remove(parent);
+      if (parents.isEmpty()) {
+        _childrenToParents.remove(child);
+      }
+    }
+  }
+
   public void addNode(String node) {
     _allNodes.add(node);
   }
 
+  /**
+   * Callers must ensure that no other node depends on this node before removing it.
+   */
+  public void removeNode(String node) {
+    if (_parentsToChildren.containsKey(node) || _childrenToParents.containsKey(node)) {
+      throw new IllegalStateException(
+          "The node is either a parent or a child of other node, could not be deleted");
+    }
+
+    _allNodes.remove(node);
+  }
+
   public Map<String, Set<String>> getParentsToChildren() {
     return _parentsToChildren;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 60dc22c..87d81a1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -279,6 +279,105 @@ public class TaskDriver {
     _propertyStore.update(path, updater, AccessOption.PERSISTENT);
   }
 
+  /** Delete a job from an existing named queue; the queue has to be stopped prior to this call */
+  public void deleteJob(final String queueName, final String jobName) {
+    HelixProperty workflowConfig = _accessor.getProperty(_accessor.keyBuilder().resourceConfig(queueName));
+    if (workflowConfig == null) {
+      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+    }
+    boolean isTerminable =
+        workflowConfig.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
+    if (isTerminable) {
+      throw new IllegalArgumentException(queueName + " is not a queue!");
+    }
+
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
+    String workflowState =
+        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+
+    if (workflowState.equals(TaskState.IN_PROGRESS)) {
+      throw new IllegalStateException("Queue " + queueName + " is still in progress!");
+    }
+
+    // Remove the job from the queue in the DAG
+    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        // Remove the node from the existing DAG
+        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        Set<String> allNodes = jobDag.getAllNodes();
+        if (!allNodes.contains(namespacedJobName)) {
+          throw new IllegalStateException("Could not delete job from queue " + queueName
+ ", job "
+              + jobName + " not exists");
+        }
+
+        String parent = null;
+        String child = null;
+        // remove the node from the queue
+        for (String node : allNodes) {
+          if (!node.equals(namespacedJobName)) {
+            if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
+              parent = node;
+              jobDag.removeParentToChild(parent, namespacedJobName);
+            } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
+              child = node;
+              jobDag.removeParentToChild(namespacedJobName, child);
+            }
+          }
+        }
+
+        if (parent != null && child != null) {
+          jobDag.addParentToChild(parent, child);
+        }
+
+        jobDag.removeNode(namespacedJobName);
+
+        // Save the updated DAG
+        try {
+          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+        } catch (Exception e) {
+          throw new IllegalStateException(
+              "Could not remove job " + jobName + " from queue " + queueName, e);
+        }
+        return currentData;
+      }
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      throw new IllegalArgumentException("Could not enqueue job");
+    }
+
+    // delete the ideal state and resource config for the job
+    _admin.dropResource(_clusterName, namespacedJobName);
+
+    // update queue's property to remove job from JOB_STATES if it is already started.
+    String queuePropertyPath =
+        Joiner.on("/")
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
+    updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
+          if (states != null && states.containsKey(namespacedJobName)) {
+            states.keySet().remove(namespacedJobName);
+          }
+        }
+        return currentData;
+      }
+    };
+    _propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT);
+
+    // Delete the job from property store
+    String jobPropertyPath =
+        Joiner.on("/")
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
+    _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
+  }
+
   /** Adds a new job to the end an existing named queue */
   public void enqueueJob(final String queueName, final String jobName, JobConfig.Builder
jobBuilder)
       throws Exception {

http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 2ec795a..500f029 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -19,8 +19,10 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -245,23 +247,122 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
{
     // Flush queue and check cleanup
     LOG.info("Flusing job-queue: " + queueName);
     _driver.flushQueue(queueName);
+
+    verifyJobDeleted(queueName, namespacedJob1);
+    verifyJobDeleted(queueName, namespacedJob2);
+    verifyJobNotInQueue(queueName, namespacedJob1);
+    verifyJobNotInQueue(queueName, namespacedJob2);
+  }
+
+  @Test
+  public void stopDeleteAndResumeNamedQueue() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = new JobQueue.Builder(queueName).build();
+    _driver.createQueue(queue);
+
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i <= 4; i++) {
+      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+      JobConfig.Builder job =
+          new JobConfig.Builder().setCommand("Reindex")
+              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      String jobName = targetPartition.toLowerCase() + "Job" + i;
+      LOG.info("Enqueuing job: " + jobName);
+      _driver.enqueueJob(queueName, jobName, job);
+      currentJobNames.add(i, jobName);
+    }
+
+    // ensure job 1 is started before deleting it
+    String deletedJob1 = currentJobNames.get(0);
+    String namedSpaceDeletedJob1 = String.format("%s_%s", queueName, deletedJob1);
+    TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+
+    // stop the queue
+    LOG.info("Pausing job-queue: " + queueName);
+    _driver.stop(queueName);
+    TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+
+    // delete the in-progress job (job 1) and verify it being deleted
+    _driver.deleteJob(queueName, deletedJob1);
+    verifyJobDeleted(queueName, namedSpaceDeletedJob1);
+
+    LOG.info("Resuming job-queue: " + queueName);
+    _driver.resume(queueName);
+
+    // ensure job 2 is started
+    TestUtil.pollForJobState(_manager, queueName,
+        String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
+
+    // stop the queue
+    LOG.info("Pausing job-queue: " + queueName);
+    _driver.stop(queueName);
+    TestUtil.pollForJobState(_manager, queueName,
+        String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
+    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+
+    // Ensure job 3 is not started before deleting it
+    String deletedJob2 = currentJobNames.get(2);
+    String namedSpaceDeletedJob2 = String.format("%s_%s", queueName, deletedJob2);
+    TestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2);
+
+    // delete not-started job (job 3) and verify it being deleted
+    _driver.deleteJob(queueName, deletedJob2);
+    verifyJobDeleted(queueName, namedSpaceDeletedJob2);
+
+    LOG.info("Resuming job-queue: " + queueName);
+    _driver.resume(queueName);
+
+    // Ensure the jobs left are successful completed in the correct order
+    currentJobNames.remove(deletedJob1);
+    currentJobNames.remove(deletedJob2);
+    long preJobFinish = 0;
+    for (int i = 0; i < currentJobNames.size(); i++) {
+      String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i));
+      TestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED);
+
+      JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
+      long jobStart = jobContext.getStartTime();
+      Assert.assertTrue(jobStart >= preJobFinish);
+      preJobFinish = jobContext.getFinishTime();
+    }
+
+    // Flush queue
+    LOG.info("Flusing job-queue: " + queueName);
+    _driver.flushQueue(queueName);
+
+    TimeUnit.MILLISECONDS.sleep(200);
+    // verify the cleanup
+    for (int i = 0; i < currentJobNames.size(); i++) {
+      String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i));
+      verifyJobDeleted(queueName, namedSpaceJobName);
+      verifyJobNotInQueue(queueName, namedSpaceJobName);
+    }
+  }
+
+  private void verifyJobDeleted(String queueName, String jobName) throws Exception {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1)));
-    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
-    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
-    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
+
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
+    TestUtil.pollForEmptyJobState(_manager, queueName, jobName);
+  }
+
+  private void verifyJobNotInQueue(String queueName, String namedSpacedJobName) {
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
     JobDag dag = workflowCfg.getJobDag();
-    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
-    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));
-    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1));
-    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2));
-    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
-    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
+    Assert.assertFalse(dag.getAllNodes().contains(namedSpacedJobName));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namedSpacedJobName));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namedSpacedJobName));
   }
 
-
   public static class ReindexTask implements Task {
     private final long _delay;
     private volatile boolean _canceled;


Mime
View raw message