helix-commits mailing list archives

From ka...@apache.org
Subject helix git commit: Support deleting a job from a job queue. The queue has to be stopped before deleting jobs.
Date Sat, 07 Mar 2015 19:16:49 GMT
Repository: helix
Updated Branches:
  refs/heads/master 1af7f5880 -> 2caac77f9


Support deleting a job from a job queue. The queue has to be stopped before deleting jobs.
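
A hedged usage sketch of the new API for reviewers (the queue/job names and the
HelixManager variable are illustrative, not part of this commit):

    TaskDriver driver = new TaskDriver(helixManager);
    driver.stop("myQueue");                // the queue must be stopped first
    driver.deleteJob("myQueue", "myJob");  // drops the job and splices the DAG around it
    driver.resume("myQueue");              // remaining jobs continue in order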


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2caac77f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2caac77f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2caac77f

Branch: refs/heads/master
Commit: 2caac77f9afa8f8c61425227cc22f5f5f2d73a59
Parents: 1af7f58
Author: Lei Xia <lxia@linkedin.com>
Authored: Thu Mar 5 18:15:32 2015 -0800
Committer: Kanak Biscuitwala <kanak.b@hotmail.com>
Committed: Sat Mar 7 11:16:30 2015 -0800

----------------------------------------------------------------------
 .../resources/ClusterRepresentationUtil.java    |   2 +-
 .../helix/webapp/resources/JsonParameters.java  |   4 +-
 .../helix/webapp/resources/ResourceUtil.java    | 180 +++++++++++++++++++
 .../apache/helix/webapp/AdminTestHelper.java    |  61 ++++++-
 .../main/java/org/apache/helix/task/JobDag.java |  30 ++++
 .../java/org/apache/helix/task/TaskDriver.java  | 111 +++++++++++-
 .../helix/integration/task/DummyTask.java       |  72 ++++++++
 .../task/TestTaskRebalancerStopResume.java      | 122 +++++++++++--
 8 files changed, 567 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2caac77f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
index f227801..e7fe9ca 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
@@ -50,7 +50,7 @@ import org.restlet.data.Form;
 import org.restlet.data.MediaType;
 
 public class ClusterRepresentationUtil {
-  private static final ZNRecord EMPTY_ZNRECORD = new ZNRecord("EMPTY_ZNRECORD");
+  public static final ZNRecord EMPTY_ZNRECORD = new ZNRecord("EMPTY_ZNRECORD");
 
   public static String getClusterPropertyAsString(ZkClient zkClient, String clusterName,
       PropertyKey propertyKey,

http://git-wip-us.apache.org/repos/asf/helix/blob/2caac77f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 19ac71a..e8924fb 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -96,8 +96,10 @@ public class JsonParameters {
   final Map<String, ZNRecord> _extraParameterMap = new HashMap<String, ZNRecord>();
 
   public JsonParameters(Representation entity) throws Exception {
-    Form form = new Form(entity);
+    this(new Form(entity));
+  }
 
+  public JsonParameters(Form form) throws Exception {
     // get parameters in String format
     String jsonPayload = form.getFirstValue(JSON_PARAMETERS, true);
     if (jsonPayload == null || jsonPayload.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/2caac77f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
new file mode 100644
index 0000000..f045505
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
@@ -0,0 +1,180 @@
+package org.apache.helix.webapp.resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.webapp.RestAdminApplication;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.restlet.Context;
+import org.restlet.Request;
+import org.restlet.data.Form;
+
+public class ResourceUtil {
+  private static final String EMPTY_ZNRECORD_STRING =
+      objectToJson(ClusterRepresentationUtil.EMPTY_ZNRECORD);
+
+  /**
+   * Key enums for getting values from request
+   */
+  public enum RequestKey {
+    CLUSTER_NAME("clusterName"),
+    JOB_QUEUE("jobQueue"),
+    JOB("job"),
+    CONSTRAINT_TYPE("constraintType"),
+    CONSTRAINT_ID("constraintId"),
+    RESOURCE_NAME("resourceName"),
+    INSTANCE_NAME("instanceName");
+
+    private final String _key;
+
+    RequestKey(String key) {
+      _key = key;
+    }
+
+    public String toString() {
+      return _key;
+    }
+  }
+
+  /**
+   * Key enums for getting values from context
+   */
+  public enum ContextKey {
+    ZK_ADDR(RestAdminApplication.ZKSERVERADDRESS),
+    ZKCLIENT(RestAdminApplication.ZKCLIENT),
+    RAW_ZKCLIENT("rawZkClient"); // zkclient that uses raw-byte serializer
+
+    private final String _key;
+
+    ContextKey(String key) {
+      _key = key;
+    }
+
+    public String toString() {
+      return _key;
+    }
+  }
+
+  /**
+   * Key enums for getting yaml format parameters
+   */
+  public enum YamlParamKey {
+    NEW_JOB("newJob");
+
+    private final String _key;
+
+    YamlParamKey(String key) {
+      _key = key;
+    }
+
+    public String toString() {
+      return _key;
+    }
+  }
+
+  private static String objectToJson(Object object) {
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    try {
+      mapper.writeValue(sw, object);
+    } catch (JsonGenerationException e) {
+      // Should not be here
+    } catch (JsonMappingException e) {
+      // Should not be here
+    } catch (IOException e) {
+      // Should not be here
+    }
+
+    return sw.toString();
+  }
+
+  public static String getAttributeFromRequest(Request r, RequestKey key) {
+    return (String) r.getAttributes().get(key.toString());
+  }
+
+  public static ZkClient getAttributeFromCtx(Context ctx, ContextKey key) {
+    return (ZkClient) ctx.getAttributes().get(key.toString());
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T getTypedAttributeFromCtx(Class<T> klass, Context ctx, ContextKey key) {
+    return (T) ctx.getAttributes().get(key.toString());
+  }
+
+  public static String getYamlParameters(Form form, YamlParamKey key) {
+    return form.getFirstValue(key.toString());
+  }
+
+  public static String readZkAsBytes(ZkClient zkclient, PropertyKey propertyKey) {
+    byte[] bytes = zkclient.readData(propertyKey.getPath());
+    return bytes == null ? EMPTY_ZNRECORD_STRING : new String(bytes);
+  }
+
+  static String extractSimpleFieldFromZNRecord(String recordStr, String key) {
+    int idx = recordStr.indexOf(key);
+    if (idx != -1) {
+      idx = recordStr.indexOf('"', idx + key.length() + 1);
+      if (idx != -1) {
+        int idx2 = recordStr.indexOf('"', idx + 1);
+        if (idx2 != -1) {
+          return recordStr.substring(idx + 1, idx2);
+        }
+      }
+
+    }
+    return null;
+  }
+
+  public static Map<String, String> readZkChildrenAsBytesMap(ZkClient zkclient,
+      PropertyKey propertyKey) {
+    BaseDataAccessor<byte[]> baseAccessor = new ZkBaseDataAccessor<byte[]>(zkclient);
+    String parentPath = propertyKey.getPath();
+    List<String> childNames = baseAccessor.getChildNames(parentPath, 0);
+    if (childNames == null) {
+      return null;
+    }
+    List<String> paths = new ArrayList<String>();
+    for (String childName : childNames) {
+      paths.add(parentPath + "/" + childName);
+    }
+    List<byte[]> values = baseAccessor.get(paths, null, 0);
+    Map<String, String> ret = new HashMap<String, String>();
+    for (int i = 0; i < childNames.size(); i++) {
+      ret.put(childNames.get(i), new String(values.get(i)));
+    }
+    return ret;
+  }
+}
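
Note: extractSimpleFieldFromZNRecord above is a plain string scan, not a JSON parse.
A rough sketch of the expected behavior (the record string and key are made up, and
the call assumes same-package visibility since the method is package-private):

    String record = "{\"id\":\"myQueue\",\"simpleFields\":{\"TargetState\":\"STOP\"}}";
    // Finds the key, then returns the text between the next pair of double quotes:
    String value = ResourceUtil.extractSimpleFieldFromZNRecord(record, "TargetState");
    // value == "STOP"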

http://git-wip-us.apache.org/repos/asf/helix/blob/2caac77f/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
index 9f6946d..d28ad62 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
@@ -19,9 +19,22 @@ package org.apache.helix.webapp;
  * under the License.
  */
 
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.helix.webapp.HelixAdminWebApp;
+import org.apache.helix.ZNRecord;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.restlet.Client;
+import org.restlet.Request;
+import org.restlet.Response;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Reference;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.testng.Assert;
 
 public class AdminTestHelper {
 
@@ -66,4 +79,50 @@ public class AdminTestHelper {
     }
   }
 
+  public static ZNRecord get(Client client, String url) throws IOException {
+    Reference resourceRef = new Reference(url);
+    Request request = new Request(Method.GET, resourceRef);
+    Response response = client.handle(request);
+    Assert.assertEquals(response.getStatus(), Status.SUCCESS_OK);
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+
+    String responseStr = sw.toString();
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+    ObjectMapper mapper = new ObjectMapper();
+    ZNRecord record = mapper.readValue(new StringReader(responseStr), ZNRecord.class);
+    return record;
+  }
+
+  public static void delete(Client client, String url) throws IOException {
+    Reference resourceRef = new Reference(url);
+    Request request = new Request(Method.DELETE, resourceRef);
+    Response response = client.handle(request);
+    Assert.assertEquals(response.getStatus(), Status.SUCCESS_NO_CONTENT);
+  }
+
+  public static ZNRecord post(Client client, String url, String body) throws IOException {
+    Reference resourceRef = new Reference(url);
+    Request request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(body, MediaType.APPLICATION_ALL);
+
+    Response response = client.handle(request);
+    Assert.assertEquals(response.getStatus(), Status.SUCCESS_OK);
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+
+    if (result != null) {
+      result.write(sw);
+    }
+    String responseStr = sw.toString();
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+
+    ObjectMapper mapper = new ObjectMapper();
+    ZNRecord record = mapper.readValue(new StringReader(responseStr), ZNRecord.class);
+    return record;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2caac77f/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 18a721e..3564f19 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -64,10 +64,40 @@ public class JobDag {
     _allNodes.add(child);
   }
 
+  public void removeParentToChild(String parent, String child) {
+    if (_parentsToChildren.containsKey(parent)) {
+      Set<String> children = _parentsToChildren.get(parent);
+      children.remove(child);
+      if (children.isEmpty()) {
+        _parentsToChildren.remove(parent);
+      }
+    }
+
+    if (_childrenToParents.containsKey(child)) {
+      Set<String> parents =  _childrenToParents.get(child);
+      parents.remove(parent);
+      if (parents.isEmpty()) {
+        _childrenToParents.remove(child);
+      }
+    }
+  }
+
   public void addNode(String node) {
     _allNodes.add(node);
   }
 
+  /**
+   * The caller must make sure no other node depends on this node before removing it
+   */
+  public void removeNode(String node) {
+    if (_parentsToChildren.containsKey(node) || _childrenToParents.containsKey(node)) {
+      throw new IllegalStateException(
+          "The node is either a parent or a child of other node, could not be deleted");
+    }
+
+    _allNodes.remove(node);
+  }
+
   public Map<String, Set<String>> getParentsToChildren() {
     return _parentsToChildren;
   }
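
A hedged sketch of the intended call pattern for the new removal methods: splice the
node's edges out, reconnect its parent to its child, and only then call removeNode
(the a -> b -> c chain and the no-arg constructor are illustrative assumptions):

    JobDag dag = new JobDag();
    dag.addParentToChild("a", "b");     // a -> b
    dag.addParentToChild("b", "c");     // b -> c
    dag.removeParentToChild("a", "b");  // detach b from its parent
    dag.removeParentToChild("b", "c");  // detach b from its child
    dag.addParentToChild("a", "c");     // preserve ordering: a -> c
    dag.removeNode("b");                // b has no remaining edges, so this succeeds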

http://git-wip-us.apache.org/repos/asf/helix/blob/2caac77f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index bcbe76a..76acc64 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -47,6 +47,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.store.HelixPropertyStore;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Joiner;
@@ -77,12 +78,13 @@ public class TaskDriver {
   private final String _clusterName;
 
   /** Commands which may be parsed from the first argument to main */
-  private enum DriverCommand {
+  public enum DriverCommand {
     start,
     stop,
     delete,
     resume,
-    list
+    list,
+    flush
   }
 
   public TaskDriver(HelixManager manager) {
@@ -135,6 +137,10 @@ public class TaskDriver {
         break;
       case list:
         driver.list(resource);
+        break;
+      case flush:
+        driver.flushQueue(resource);
+        break;
       default:
         throw new IllegalArgumentException("Unknown command " + args[0]);
       }
@@ -244,6 +250,107 @@ public class TaskDriver {
     _manager.getHelixPropertyStore().update(path, updater, AccessOption.PERSISTENT);
   }
 
+  /** Deletes a job from an existing named queue; the queue must be stopped prior to this call */
+  public void deleteJob(final String queueName, final String jobName) {
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    HelixProperty workflowConfig =
+        accessor.getProperty(accessor.keyBuilder().resourceConfig(queueName));
+    if (workflowConfig == null) {
+      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+    }
+    boolean isTerminable =
+        workflowConfig.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
+    if (isTerminable) {
+      throw new IllegalArgumentException(queueName + " is not a queue!");
+    }
+
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, queueName);
+    String workflowState =
+        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+
+    if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
+      throw new IllegalStateException("Queue " + queueName + " is still in progress!");
+    }
+
+    // Remove the job from the queue in the DAG
+    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        // Add the node to the existing DAG
+        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        Set<String> allNodes = jobDag.getAllNodes();
+        if (!allNodes.contains(namespacedJobName)) {
+          throw new IllegalStateException("Could not delete job from queue " + queueName
+ ", job "
+              + jobName + " not exists");
+        }
+
+        String parent = null;
+        String child = null;
+        // remove the node from the queue
+        for (String node : allNodes) {
+          if (!node.equals(namespacedJobName)) {
+            if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
+              parent = node;
+              jobDag.removeParentToChild(parent, namespacedJobName);
+            } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
+              child = node;
+              jobDag.removeParentToChild(namespacedJobName, child);
+            }
+          }
+        }
+
+        if (parent != null && child != null) {
+          jobDag.addParentToChild(parent, child);
+        }
+
+        jobDag.removeNode(namespacedJobName);
+
+        // Save the updated DAG
+        try {
+          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+        } catch (Exception e) {
+          throw new IllegalStateException("Could not remove job " + jobName + " from queue
"
+              + queueName, e);
+        }
+        return currentData;
+      }
+    };
+
+    String path = accessor.keyBuilder().resourceConfig(queueName).getPath();
+    boolean status = accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      throw new IllegalArgumentException("Could not delete job from queue " + queueName);
+    }
+
+    // delete the ideal state and resource config for the job
+    _admin.dropResource(_clusterName, namespacedJobName);
+
+    // update queue's property to remove job from JOB_STATES if it is already started.
+    String queuePropertyPath =
+        Joiner.on("/")
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
+    updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
+          if (states != null && states.containsKey(namespacedJobName)) {
+            states.keySet().remove(namespacedJobName);
+          }
+        }
+        return currentData;
+      }
+    };
+    HelixPropertyStore<ZNRecord> propertyStore = _manager.getHelixPropertyStore();
+    propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT);
+
+    // Delete the job from property store
+    String jobPropertyPath =
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
+    propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
+  }
+
  /** Adds a new job to the end of an existing named queue */
  public void enqueueJob(final String queueName, final String jobName, JobConfig.Builder jobBuilder)
       throws Exception {
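
Side note on the deleteJob hunk above: the DAG edit goes through a DataUpdater
callback which, as far as ZkBaseDataAccessor's update goes, re-reads the ZNode and
reapplies update() if a concurrent writer got in first. A minimal sketch of that
read-modify-write pattern, reusing the accessor and path from the hunk (the
"lastDeleted" field is illustrative):

    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
      @Override
      public ZNRecord update(ZNRecord currentData) {
        if (currentData == null) {
          return null; // nothing to modify
        }
        currentData.setSimpleField("lastDeleted", String.valueOf(System.currentTimeMillis()));
        return currentData;
      }
    };
    accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);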

http://git-wip-us.apache.org/repos/asf/helix/blob/2caac77f/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
new file mode 100644
index 0000000..b6054d0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
@@ -0,0 +1,72 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+public class DummyTask implements Task {
+  private static final String TIMEOUT_CONFIG = "Timeout";
+  private final long _delay;
+  private volatile boolean _canceled;
+
+  public DummyTask(TaskCallbackContext context) {
+    JobConfig jobCfg = context.getJobConfig();
+    Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
+    if (cfg == null) {
+      cfg = Collections.emptyMap();
+    }
+    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
+  }
+
+  @Override
+  public TaskResult run() {
+    long expiry = System.currentTimeMillis() + _delay;
+    long timeLeft;
+    while (System.currentTimeMillis() < expiry) {
+      if (_canceled) {
+        timeLeft = expiry - System.currentTimeMillis();
+        return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+            : timeLeft));
+      }
+      sleep(50);
+    }
+    timeLeft = expiry - System.currentTimeMillis();
+    return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+  }
+
+  @Override
+  public void cancel() {
+    _canceled = true;
+  }
+
+  private static void sleep(long d) {
+    try {
+      Thread.sleep(d);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/2caac77f/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index b9e9811..bc03b97 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -19,9 +19,11 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -250,20 +252,120 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
     // Flush queue and check cleanup
     LOG.info("Flusing job-queue: " + queueName);
     _driver.flushQueue(queueName);
+
+    verifyJobDeleted(queueName, namespacedJob1);
+    verifyJobDeleted(queueName, namespacedJob2);
+    verifyJobNotInQueue(queueName, namespacedJob1);
+    verifyJobNotInQueue(queueName, namespacedJob2);
+  }
+
+  @Test
+  public void stopDeleteAndResumeNamedQueue() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = new JobQueue.Builder(queueName).build();
+    _driver.createQueue(queue);
+
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i <= 4; i++) {
+      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+      JobConfig.Builder job =
+          new JobConfig.Builder().setCommand("Reindex")
+              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      String jobName = targetPartition.toLowerCase() + "Job" + i;
+      LOG.info("Enqueuing job: " + jobName);
+      _driver.enqueueJob(queueName, jobName, job);
+      currentJobNames.add(i, jobName);
+    }
+
+    // ensure job 1 is started before deleting it
+    String deletedJob1 = currentJobNames.get(0);
+    String namedSpaceDeletedJob1 = String.format("%s_%s", queueName, deletedJob1);
+    TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+
+    // stop the queue
+    LOG.info("Pausing job-queue: " + queueName);
+    _driver.stop(queueName);
+    TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+
+    // delete the in-progress job (job 1) and verify it being deleted
+    _driver.deleteJob(queueName, deletedJob1);
+    verifyJobDeleted(queueName, namedSpaceDeletedJob1);
+
+    LOG.info("Resuming job-queue: " + queueName);
+    _driver.resume(queueName);
+
+    // ensure job 2 is started
+    TestUtil.pollForJobState(_manager, queueName,
+        String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
+
+    // stop the queue
+    LOG.info("Pausing job-queue: " + queueName);
+    _driver.stop(queueName);
+    TestUtil.pollForJobState(_manager, queueName,
+        String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
+    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+
+    // Ensure job 3 is not started before deleting it
+    String deletedJob2 = currentJobNames.get(2);
+    String namedSpaceDeletedJob2 = String.format("%s_%s", queueName, deletedJob2);
+    TestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2);
+
+    // delete not-started job (job 3) and verify it being deleted
+    _driver.deleteJob(queueName, deletedJob2);
+    verifyJobDeleted(queueName, namedSpaceDeletedJob2);
+
+    LOG.info("Resuming job-queue: " + queueName);
+    _driver.resume(queueName);
+
+    // Ensure the remaining jobs complete successfully and in the correct order
+    currentJobNames.remove(deletedJob1);
+    currentJobNames.remove(deletedJob2);
+    long preJobFinish = 0;
+    for (int i = 0; i < currentJobNames.size(); i++) {
+      String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i));
+      TestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED);
+
+      JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
+      long jobStart = jobContext.getStartTime();
+      Assert.assertTrue(jobStart >= preJobFinish);
+      preJobFinish = jobContext.getFinishTime();
+    }
+
+    // Flush queue
+    LOG.info("Flusing job-queue: " + queueName);
+    _driver.flushQueue(queueName);
+
+    TimeUnit.MILLISECONDS.sleep(200);
+    // verify the cleanup
+    for (int i = 0; i < currentJobNames.size(); i++) {
+      String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i));
+      verifyJobDeleted(queueName, namedSpaceJobName);
+      verifyJobNotInQueue(queueName, namedSpaceJobName);
+    }
+  }
+
+  private void verifyJobDeleted(String queueName, String jobName) throws Exception {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1)));
-    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
-    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
-    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
+
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
+    TestUtil.pollForEmptyJobState(_manager, queueName, jobName);
+  }
+
+  private void verifyJobNotInQueue(String queueName, String namedSpacedJobName) {
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
     JobDag dag = workflowCfg.getJobDag();
-    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
-    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));
-    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1));
-    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2));
-    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
-    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
+    Assert.assertFalse(dag.getAllNodes().contains(namedSpacedJobName));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namedSpacedJobName));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namedSpacedJobName));
   }
 
   public static class ReindexTask implements Task {

