helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject helix git commit: [HELIX-546] Add REST API for Helix job queue management - second part, rb=28584
Date Tue, 02 Dec 2014 22:07:55 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 059ab387b -> aa2e968f7


[HELIX-546] Add REST API for Helix job queue management - second part, rb=28584


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/aa2e968f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/aa2e968f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/aa2e968f

Branch: refs/heads/helix-0.6.x
Commit: aa2e968f7ac4944ce60f9defdeee2fb9b8638cb9
Parents: 059ab38
Author: zzhang <zzhang@apache.org>
Authored: Tue Dec 2 14:07:34 2014 -0800
Committer: zzhang <zzhang@apache.org>
Committed: Tue Dec 2 14:07:34 2014 -0800

----------------------------------------------------------------------
 .../apache/helix/webapp/HelixAdminWebApp.java   |  16 +-
 .../helix/webapp/RestAdminApplication.java      |   6 +
 .../resources/ClusterRepresentationUtil.java    |   4 +-
 .../webapp/resources/JobQueueResource.java      | 173 +++++++++++++++++
 .../webapp/resources/JobQueuesResource.java     | 148 +++++++++++++++
 .../helix/webapp/resources/JobResource.java     | 107 +++++++++++
 .../helix/webapp/resources/JsonParameters.java  |  21 +-
 .../helix/webapp/resources/ResourceUtil.java    |  95 ++++++++++
 .../org/apache/helix/webapp/AdminTestBase.java  |   7 +-
 .../apache/helix/webapp/AdminTestHelper.java    |  57 ++++++
 .../java/resources/TestJobQueuesResource.java   | 190 +++++++++++++++++++
 .../test/java/resources/TestJsonParameters.java |  44 +++++
 .../apache/helix/manager/zk/ZKHelixManager.java |   2 +-
 .../java/org/apache/helix/task/TaskDriver.java  |   9 +-
 .../java/org/apache/helix/task/Workflow.java    |   2 +-
 pom.xml                                         |   2 +-
 16 files changed, 852 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java
