From: kanak@apache.org
To: commits@helix.apache.org
Subject: helix git commit: Support deleting a job from a job queue. The queue has to be stopped before deleting jobs.
Date: Sat, 7 Mar 2015 18:28:37 +0000 (UTC)

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 03d25dd5b -> fd39189bb


Support deleting a job from a job queue. The queue has to be stopped before deleting jobs.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fd39189b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fd39189b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fd39189b

Branch: refs/heads/helix-0.6.x
Commit: fd39189bb9959c42ce8ce7934fdeec0c12ff3147
Parents: 03d25dd
Author: Lei Xia
Authored: Thu Mar 5 18:15:32 2015 -0800
Committer: Lei Xia
Committed: Thu Mar 5 18:15:32 2015 -0800

----------------------------------------------------------------------
 .../helix/webapp/resources/JobResource.java     |  29 +++++
 .../apache/helix/webapp/AdminTestHelper.java    |   7 ++
 .../webapp/resources/TestJobQueuesResource.java |  33 +++--
 .../main/java/org/apache/helix/task/JobDag.java |  30 +++++
 .../java/org/apache/helix/task/TaskDriver.java  |  99 +++++++++++++++
 .../task/TestTaskRebalancerStopResume.java      | 123 +++++++++++++++++--
 6 files changed, 302 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
index 0193c4c..cdcde35 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
@@ -30,9 +30,11 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskUtil;
 import org.apache.log4j.Logger;
 import org.restlet.data.MediaType;
+import org.restlet.data.Status;
 import org.restlet.representation.Representation;
 import org.restlet.representation.StringRepresentation;
 import org.restlet.representation.Variant;
@@ -78,6 +80,33 @@ public class JobResource extends ServerResource {
     return presentation;
   }
+
+  @Override
+  public Representation delete() {
+    StringRepresentation representation = null;
+
+    String clusterName = ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+    String jobQueueName = ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE);
+    String jobName = ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB);
+
+    ZkClient zkClient =
+        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+
+    TaskDriver driver = new TaskDriver(zkClient, clusterName);
+
+    try {
+      driver.deleteJob(jobQueueName, jobName);
+      getResponse().setStatus(Status.SUCCESS_NO_CONTENT);
+    } catch (Exception e) {
+      String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+      representation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+      LOG.error("Failed to delete job: " + jobName, e);
+    }
+
+    return representation;
+  }
+
   StringRepresentation getHostedEntitiesRepresentation(String clusterName, String jobQueueName,
       String jobName) throws Exception {


http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
index 5b371cc..c07cc86 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
@@ -100,6 +100,13 @@ public class AdminTestHelper {
     return record;
   }

+  public static void delete(Client client, String url) throws IOException {
+    Reference resourceRef = new Reference(url);
+    Request request = new Request(Method.DELETE, resourceRef);
+    Response response = client.handle(request);
+    Assert.assertEquals(response.getStatus(), Status.SUCCESS_NO_CONTENT);
+  }
+
   public static ZNRecord post(Client client, String url, String body) throws IOException {
     Reference resourceRef = new Reference(url);


http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
index cc922ad..9c2306a 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
@@ -130,14 +130,22 @@ public class TestJobQueuesResource extends AdminTestBase {
     WorkflowBean wfBean = new WorkflowBean();
     wfBean.name = queueName;
-    JobBean jBean = new JobBean();
-    jBean.name = "myJob1";
-    jBean.command = "DummyTask";
-    jBean.targetResource = WorkflowGenerator.DEFAULT_TGT_DB;
-    jBean.targetPartitionStates = Lists.newArrayList("MASTER");
-    wfBean.jobs = Lists.newArrayList(jBean);
+
+    JobBean jBean1 = new JobBean();
+    jBean1.name = "myJob1";
+    jBean1.command = "DummyTask";
+    jBean1.targetResource = WorkflowGenerator.DEFAULT_TGT_DB;
+    jBean1.targetPartitionStates = Lists.newArrayList("MASTER");
+
+    JobBean jBean2 = new JobBean();
+    jBean2.name = "myJob2";
+    jBean2.command = "DummyTask";
+    jBean2.targetResource = WorkflowGenerator.DEFAULT_TGT_DB;
+    jBean2.targetPartitionStates = Lists.newArrayList("SLAVE");
+
+    wfBean.jobs = Lists.newArrayList(jBean1, jBean2);
     String jobYamlConfig = new Yaml().dump(wfBean);
-    LOG.info("Enqueuing a job: " + jobQueueYamlConfig);
+    LOG.info("Enqueuing jobs: " + jobQueueYamlConfig);

     Map<String, String> paraMap = new HashMap<String, String>();
     paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.start.toString());
@@ -152,7 +160,7 @@ public class TestJobQueuesResource extends AdminTestBase {
     // Get job
     resourceUrl =
         "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName
-            + "/" + jBean.name;
+            + "/" + jBean1.name;
     getRet = AdminTestHelper.get(_gClient, resourceUrl);
     LOG.info("Got job: " + getRet);
@@ -164,7 +172,16 @@ public class TestJobQueuesResource extends AdminTestBase {
     postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
     LOG.info("Stopped job-queue, ret: " + postRet);
+    // Delete a job
+    resourceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName
+            + "/" + jBean2.name;
+    AdminTestHelper.delete(_gClient, resourceUrl);
+    LOG.info("Deleted job: " + jBean2.name);
+    // Resume job queue
+    resourceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName;
     paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.resume.toString());
     postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap));
     postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);


http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 18a721e..3564f19 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -64,10 +64,40 @@ public class JobDag {
     _allNodes.add(child);
   }

+  public void removeParentToChild(String parent, String child) {
+    if (_parentsToChildren.containsKey(parent)) {
+      Set<String> children = _parentsToChildren.get(parent);
+      children.remove(child);
+      if (children.isEmpty()) {
+        _parentsToChildren.remove(parent);
+      }
+    }
+
+    if (_childrenToParents.containsKey(child)) {
+      Set<String> parents = _childrenToParents.get(child);
+      parents.remove(parent);
+      if (parents.isEmpty()) {
+        _childrenToParents.remove(child);
+      }
+    }
+  }
+
   public void addNode(String node) {
     _allNodes.add(node);
   }

+  /**
+   * Must make sure no other node depends on this node before removing it
+   */
+  public void removeNode(String node) {
+    if (_parentsToChildren.containsKey(node) || _childrenToParents.containsKey(node)) {
+      throw new IllegalStateException(
+          "The node is either a parent or a child of another node, so it cannot be deleted");
+    }
+
+    _allNodes.remove(node);
+  }
+
   public Map<String, Set<String>> getParentsToChildren() {
     return _parentsToChildren;
   }
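
----------------------------------------------------------------------
The new JobDag methods above are meant to be used together: removeNode() refuses to drop a node that still has edges, so a caller first detaches the node with removeParentToChild() and, if the node sat in the middle of a chain, re-links its former parent to its former child. That is exactly what TaskDriver.deleteJob() does in the next file. A minimal illustrative sketch follows; the job names and the direct JobDag construction are assumptions for illustration, not taken from this commit.

import org.apache.helix.task.JobDag;

public class JobDagRemovalSketch {
  public static void main(String[] args) {
    // Hypothetical chain: job1 -> job2 -> job3
    JobDag dag = new JobDag();
    dag.addParentToChild("job1", "job2");
    dag.addParentToChild("job2", "job3");

    // removeNode() throws IllegalStateException while any edge still
    // references the node, so detach it first and re-link its neighbors
    // to preserve the execution order of the remaining jobs.
    dag.removeParentToChild("job1", "job2");
    dag.removeParentToChild("job2", "job3");
    dag.addParentToChild("job1", "job3");
    dag.removeNode("job2");

    System.out.println(dag.getAllNodes()); // prints the remaining nodes, e.g. [job1, job3]
  }
}
----------------------------------------------------------------------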
http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 60dc22c..87d81a1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -279,6 +279,105 @@ public class TaskDriver {
     _propertyStore.update(path, updater, AccessOption.PERSISTENT);
   }

+  /** Delete a job from an existing named queue; the queue has to be stopped prior to this call */
+  public void deleteJob(final String queueName, final String jobName) {
+    HelixProperty workflowConfig = _accessor.getProperty(_accessor.keyBuilder().resourceConfig(queueName));
+    if (workflowConfig == null) {
+      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+    }
+    boolean isTerminable =
+        workflowConfig.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
+    if (isTerminable) {
+      throw new IllegalArgumentException(queueName + " is not a queue!");
+    }
+
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
+    String workflowState =
+        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+
+    if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
+      throw new IllegalStateException("Queue " + queueName + " is still in progress!");
+    }
+
+    // Remove the job from the queue in the DAG
+    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        // Remove the node from the existing DAG
+        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        Set<String> allNodes = jobDag.getAllNodes();
+        if (!allNodes.contains(namespacedJobName)) {
+          throw new IllegalStateException("Could not delete job from queue " + queueName + ", job "
+              + jobName + " does not exist");
+        }
+
+        String parent = null;
+        String child = null;
+        // remove the node from the queue
+        for (String node : allNodes) {
+          if (!node.equals(namespacedJobName)) {
+            if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
+              parent = node;
+              jobDag.removeParentToChild(parent, namespacedJobName);
+            } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
+              child = node;
+              jobDag.removeParentToChild(namespacedJobName, child);
+            }
+          }
+        }
+
+        if (parent != null && child != null) {
+          jobDag.addParentToChild(parent, child);
+        }
+
+        jobDag.removeNode(namespacedJobName);
+
+        // Save the updated DAG
+        try {
+          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+        } catch (Exception e) {
+          throw new IllegalStateException(
+              "Could not remove job " + jobName + " from queue " + queueName, e);
+        }
+        return currentData;
+      }
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      throw new IllegalArgumentException("Could not delete job " + jobName + " from queue " + queueName);
+    }
+
+    // delete the ideal state and resource config for the job
+    _admin.dropResource(_clusterName, namespacedJobName);
+
+    // Update the queue's context to remove the job from JOB_STATES if it has already started.
+    String queuePropertyPath =
+        Joiner.on("/")
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
+    updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
+          if (states != null && states.containsKey(namespacedJobName)) {
+            states.keySet().remove(namespacedJobName);
+          }
+        }
+        return currentData;
+      }
+    };
+    _propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT);
+
+    // Delete the job from the property store
+    String jobPropertyPath =
+        Joiner.on("/")
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
+    _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
+  }
+
   /** Adds a new job to the end of an existing named queue */
   public void enqueueJob(final String queueName, final String jobName,
       JobConfig.Builder jobBuilder) throws Exception {


http://git-wip-us.apache.org/repos/asf/helix/blob/fd39189b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 2ec795a..500f029 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -19,8 +19,10 @@ package org.apache.helix.integration.task;
  * under the License.
  */

+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -245,23 +247,122 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // Flush queue and check cleanup
     LOG.info("Flushing job-queue: " + queueName);
     _driver.flushQueue(queueName);
+
+    verifyJobDeleted(queueName, namespacedJob1);
+    verifyJobDeleted(queueName, namespacedJob2);
+    verifyJobNotInQueue(queueName, namespacedJob1);
+    verifyJobNotInQueue(queueName, namespacedJob2);
+  }
+
+  @Test
+  public void stopDeleteAndResumeNamedQueue() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = new JobQueue.Builder(queueName).build();
+    _driver.createQueue(queue);
+
+    // Create and enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i <= 4; i++) {
+      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
"MASTER" : "SLAVE"; + + JobConfig.Builder job = + new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet(targetPartition)); + String jobName = targetPartition.toLowerCase() + "Job" + i; + LOG.info("Enqueuing job: " + jobName); + _driver.enqueueJob(queueName, jobName, job); + currentJobNames.add(i, jobName); + } + + // ensure job 1 is started before deleting it + String deletedJob1 = currentJobNames.get(0); + String namedSpaceDeletedJob1 = String.format("%s_%s", queueName, deletedJob1); + TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS); + + // stop the queue + LOG.info("Pausing job-queue: " + queueName); + _driver.stop(queueName); + TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED); + TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED); + + // delete the in-progress job (job 1) and verify it being deleted + _driver.deleteJob(queueName, deletedJob1); + verifyJobDeleted(queueName, namedSpaceDeletedJob1); + + LOG.info("Resuming job-queue: " + queueName); + _driver.resume(queueName); + + // ensure job 2 is started + TestUtil.pollForJobState(_manager, queueName, + String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS); + + // stop the queue + LOG.info("Pausing job-queue: " + queueName); + _driver.stop(queueName); + TestUtil.pollForJobState(_manager, queueName, + String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED); + TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED); + + // Ensure job 3 is not started before deleting it + String deletedJob2 = currentJobNames.get(2); + String namedSpaceDeletedJob2 = String.format("%s_%s", queueName, deletedJob2); + TestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2); + + // delete not-started job (job 3) and verify it being deleted + _driver.deleteJob(queueName, deletedJob2); + verifyJobDeleted(queueName, namedSpaceDeletedJob2); + + LOG.info("Resuming job-queue: " + queueName); + _driver.resume(queueName); + + // Ensure the jobs left are successful completed in the correct order + currentJobNames.remove(deletedJob1); + currentJobNames.remove(deletedJob2); + long preJobFinish = 0; + for (int i = 0; i < currentJobNames.size(); i++) { + String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i)); + TestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED); + + JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName); + long jobStart = jobContext.getStartTime(); + Assert.assertTrue(jobStart >= preJobFinish); + preJobFinish = jobContext.getFinishTime(); + } + + // Flush queue + LOG.info("Flusing job-queue: " + queueName); + _driver.flushQueue(queueName); + + TimeUnit.MILLISECONDS.sleep(200); + // verify the cleanup + for (int i = 0; i < currentJobNames.size(); i++) { + String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i)); + verifyJobDeleted(queueName, namedSpaceJobName); + verifyJobNotInQueue(queueName, namedSpaceJobName); + } + } + + private void verifyJobDeleted(String queueName, String jobName) throws Exception { HelixDataAccessor accessor = _manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1))); - 
-    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
-    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
-    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
+
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
+    TestUtil.pollForEmptyJobState(_manager, queueName, jobName);
+  }
+
+  private void verifyJobNotInQueue(String queueName, String namedSpacedJobName) {
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
     JobDag dag = workflowCfg.getJobDag();
-    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
-    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));
-    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1));
-    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2));
-    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
-    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
+    Assert.assertFalse(dag.getAllNodes().contains(namedSpacedJobName));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namedSpacedJobName));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namedSpacedJobName));
   }
-
   public static class ReindexTask implements Task {
     private final long _delay;
     private volatile boolean _canceled;
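
----------------------------------------------------------------------
For reference, a minimal usage sketch of the new TaskDriver.deleteJob() API. The cluster, queue, and job names below are hypothetical, and the ZkClient setup (address plus ZNRecordSerializer) is an assumption following the usual Helix pattern; the TaskDriver(zkClient, clusterName) constructor and stop()/resume() are the existing driver API, and deleteJob() is the method added by this commit. deleteJob() rejects a queue that is still IN_PROGRESS, so the queue is stopped first and resumed afterwards, exactly as the REST resource and the test above do.

import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.task.TaskDriver;

public class DeleteJobFromQueueSketch {
  public static void main(String[] args) throws Exception {
    // Assumed ZooKeeper endpoint; Helix ZNRecords require the ZNRecordSerializer.
    ZkClient zkClient = new ZkClient("localhost:2181");
    zkClient.setZkSerializer(new ZNRecordSerializer());
    try {
      TaskDriver driver = new TaskDriver(zkClient, "MyCluster");

      // The queue must not be running when deleteJob() is called.
      driver.stop("myQueue");
      driver.deleteJob("myQueue", "myJob2");
      driver.resume("myQueue");
    } finally {
      zkClient.close();
    }
  }
}
----------------------------------------------------------------------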