index 1988636..991886c 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java
@@ -28,12 +28,12 @@ import org.restlet.data.Protocol;
 
 public class HelixAdminWebApp {
   public final Logger LOG = Logger.getLogger(HelixAdminWebApp.class);
-  RestAdminApplication _adminApp = null;
-  Component _component = null;
+  private RestAdminApplication _adminApp = null;
+  private Component _component = null;
 
-  int _helixAdminPort;
-  String _zkServerAddress;
-  ZkClient _zkClient;
+  private final int _helixAdminPort;
+  private final String _zkServerAddress;
+  private ZkClient _zkClient = null;
 
   public HelixAdminWebApp(String zkServerAddress, int adminPort) {
     _zkServerAddress = zkServerAddress;
@@ -58,14 +58,16 @@ public class HelixAdminWebApp {
       _component.getDefaultHost().attach(_adminApp);
       _component.start();
     }
-    LOG.info("helixAdminWebApp started on port " + _helixAdminPort);
+    LOG.info("helixAdminWebApp started on port: " + _helixAdminPort);
   }
 
   public synchronized void stop() {
+    LOG.info("Stopping helixAdminWebApp");
     try {
       _component.stop();
+      LOG.info("Stopped helixAdminWebApp");
     } catch (Exception e) {
-      LOG.error("", e);
+      LOG.error("Exception in stopping helixAdminWebApp", e);
     } finally {
       if (_zkClient != null) {
         _zkClient.close();

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
index 7e7a3b9..9842a3d 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
@@ -43,6 +43,9 @@ import org.apache.helix.webapp.resources.ExternalViewResource;
 import org.apache.helix.webapp.resources.IdealStateResource;
 import org.apache.helix.webapp.resources.InstanceResource;
 import org.apache.helix.webapp.resources.InstancesResource;
+import org.apache.helix.webapp.resources.JobQueuesResource;
+import org.apache.helix.webapp.resources.JobQueueResource;
+import org.apache.helix.webapp.resources.JobResource;
 import org.apache.helix.webapp.resources.ResourceGroupResource;
 import org.apache.helix.webapp.resources.ResourceGroupsResource;
 import org.apache.helix.webapp.resources.SchedulerTasksResource;
@@ -92,6 +95,9 @@ public class RestAdminApplication extends Application {
     router.attach("/clusters/{clusterName}/resourceGroups/{resourceName}",
         ResourceGroupResource.class);
     router.attach("/clusters/{clusterName}/workflows", WorkflowsResource.class);
+    router.attach("/clusters/{clusterName}/jobQueues", JobQueuesResource.class);
+    router.attach("/clusters/{clusterName}/jobQueues/{jobQueue}", JobQueueResource.class);
+    router.attach("/clusters/{clusterName}/jobQueues/{jobQueue}/{job}", JobResource.class);
     router.attach("/clusters/{clusterName}/instances", InstancesResource.class);
     router.attach("/clusters/{clusterName}/instances/{instanceName}", InstanceResource.class);
     router.attach("/clusters/{clusterName}/instances/{instanceName}/currentState/{resourceName}",

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
index f227801..5e458c4 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
@@ -53,9 +53,7 @@ public class ClusterRepresentationUtil {
   private static final ZNRecord EMPTY_ZNRECORD = new ZNRecord("EMPTY_ZNRECORD");
 
   public static String getClusterPropertyAsString(ZkClient zkClient, String clusterName,
-      PropertyKey propertyKey,
-      // String key,
-      MediaType mediaType)
+      PropertyKey propertyKey, MediaType mediaType)
 
   throws JsonGenerationException, JsonMappingException, IOException {
     return getClusterPropertyAsString(zkClient, clusterName, mediaType, propertyKey);

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
new file mode 100644
index 0000000..3ff9a37
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
@@ -0,0 +1,173 @@
+package org.apache.helix.webapp.resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.log4j.Logger;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+import java.util.Map;
+
+public class JobQueueResource extends ServerResource {
+  private final static Logger LOG = Logger.getLogger(JobQueueResource.class);
+
+  public JobQueueResource() {
+    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+    setNegotiated(false);
+  }
+
+  @Override
+  public Representation get() {
+    StringRepresentation presentation;
+    try {
+      String clusterName =
+          ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+      String jobQueueName =
+          ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE);
+      presentation = getHostedEntitiesRepresentation(clusterName, jobQueueName);
+    } catch (Exception e) {
+      String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+      presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+      LOG.error("Fail to get job queue", e);
+    }
+    return presentation;
+  }
+
+  StringRepresentation getHostedEntitiesRepresentation(String clusterName, String jobQueueName)
+      throws Exception {
+    ZkClient zkClient =
+        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+    HelixDataAccessor accessor =
+        ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // Get job queue config
+    HelixProperty jobQueueConfig = accessor.getProperty(keyBuilder.resourceConfig(jobQueueName));
+
+    // Get job queue context
+    String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
+    HelixPropertyStore<ZNRecord> propertyStore =
+        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(zkClient), path, null);
+    WorkflowContext ctx = TaskUtil.getWorkflowContext(propertyStore, jobQueueName);
+
+    // Create the result
+    ZNRecord hostedEntitiesRecord = new ZNRecord(jobQueueName);
+    if (jobQueueConfig != null) {
+      hostedEntitiesRecord.merge(jobQueueConfig.getRecord());
+    }
+    if (ctx != null) {
+      hostedEntitiesRecord.merge(ctx.getRecord());
+    }
+
+    StringRepresentation representation =
+        new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+            MediaType.APPLICATION_JSON);
+
+    return representation;
+  }
+
+  @Override
+  public Representation post(Representation entity) {
+    String clusterName =
+        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+    String jobQueueName =
+        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE);
+    ZkClient zkClient =
+        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+    try {
+      TaskDriver driver = new TaskDriver(zkClient, clusterName);
+
+      Form form = new Form(entity);
+      JsonParameters jsonParameters = new JsonParameters(form);
+
+      TaskDriver.DriverCommand cmd = TaskDriver.DriverCommand.valueOf(jsonParameters.getCommand());
+      switch (cmd) {
+      case start: {
+        // Parse the yaml job config and enqueue each job into the queue
+        String yamlPayload = ResourceUtil.getYamlParameters(form, ResourceUtil.YamlParamKey.NEW_JOB);
+        if (yamlPayload == null) {
+          throw new HelixException("Yaml job config is required!");
+        }
+        Workflow workflow = Workflow.parse(yamlPayload);
+
+        for (String jobName : workflow.getJobConfigs().keySet()) {
+          Map<String, String> jobCfgMap = workflow.getJobConfigs().get(jobName);
+          JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobCfgMap);
+          if (workflow.getTaskConfigs() != null && workflow.getTaskConfigs().containsKey(jobName)) {
+            jobCfgBuilder.addTaskConfigs(workflow.getTaskConfigs().get(jobName));
+          }
+          driver.enqueueJob(jobQueueName, TaskUtil.getDenamespacedJobName(jobQueueName, jobName),
+              jobCfgBuilder);
+        }
+        break;
+      }
+      case stop: {
+        driver.stop(jobQueueName);
+        break;
+      }
+      case resume: {
+        driver.resume(jobQueueName);
+        break;
+      }
+      case flush: {
+        driver.flushQueue(jobQueueName);
+        break;
+      }
+      case delete: {
+        driver.delete(jobQueueName);
+        break;
+      }
+      default:
+        throw new HelixException("Unsupported job queue command: " + cmd);
+      }
+      getResponse().setEntity(getHostedEntitiesRepresentation(clusterName, jobQueueName));
+      getResponse().setStatus(Status.SUCCESS_OK);
+
+    } catch (Exception e) {
+      getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
+          MediaType.APPLICATION_JSON);
+      getResponse().setStatus(Status.SUCCESS_OK);
+      LOG.error("Error in posting job queue: " + entity, e);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
new file mode 100644
index 0000000..24a4387
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
@@ -0,0 +1,148 @@
+package org.apache.helix.webapp.resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Lists;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Parameter;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class JobQueuesResource extends ServerResource {
+  private final static Logger LOG = Logger.getLogger(JobQueuesResource.class);
+
+  public JobQueuesResource() {
+    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+    setNegotiated(false);
+  }
+
+  @Override
+  public Representation get() {
+    StringRepresentation presentation = null;
+    try {
+      String clusterName =
+          ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+      presentation = getHostedEntitiesRepresentation(clusterName);
+    } catch (Exception e) {
+      String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+      presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+      LOG.error("Fail to get all job queues", e);
+    }
+    return presentation;
+  }
+
+  StringRepresentation getHostedEntitiesRepresentation(String clusterName)
+      throws JsonGenerationException, JsonMappingException, IOException {
+    // Get all resources
+    ZkClient zkClient =
+        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+    HelixDataAccessor accessor =
+        ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, HelixProperty> resourceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+
+    // Create the result
+    ZNRecord hostedEntitiesRecord = new ZNRecord("JobQueues");
+
+    // Filter out non-workflow resources and terminable workflows
+    Iterator<Map.Entry<String, HelixProperty>> it = resourceConfigMap.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<String, HelixProperty> e = it.next();
+      HelixProperty resource = e.getValue();
+      Map<String, String> simpleFields = resource.getRecord().getSimpleFields();
+      boolean isTerminable = resource.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
+      if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
+          || !simpleFields.containsKey(WorkflowConfig.DAG) || isTerminable) {
+        it.remove();
+      }
+    }
+
+    // Populate the result
+    List<String> allResources = Lists.newArrayList(resourceConfigMap.keySet());
+    hostedEntitiesRecord.setListField("JobQueues", allResources);
+
+    StringRepresentation representation =
+        new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+            MediaType.APPLICATION_JSON);
+
+    return representation;
+  }
+
+  @Override
+  public Representation post(Representation entity) {
+    try {
+      String clusterName =
+          ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+      ZkClient zkClient =
+          ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+
+      Form form = new Form(entity);
+      // Parse the yaml job queue config and create the queue
+      if (form.size() < 1) {
+        throw new HelixException("Yaml job queue config is required!");
+      }
+      Parameter payload = form.get(0);
+      String yamlPayload = payload.getName();
+      if (yamlPayload == null) {
+        throw new HelixException("Yaml job queue config is required!");
+      }
+
+      Workflow workflow = Workflow.parse(yamlPayload);
+      JobQueue.Builder jobQueueCfgBuilder = new JobQueue.Builder(workflow.getName());
+      jobQueueCfgBuilder.fromMap(workflow.getWorkflowConfig().getResourceConfigMap());
+      TaskDriver driver = new TaskDriver(zkClient, clusterName);
+      driver.createQueue(jobQueueCfgBuilder.build());
+
+      getResponse().setEntity(getHostedEntitiesRepresentation(clusterName));
+      getResponse().setStatus(Status.SUCCESS_OK);
+    } catch (Exception e) {
+      getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
+          MediaType.APPLICATION_JSON);
+      getResponse().setStatus(Status.SUCCESS_OK);
+      LOG.error("Exception in posting job queue: " + entity, e);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
new file mode 100644
index 0000000..a58e223
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
@@ -0,0 +1,107 @@
+package org.apache.helix.webapp.resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskUtil;
+import org.apache.log4j.Logger;
+import org.restlet.data.MediaType;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+public class JobResource extends ServerResource {
+  private final static Logger LOG = Logger.getLogger(JobResource.class);
+
+  public JobResource() {
+    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+    setNegotiated(false);
+  }
+
+  @Override
+  public Representation get() {
+    StringRepresentation presentation;
+    String clusterName =
+        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+    String jobQueueName =
+        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE);
+    String jobName =
+        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB);
+
+    try {
+      presentation = getHostedEntitiesRepresentation(clusterName, jobQueueName, jobName);
+    } catch (Exception e) {
+      String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+      presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+      LOG.error("Fail to get job: " + jobName, e);
+    }
+    return presentation;
+  }
+
+  StringRepresentation getHostedEntitiesRepresentation(String clusterName, String jobQueueName,
+      String jobName) throws Exception {
+
+    ZkClient zkClient =
+        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+    HelixDataAccessor accessor =
+        ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // Get job config
+    String namespacedJobName = TaskUtil.getNamespacedJobName(jobQueueName, jobName);
+    HelixProperty jobConfig = accessor.getProperty(keyBuilder.resourceConfig(namespacedJobName));
+
+    // Get job context
+    JobContext ctx = null;
+    String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
+    HelixPropertyStore<ZNRecord> propertyStore =
+        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(zkClient), path, null);
+
+    ctx = TaskUtil.getJobContext(propertyStore, namespacedJobName);
+
+    // Create the result
+    ZNRecord hostedEntitiesRecord = new ZNRecord(namespacedJobName);
+    if (jobConfig != null) {
+      hostedEntitiesRecord.merge(jobConfig.getRecord());
+    }
+    if (ctx != null) {
+      hostedEntitiesRecord.merge(ctx.getRecord());
+    }
+
+    StringRepresentation representation =
+        new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+            MediaType.APPLICATION_JSON);
+
+    return representation;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 19ac71a..41d9a77 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -96,8 +96,11 @@ public class JsonParameters {
   final Map<String, ZNRecord> _extraParameterMap = new HashMap<String, ZNRecord>();
 
   public JsonParameters(Representation entity) throws Exception {
-    Form form = new Form(entity);
+    this(new Form(entity));
 
+  }
+
+  public JsonParameters(Form form) throws Exception {
     // get parameters in String format
     String jsonPayload = form.getFirstValue(JSON_PARAMETERS, true);
     if (jsonPayload == null || jsonPayload.isEmpty()) {
@@ -151,7 +154,7 @@ public class JsonParameters {
     }
 
     if (!_parameterMap.containsKey(MANAGEMENT_COMMAND)) {
-      throw new HelixException("Missing management paramater '" + MANAGEMENT_COMMAND + "'");
+      throw new HelixException("Missing management parameter '" + MANAGEMENT_COMMAND + "'");
     }
 
     if (!_parameterMap.get(MANAGEMENT_COMMAND).equalsIgnoreCase(command)
@@ -217,25 +220,17 @@ public class JsonParameters {
       }
     } else if (command.equalsIgnoreCase(ClusterSetup.addResource)) {
       if (!_parameterMap.containsKey(RESOURCE_GROUP_NAME)) {
-        throw new HelixException("Missing Json paramaters: '" + RESOURCE_GROUP_NAME + "'");
+        throw new HelixException("Missing Json parameters: '" + RESOURCE_GROUP_NAME + "'");
       }
 
       if (!_parameterMap.containsKey(PARTITIONS)) {
-        throw new HelixException("Missing Json paramaters: '" + PARTITIONS + "'");
+        throw new HelixException("Missing Json parameters: '" + PARTITIONS + "'");
       }
 
       if (!_parameterMap.containsKey(STATE_MODEL_DEF_REF)) {
-        throw new HelixException("Missing Json paramaters: '" + STATE_MODEL_DEF_REF + "'");
+        throw new HelixException("Missing Json parameters: '" + STATE_MODEL_DEF_REF + "'");
       }
 
     }
   }
-
-  // temp test
-  public static void main(String[] args) throws Exception {
-    String jsonPayload =
-        "{\"command\":\"resetPartition\",\"resource\": \"DB-1\",\"partition\":\"DB-1_22 DB-1_23\"}";
-    Map<String, String> map = ClusterRepresentationUtil.JsonToMap(jsonPayload);
-    System.out.println(map);
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
new file mode 100644
index 0000000..969bdf5
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
@@ -0,0 +1,95 @@
+package org.apache.helix.webapp.resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.webapp.RestAdminApplication;
+import org.restlet.Context;
+import org.restlet.Request;
+import org.restlet.data.Form;
+import org.restlet.representation.Representation;
+
+public class ResourceUtil {
+  /**
+   * Key enums for getting values from request
+   */
+  public enum RequestKey {
+    CLUSTER_NAME("clusterName"),
+    JOB_QUEUE("jobQueue"),
+    JOB("job");
+
+    private final String _key;
+
+    RequestKey(String key) {
+      _key = key;
+    }
+
+    public String toString() {
+      return _key;
+    }
+  }
+
+  /**
+   * Key enums for getting values from context
+   */
+  public enum ContextKey {
+    ZK_ADDR(RestAdminApplication.ZKSERVERADDRESS),
+    ZKCLIENT(RestAdminApplication.ZKCLIENT);
+
+    private final String _key;
+
+    ContextKey(String key) {
+      _key = key;
+    }
+
+    public String toString() {
+      return _key;
+    }
+  }
+
+  /**
+   * Key enums for getting yaml format parameters
+   */
+  public enum YamlParamKey {
+    NEW_JOB("newJob");
+
+    private final String _key;
+    YamlParamKey(String key) {
+      _key = key;
+    }
+
+    public String toString() {
+      return _key;
+    }
+  }
+
+  public static String getAttributeFromRequest(Request r, RequestKey key) {
+    return (String) r.getAttributes().get(key.toString());
+  }
+
+
+  public static ZkClient getAttributeFromCtx(Context ctx, ContextKey key) {
+    return (ZkClient) ctx.getAttributes().get(key.toString());
+  }
+
+  public static String getYamlParameters(Form form, YamlParamKey key) {
+    return form.getFirstValue(key.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
index 5b4411b..fdccee9 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
@@ -58,9 +58,10 @@ public class AdminTestBase {
     AssertJUnit.assertTrue(_zkServer != null);
     ZKClientPool.reset();
 
-    _gZkClient = new ZkClient(ZK_ADDR);
-    _gZkClient.setZkSerializer(new ZNRecordSerializer());
-    _gSetupTool = new ClusterSetup(ZK_ADDR);
+    _gZkClient =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+            ZkClient.DEFAULT_SESSION_TIMEOUT, new ZNRecordSerializer());
+    _gSetupTool = new ClusterSetup(_gZkClient);
 
     // start admin
     _adminThread = new AdminThread(ZK_ADDR, ADMIN_PORT);

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
index 9f6946d..5b371cc 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
@@ -19,9 +19,26 @@ package org.apache.helix.webapp;
  * under the License.
  */
 
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.helix.ZNRecord;
 import org.apache.helix.webapp.HelixAdminWebApp;
+import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.restlet.Client;
+import org.restlet.Request;
+import org.restlet.Response;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Reference;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.testng.Assert;
 
 public class AdminTestHelper {
 
@@ -66,4 +83,44 @@ public class AdminTestHelper {
     }
   }
 
+  public static ZNRecord get(Client client, String url) throws IOException {
+    Reference resourceRef = new Reference(url);
+    Request request = new Request(Method.GET, resourceRef);
+    Response response = client.handle(request);
+    Assert.assertEquals(response.getStatus(), Status.SUCCESS_OK);
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+
+    String responseStr = sw.toString();
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+    ObjectMapper mapper = new ObjectMapper();
+    ZNRecord record = mapper.readValue(new StringReader(responseStr), ZNRecord.class);
+    return record;
+  }
+
+  public static ZNRecord post(Client client, String url, String body)
+      throws IOException {
+    Reference resourceRef = new Reference(url);
+    Request request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(body, MediaType.APPLICATION_ALL);
+
+    Response response = client.handle(request);
+    Assert.assertEquals(response.getStatus(), Status.SUCCESS_OK);
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    if (result != null) {
+      result.write(sw);
+    }
+    String responseStr = sw.toString();
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+    // A response without an entity leaves nothing to parse; Jackson throws on "".
+    if (responseStr.isEmpty()) {
+      return null;
+    }
+    return new ObjectMapper().readValue(new StringReader(responseStr), ZNRecord.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java
new file mode 100644
index 0000000..6c0e0e1
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java
@@ -0,0 +1,190 @@
+package resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.DummyTask;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.beans.JobBean;
+import org.apache.helix.task.beans.WorkflowBean;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.webapp.AdminTestBase;
+import org.apache.helix.webapp.AdminTestHelper;
+import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.apache.helix.webapp.resources.ResourceUtil;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.yaml.snakeyaml.Yaml;
+
+public class TestJobQueuesResource extends AdminTestBase {
+  private static final Logger LOG = Logger.getLogger(TestJobQueuesResource.class);
+
+  @Test
+  public void test() throws Exception {
+    // End-to-end REST check: create a queue, enqueue a job, then stop/resume/flush it.
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+    final int p = 20;
+    final int r = 3;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    _gSetupTool.addCluster(clusterName, true);
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      _gSetupTool.addInstanceToCluster(clusterName, instanceName);
+    }
+
+    // Set up target db
+    _gSetupTool.addResourceToCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, p,
+        "MasterSlave");
+    _gSetupTool.rebalanceStorageCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, r);
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("DummyTask", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new DummyTask(context);
+      }
+    });
+
+    // Start dummy participants
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(participants[i],
+          taskFactoryReg));
+      participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = "controller";
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, controllerName);
+    controller.syncStart();
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Start a queue
+    String queueName = "myQueue1";
+    LOG.info("Starting job-queue: " + queueName);
+    String jobQueueYamlConfig = "name: " + queueName;
+
+    String resourceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues";
+    ZNRecord postRet = AdminTestHelper.post(_gClient, resourceUrl, jobQueueYamlConfig);
+    LOG.info("Started job-queue: " + queueName + ", ret: " + postRet);
+
+    LOG.info("Getting all job-queues");
+    ZNRecord getRet = AdminTestHelper.get(_gClient, resourceUrl);
+    LOG.info("Got job-queues: " + getRet);
+
+    // Enqueue job
+    resourceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName;
+
+    WorkflowBean wfBean = new WorkflowBean();
+    wfBean.name = queueName;
+    JobBean jBean = new JobBean();
+    jBean.name = "myJob1";
+    jBean.command = "DummyTask";
+    jBean.targetResource = WorkflowGenerator.DEFAULT_TGT_DB;
+    jBean.targetPartitionStates = Lists.newArrayList("MASTER");
+    wfBean.jobs = Lists.newArrayList(jBean);
+    String jobYamlConfig = new Yaml().dump(wfBean);
+    LOG.info("Enqueuing a job: " + jobYamlConfig);
+
+    Map<String, String> paraMap = new HashMap<String, String>();
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.start.toString());
+
+    String postBody =
+        String.format("%s=%s&%s=%s", JsonParameters.JSON_PARAMETERS,
+            ClusterRepresentationUtil.ObjectToJson(paraMap), ResourceUtil.YamlParamKey.NEW_JOB.toString(),
+            jobYamlConfig);
+    postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
+    LOG.info("Enqueued job, ret: " + postRet);
+
+    // Get job
+    resourceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName
+            + "/" + jBean.name;
+    getRet = AdminTestHelper.get(_gClient, resourceUrl);
+    LOG.info("Got job: " + getRet);
+
+    // Stop job queue
+    resourceUrl =
+            "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName;
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.stop.toString());
+    postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap));
+    postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
+    LOG.info("Stopped job-queue, ret: " + postRet);
+
+    // Resume job queue
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.resume.toString());
+    postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap));
+    postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
+    LOG.info("Resumed job-queue, ret: " + postRet);
+
+    // Flush job queue
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.flush.toString());
+    postBody =
+        JsonParameters.JSON_PARAMETERS + "=" + ClusterRepresentationUtil.ObjectToJson(paraMap);
+    postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
+    LOG.info("Flushed job-queue, ret: " + postRet);
+
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      if (participants[i] != null && participants[i].isConnected()) {
+        participants[i].syncStop();
+      }
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java b/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java
new file mode 100644
index 0000000..383ac21
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java
@@ -0,0 +1,44 @@
+package resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestJsonParameters {
+  @Test
+  public void test() throws Exception {
+    String jsonPayload =
+        "{\"command\":\"resetPartition\",\"resource\": \"DB-1\",\"partition\":\"DB-1_22 DB-1_23\"}";
+    Map<String, String> map = ClusterRepresentationUtil.JsonToMap(jsonPayload);
+    Assert.assertNotNull(map.get(JsonParameters.MANAGEMENT_COMMAND));
+    Assert.assertEquals(ClusterSetup.resetPartition, map.get(JsonParameters.MANAGEMENT_COMMAND));
+    Assert.assertNotNull(map.get(JsonParameters.RESOURCE));
+    Assert.assertEquals("DB-1", map.get(JsonParameters.RESOURCE));
+    Assert.assertNotNull(map.get(JsonParameters.PARTITION));
+    Assert.assertEquals("DB-1_22 DB-1_23", map.get(JsonParameters.PARTITION));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 6a1fb72..3328279 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -792,7 +792,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
        */
       _disconnectTimeHistory.add(System.currentTimeMillis());
       if (isFlapping()) {
-        LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. "
+        LOG.error("instanceName: " + _instanceName + " is flapping. disconnect it. "
             + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
             + _flappingTimeWindowMs + "ms.");
         disconnect();

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0bd060a..60dc22c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -90,12 +90,13 @@ public class TaskDriver {
   private final String _clusterName;
 
   /** Commands which may be parsed from the first argument to main */
-  private enum DriverCommand {
+  public enum DriverCommand {
     start,
     stop,
     delete,
     resume,
-    list
+    list,
+    flush
   }
 
   public TaskDriver(HelixManager manager) {
@@ -166,6 +167,10 @@ public class TaskDriver {
         break;
       case list:
         driver.list(resource);
+        break;
+      case flush:
+        driver.flushQueue(resource);
+        break;
       default:
         throw new IllegalArgumentException("Unknown command " + args[0]);
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 1c0ef40..4ca6e68 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -132,7 +132,7 @@ public class Workflow {
     WorkflowBean wf = (WorkflowBean) yaml.load(reader);
     Builder builder = new Builder(wf.name);
 
-    if (wf != null) {
+    if (wf != null && wf.jobs != null) {
       for (JobBean job : wf.jobs) {
         if (job.name == null) {
           throw new IllegalArgumentException("A job must have a name.");

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index df0c74c..92c1d7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -255,7 +255,7 @@ under the License.
       <dependency>
         <groupId>org.restlet.jse</groupId>
         <artifactId>org.restlet</artifactId>
-        <version>2.2.1</version>
+        <version>2.2.3</version>
       </dependency>
       <dependency>
         <groupId>org.apache.helix</groupId>


Mime
View raw message