helix-commits mailing list archives

From j...@apache.org
Subject [34/50] [abbrv] helix git commit: HELIX-661: implement GET namespace(s)
Date Thu, 25 Jan 2018 21:49:25 GMT
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
new file mode 100644
index 0000000..1eac9c2
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
@@ -0,0 +1,79 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.rest.common.ContextPropertyKeys;
+import org.apache.helix.rest.server.ServerContext;
+import org.apache.helix.rest.server.resources.AbstractResource;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.tools.ClusterSetup;
+
+
+/**
+ * This class provides methods to access Helix-specific objects in the
+ * metadata store, such as clusters, instances, jobs, resources, and
+ * workflows.
+ */
+public class AbstractHelixResource extends AbstractResource {
+
+  public ZkClient getZkClient() {
+    ServerContext serverContext = getServerContext();
+    return serverContext.getZkClient();
+  }
+
+  public HelixAdmin getHelixAdmin() {
+    ServerContext serverContext = getServerContext();
+    return serverContext.getHelixAdmin();
+  }
+
+  public ClusterSetup getClusterSetup() {
+    ServerContext serverContext = getServerContext();
+    return serverContext.getClusterSetup();
+  }
+
+  public TaskDriver getTaskDriver(String clusterName) {
+    ServerContext serverContext = getServerContext();
+    return serverContext.getTaskDriver(clusterName);
+  }
+
+  public ConfigAccessor getConfigAccessor() {
+    ServerContext serverContext = getServerContext();
+    return serverContext.getConfigAccessor();
+  }
+
+  public HelixDataAccessor getDataAccssor(String clusterName) {
+    ServerContext serverContext = getServerContext();
+    return serverContext.getDataAccssor(clusterName);
+  }
+
+  protected static ZNRecord toZNRecord(String data) throws IOException {
+    return OBJECT_MAPPER.reader(ZNRecord.class).readValue(data);
+  }
+
+  private ServerContext getServerContext() {
+    return (ServerContext) _application.getProperties().get(ContextPropertyKeys.SERVER_CONTEXT.name());
+  }
+}
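
The accessors above are thin wrappers over the shared ServerContext, so a
concrete resource only declares a path and pulls whatever Helix construct it
needs. A minimal sketch of such a subclass, assuming the same JAX-RS setup as
the classes in this commit (ExampleAccessor and its path are illustrative,
not part of this change):

    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import javax.ws.rs.core.Response;
    import org.apache.helix.HelixAdmin;

    @Path("/clusters/{clusterId}/example")
    public class ExampleAccessor extends AbstractHelixResource {
      @GET
      public Response getExample(@PathParam("clusterId") String clusterId) {
        // Reuse the HelixAdmin instance held by the shared ServerContext.
        HelixAdmin admin = getHelixAdmin();
        return JSONRepresentation(admin.getInstancesInCluster(clusterId));
      }
    }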

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
new file mode 100644
index 0000000..f6f95b0
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -0,0 +1,400 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.LeaderHistory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterSetup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("/clusters")
+public class ClusterAccessor extends AbstractHelixResource {
+  private static final Logger _logger = LoggerFactory.getLogger(ClusterAccessor.class);
+
+  public enum ClusterProperties {
+    controller,
+    instances,
+    liveInstances,
+    resources,
+    paused,
+    maintenance,
+    messages,
+    stateModelDefinitions,
+    clusters
+  }
+
+  @GET
+  public Response getClusters() {
+    HelixAdmin helixAdmin = getHelixAdmin();
+    List<String> clusters = helixAdmin.getClusters();
+
+    Map<String, List<String>> dataMap = new HashMap<>();
+    dataMap.put(ClusterProperties.clusters.name(), clusters);
+
+    return JSONRepresentation(dataMap);
+  }
+
+  @GET
+  @Path("{clusterId}")
+  public Response getClusterInfo(@PathParam("clusterId") String clusterId) {
+    if (!doesClusterExist(clusterId)) {
+      return notFound();
+    }
+
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    Map<String, Object> clusterInfo = new HashMap<>();
+    clusterInfo.put(Properties.id.name(), clusterId);
+
+    LiveInstance controller = dataAccessor.getProperty(keyBuilder.controllerLeader());
+    if (controller != null) {
+      clusterInfo.put(ClusterProperties.controller.name(), controller.getInstanceName());
+    } else {
+      clusterInfo.put(ClusterProperties.controller.name(), "No Lead Controller!");
+    }
+
+    boolean paused = dataAccessor.getProperty(keyBuilder.pause()) != null;
+    clusterInfo.put(ClusterProperties.paused.name(), paused);
+    boolean maintenance = dataAccessor.getProperty(keyBuilder.maintenance()) != null;
+    clusterInfo.put(ClusterProperties.maintenance.name(), maintenance);
+
+    List<String> idealStates = dataAccessor.getChildNames(keyBuilder.idealStates());
+    clusterInfo.put(ClusterProperties.resources.name(), idealStates);
+    List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
+    clusterInfo.put(ClusterProperties.instances.name(), instances);
+    List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
+    clusterInfo.put(ClusterProperties.liveInstances.name(), liveInstances);
+
+    return JSONRepresentation(clusterInfo);
+  }
+
+
+  @PUT
+  @Path("{clusterId}")
+  public Response createCluster(@PathParam("clusterId") String clusterId,
+      @DefaultValue("false") @QueryParam("recreate") String recreate) {
+    boolean recreateIfExists = Boolean.parseBoolean(recreate);
+    ClusterSetup clusterSetup = getClusterSetup();
+
+    try {
+      clusterSetup.addCluster(clusterId, recreateIfExists);
+    } catch (Exception ex) {
+      _logger.error("Failed to create cluster " + clusterId, ex);
+      return serverError(ex);
+    }
+
+    return created();
+  }
+
+  @DELETE
+  @Path("{clusterId}")
+  public Response deleteCluster(@PathParam("clusterId") String clusterId) {
+    ClusterSetup clusterSetup = getClusterSetup();
+
+    try {
+      clusterSetup.deleteCluster(clusterId);
+    } catch (HelixException ex) {
+      _logger.info(
+          "Failed to delete cluster " + clusterId + ", cluster is still in use. Exception: " + ex);
+      return badRequest(ex.getMessage());
+    } catch (Exception ex) {
+      _logger.error("Failed to delete cluster " + clusterId, ex);
+      return serverError(ex);
+    }
+
+    return OK();
+  }
+
+  @POST
+  @Path("{clusterId}")
+  public Response updateCluster(@PathParam("clusterId") String clusterId,
+      @QueryParam("command") String commandStr, @QueryParam("superCluster") String superCluster,
+      String content) {
+    Command command;
+    try {
+      command = getCommand(commandStr);
+    } catch (HelixException ex) {
+      return badRequest(ex.getMessage());
+    }
+
+    ClusterSetup clusterSetup = getClusterSetup();
+    HelixAdmin helixAdmin = getHelixAdmin();
+
+    switch (command) {
+    case activate:
+      if (superCluster == null) {
+        return badRequest("Super Cluster name is missing!");
+      }
+      try {
+        clusterSetup.activateCluster(clusterId, superCluster, true);
+      } catch (Exception ex) {
+        _logger.error("Failed to add cluster " + clusterId + " to super cluster " + superCluster, ex);
+        return serverError(ex);
+      }
+      break;
+
+    case expand:
+      try {
+        clusterSetup.expandCluster(clusterId);
+      } catch (Exception ex) {
+        _logger.error("Failed to expand cluster " + clusterId, ex);
+        return serverError(ex);
+      }
+      break;
+
+    case enable:
+      try {
+        helixAdmin.enableCluster(clusterId, true);
+      } catch (Exception ex) {
+        _logger.error("Failed to enable cluster " + clusterId, ex);
+        return serverError(ex);
+      }
+      break;
+
+    case disable:
+      try {
+        helixAdmin.enableCluster(clusterId, false);
+      } catch (Exception ex) {
+        _logger.error("Failed to disable cluster " + clusterId, ex);
+        return serverError(ex);
+      }
+      break;
+    case enableMaintenanceMode:
+      try {
+        helixAdmin.enableMaintenanceMode(clusterId, true, content);
+      } catch (Exception ex) {
+        _logger.error("Failed to enable maintenance mode for cluster " + clusterId, ex);
+        return serverError(ex);
+      }
+      break;
+    case disableMaintenanceMode:
+      try {
+        helixAdmin.enableMaintenanceMode(clusterId, false);
+      } catch (Exception ex) {
+        _logger.error("Failed to disable maintenance mode for cluster " + clusterId, ex);
+        return serverError(ex);
+      }
+      break;
+    default:
+      return badRequest("Unsupported command " + command);
+    }
+
+    return OK();
+  }
+
+
+  @GET
+  @Path("{clusterId}/configs")
+  public Response getClusterConfig(@PathParam("clusterId") String clusterId) {
+    ConfigAccessor accessor = getConfigAccessor();
+    ClusterConfig config = null;
+    try {
+      config = accessor.getClusterConfig(clusterId);
+    } catch (HelixException ex) {
+      // cluster not found.
+      _logger.info("Failed to get cluster config for cluster " + clusterId
+          + ", cluster not found, Exception: " + ex);
+    } catch (Exception ex) {
+      _logger.error("Failed to get cluster config for cluster " + clusterId, ex);
+      return serverError(ex);
+    }
+    if (config == null) {
+      return notFound();
+    }
+    return JSONRepresentation(config.getRecord());
+  }
+
+  @POST
+  @Path("{clusterId}/configs")
+  public Response updateClusterConfig(
+      @PathParam("clusterId") String clusterId, @QueryParam("command") String commandStr,
+      String content) {
+    Command command;
+    try {
+      command = getCommand(commandStr);
+    } catch (HelixException ex) {
+      return badRequest(ex.getMessage());
+    }
+
+    ZNRecord record;
+    try {
+      record = toZNRecord(content);
+    } catch (IOException e) {
+      _logger.error("Failed to deserialize user's input " + content, e);
+      return badRequest("Input is not a valid ZNRecord!");
+    }
+
+    if (!record.getId().equals(clusterId)) {
+      return badRequest("ID does not match the cluster name in input!");
+    }
+
+    ClusterConfig config = new ClusterConfig(record);
+    ConfigAccessor configAccessor = getConfigAccessor();
+    try {
+      switch (command) {
+      case update:
+        configAccessor.updateClusterConfig(clusterId, config);
+        break;
+      case delete: {
+        HelixConfigScope clusterScope =
+            new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+                .forCluster(clusterId).build();
+        configAccessor.remove(clusterScope, config.getRecord());
+        }
+        break;
+
+      default:
+        return badRequest("Unsupported command " + commandStr);
+      }
+    } catch (HelixException ex) {
+      return notFound(ex.getMessage());
+    } catch (Exception ex) {
+      _logger.error("Failed to " + command + " cluster config for cluster " + clusterId
+          + ", new config: " + content, ex);
+      return serverError(ex);
+    }
+    return OK();
+  }
+
+  @GET
+  @Path("{clusterId}/controller")
+  public Response getClusterController(@PathParam("clusterId") String clusterId) {
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    Map<String, Object> controllerInfo = new HashMap<>();
+    controllerInfo.put(Properties.id.name(), clusterId);
+
+    LiveInstance leader = dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeader());
+    if (leader != null) {
+      controllerInfo.put(ClusterProperties.controller.name(), leader.getInstanceName());
+      controllerInfo.putAll(leader.getRecord().getSimpleFields());
+    } else {
+      controllerInfo.put(ClusterProperties.controller.name(), "No Lead Controller!");
+    }
+
+    return JSONRepresentation(controllerInfo);
+  }
+
+  @GET
+  @Path("{clusterId}/controller/history")
+  public Response getClusterControllerHistory(@PathParam("clusterId") String clusterId) {
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    Map<String, Object> controllerHistory = new HashMap<>();
+    controllerHistory.put(Properties.id.name(), clusterId);
+
+    LeaderHistory history =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeaderHistory());
+    if (history != null) {
+      controllerHistory.put(Properties.history.name(), history.getHistoryList());
+    } else {
+      controllerHistory.put(Properties.history.name(), Collections.emptyList());
+    }
+
+    return JSONRepresentation(controllerHistory);
+  }
+
+  @GET
+  @Path("{clusterId}/controller/messages")
+  public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId) {
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+
+    Map<String, Object> controllerMessages = new HashMap<>();
+    controllerMessages.put(Properties.id.name(), clusterId);
+
+    List<String> messages =
+        dataAccessor.getChildNames(dataAccessor.keyBuilder().controllerMessages());
+    controllerMessages.put(ClusterProperties.messages.name(), messages);
+    controllerMessages.put(Properties.count.name(), messages.size());
+
+    return JSONRepresentation(controllerMessages);
+  }
+
+  @GET
+  @Path("{clusterId}/controller/messages/{messageId}")
+  public Response getClusterControllerMessage(@PathParam("clusterId") String clusterId,
+      @PathParam("messageId") String messageId) {
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    Message message = dataAccessor.getProperty(
+        dataAccessor.keyBuilder().controllerMessage(messageId));
+    if (message == null) {
+      return notFound();
+    }
+    return JSONRepresentation(message.getRecord());
+  }
+
+  @GET
+  @Path("{clusterId}/statemodeldefs")
+  public Response getClusterStateModelDefinitions(@PathParam("clusterId") String clusterId) {
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    List<String> stateModelDefs =
+        dataAccessor.getChildNames(dataAccessor.keyBuilder().stateModelDefs());
+
+    Map<String, Object> clusterStateModelDefs = new HashMap<>();
+    clusterStateModelDefs.put(Properties.id.name(), clusterId);
+    clusterStateModelDefs.put(ClusterProperties.stateModelDefinitions.name(), stateModelDefs);
+
+    return JSONRepresentation(clusterStateModelDefs);
+  }
+
+  @GET
+  @Path("{clusterId}/statemodeldefs/{statemodel}")
+  public Response getClusterStateModelDefinition(@PathParam("clusterId") String clusterId,
+      @PathParam("statemodel") String statemodel) {
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    StateModelDefinition stateModelDef =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().stateModelDef(statemodel));
+
+    if (stateModelDef == null) {
+      return notFound();
+    }
+    return JSONRepresentation(stateModelDef.getRecord());
+  }
+
+  private boolean doesClusterExist(String cluster) {
+    ZkClient zkClient = getZkClient();
+    return ZKUtil.isClusterSetup(cluster, zkClient);
+  }
+}
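
For reference, the endpoints above map onto plain HTTP calls. A hedged JAX-RS
client sketch; the host, port, and /admin/v2 base path are assumptions about
the deployment, not part of this change:

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.core.MediaType;

    public class ClusterApiExample {
      public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        String base = "http://localhost:8100/admin/v2/clusters";

        // PUT /clusters/{clusterId} creates a cluster (see createCluster above).
        client.target(base + "/TestCluster").request().put(Entity.text(""));

        // GET /clusters/{clusterId} returns the controller, paused/maintenance
        // flags, resources, instances, and live instances as JSON.
        String info = client.target(base + "/TestCluster")
            .request(MediaType.APPLICATION_JSON).get(String.class);
        System.out.println(info);

        // POST /clusters/{clusterId}?command=disable pauses the controller.
        client.target(base + "/TestCluster").queryParam("command", "disable")
            .request().post(Entity.text(""));
      }
    }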

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java
new file mode 100644
index 0000000..2748dea
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java
@@ -0,0 +1,545 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Error;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ParticipantHistory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+
+@Path("/clusters/{clusterId}/instances")
+public class InstanceAccessor extends AbstractHelixResource {
+  private static final Logger _logger = LoggerFactory.getLogger(InstanceAccessor.class);
+
+  public enum InstanceProperties {
+    instances,
+    online,
+    disabled,
+    config,
+    liveInstance,
+    resource,
+    resources,
+    partitions,
+    errors,
+    new_messages,
+    read_messages,
+    total_message_count,
+    read_message_count,
+    healthreports,
+    instanceTags
+  }
+
+  @GET
+  public Response getInstances(@PathParam("clusterId") String clusterId) {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+    ObjectNode root = JsonNodeFactory.instance.objectNode();
+    root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
+
+    ArrayNode instancesNode = root.putArray(InstanceProperties.instances.name());
+    ArrayNode onlineNode = root.putArray(InstanceProperties.online.name());
+    ArrayNode disabledNode = root.putArray(InstanceProperties.disabled.name());
+
+    List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs());
+
+    if (instances != null) {
+      instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances));
+    } else {
+      return notFound();
+    }
+
+    List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
+    ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+
+    for (String instanceName : instances) {
+      InstanceConfig instanceConfig =
+          accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
+      if (instanceConfig != null) {
+        if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
+            && clusterConfig.getDisabledInstances().containsKey(instanceName))) {
+          disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
+        }
+
+        if (liveInstances != null && liveInstances.contains(instanceName)) {
+          onlineNode.add(JsonNodeFactory.instance.textNode(instanceName));
+        }
+      }
+    }
+
+    return JSONRepresentation(root);
+  }
+
+  @POST
+  public Response updateInstances(@PathParam("clusterId") String clusterId,
+      @QueryParam("command") String command, String content) {
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command: " + command);
+    }
+
+    HelixAdmin admin = getHelixAdmin();
+    try {
+      JsonNode node = null;
+      if (content.length() != 0) {
+        node = OBJECT_MAPPER.readTree(content);
+      }
+      if (node == null) {
+        return badRequest("Invalid input for content : " + content);
+      }
+      List<String> enableInstances = OBJECT_MAPPER
+          .readValue(node.get(InstanceProperties.instances.name()).toString(),
+              OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
+      switch (cmd) {
+      case enable:
+        admin.enableInstance(clusterId, enableInstances, true);
+
+        break;
+      case disable:
+        admin.enableInstance(clusterId, enableInstances, false);
+        break;
+      default:
+        _logger.error("Unsupported command: " + command);
+        return badRequest("Unsupported command: " + command);
+      }
+    } catch (Exception e) {
+      _logger.error("Failed in updating instances : " + content, e);
+      return badRequest(e.getMessage());
+    }
+    return OK();
+  }
+
+  @GET
+  @Path("{instanceName}")
+  public Response getInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+    Map<String, Object> instanceMap = new HashMap<>();
+    instanceMap.put(Properties.id.name(), JsonNodeFactory.instance.textNode(instanceName));
+    instanceMap.put(InstanceProperties.liveInstance.name(), null);
+
+    InstanceConfig instanceConfig =
+        accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
+    LiveInstance liveInstance =
+        accessor.getProperty(accessor.keyBuilder().liveInstance(instanceName));
+
+    if (instanceConfig != null) {
+      instanceMap.put(InstanceProperties.config.name(), instanceConfig.getRecord());
+    } else {
+      return notFound();
+    }
+
+    if (liveInstance != null) {
+      instanceMap.put(InstanceProperties.liveInstance.name(), liveInstance.getRecord());
+    }
+
+    return JSONRepresentation(instanceMap);
+  }
+
+  @PUT
+  @Path("{instanceName}")
+  public Response addInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName, String content) {
+    HelixAdmin admin = getHelixAdmin();
+    ZNRecord record;
+    try {
+      record = toZNRecord(content);
+    } catch (IOException e) {
+      _logger.error("Failed to deserialize user's input " + content, e);
+      return badRequest("Input is not a valid ZNRecord!");
+    }
+
+    try {
+      admin.addInstance(clusterId, new InstanceConfig(record));
+    } catch (Exception ex) {
+      _logger.error("Error in adding an instance: " + instanceName, ex);
+      return serverError(ex);
+    }
+
+    return OK();
+  }
+
+  @POST
+  @Path("{instanceName}")
+  public Response updateInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName, @QueryParam("command") String command,
+      String content) {
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command: " + command);
+    }
+
+    HelixAdmin admin = getHelixAdmin();
+    try {
+      JsonNode node = null;
+      if (content.length() != 0) {
+        node = OBJECT_MAPPER.readTree(content);
+      }
+
+      switch (cmd) {
+      case enable:
+        admin.enableInstance(clusterId, instanceName, true);
+        break;
+      case disable:
+        admin.enableInstance(clusterId, instanceName, false);
+        break;
+      case reset:
+        if (!validInstance(node, instanceName)) {
+          return badRequest("Instance names do not match!");
+        }
+        // getTextValue() avoids passing the JSON-quoted resource name.
+        admin.resetPartition(clusterId, instanceName,
+            node.get(InstanceProperties.resource.name()).getTextValue(), (List<String>) OBJECT_MAPPER
+                .readValue(node.get(InstanceProperties.partitions.name()).toString(),
+                    OBJECT_MAPPER.getTypeFactory()
+                        .constructCollectionType(List.class, String.class)));
+        break;
+      case addInstanceTag:
+        if (!validInstance(node, instanceName)) {
+          return badRequest("Instance names do not match!");
+        }
+        for (String tag : (List<String>) OBJECT_MAPPER
+            .readValue(node.get(InstanceProperties.instanceTags.name()).toString(),
+                OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
+          admin.addInstanceTag(clusterId, instanceName, tag);
+        }
+        break;
+      case removeInstanceTag:
+        if (!validInstance(node, instanceName)) {
+          return badRequest("Instance names do not match!");
+        }
+        for (String tag : (List<String>) OBJECT_MAPPER
+            .readValue(node.get(InstanceProperties.instanceTags.name()).toString(),
+                OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
+          admin.removeInstanceTag(clusterId, instanceName, tag);
+        }
+        break;
+      case enablePartitions:
+        admin.enablePartition(true, clusterId, instanceName,
+            node.get(InstanceProperties.resource.name()).getTextValue(),
+            (List<String>) OBJECT_MAPPER
+                .readValue(node.get(InstanceProperties.partitions.name()).toString(),
+                    OBJECT_MAPPER.getTypeFactory()
+                        .constructCollectionType(List.class, String.class)));
+        break;
+      case disablePartitions:
+        admin.enablePartition(false, clusterId, instanceName,
+            node.get(InstanceProperties.resource.name()).getTextValue(),
+            (List<String>) OBJECT_MAPPER
+                .readValue(node.get(InstanceProperties.partitions.name()).toString(),
+                    OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)));
+        break;
+      default:
+        _logger.error("Unsupported command: " + command);
+        return badRequest("Unsupported command: " + command);
+      }
+    } catch (Exception e) {
+      _logger.error("Failed in updating instance : " + instanceName, e);
+      return badRequest(e.getMessage());
+    }
+    return OK();
+  }
+
+  @DELETE
+  @Path("{instanceName}")
+  public Response deleteInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) {
+    HelixAdmin admin = getHelixAdmin();
+    try {
+      InstanceConfig instanceConfig = admin.getInstanceConfig(clusterId, instanceName);
+      admin.dropInstance(clusterId, instanceConfig);
+    } catch (HelixException e) {
+      return badRequest(e.getMessage());
+    }
+
+    return OK();
+  }
+
+  @GET
+  @Path("{instanceName}/configs")
+  public Response getInstanceConfig(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+    InstanceConfig instanceConfig =
+        accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
+
+    if (instanceConfig != null) {
+      return JSONRepresentation(instanceConfig.getRecord());
+    }
+
+    return notFound();
+  }
+
+  @PUT
+  @Path("{instanceName}/configs")
+  public Response updateInstanceConfig(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName, String content) throws IOException {
+    HelixAdmin admin = getHelixAdmin();
+    ZNRecord record;
+    try {
+      record = toZNRecord(content);
+    } catch (IOException e) {
+      _logger.error("Failed to deserialize user's input " + content, e);
+      return badRequest("Input is not a valid ZNRecord!");
+    }
+
+    try {
+      admin.setInstanceConfig(clusterId, instanceName, new InstanceConfig(record));
+    } catch (Exception ex) {
+      _logger.error("Error in update instance config: " + instanceName, ex);
+      return serverError(ex);
+    }
+
+    return OK();
+  }
+
+  @GET
+  @Path("{instanceName}/resources")
+  public Response getResourcesOnInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+
+    ObjectNode root = JsonNodeFactory.instance.objectNode();
+    root.put(Properties.id.name(), instanceName);
+    ArrayNode resourcesNode = root.putArray(InstanceProperties.resources.name());
+
+    List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName));
+    if (sessionIds == null || sessionIds.size() == 0) {
+      return notFound();
+    }
+
+    // Only get resource list from current session id
+    String currentSessionId = sessionIds.get(0);
+
+    List<String> resources =
+        accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, currentSessionId));
+    if (resources != null && resources.size() > 0) {
+      resourcesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(resources));
+    }
+
+    return JSONRepresentation(root);
+  }
+
+  @GET
+  @Path("{instanceName}/resources/{resourceName}")
+  public Response getResourceOnInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName,
+      @PathParam("resourceName") String resourceName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+    List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName));
+    if (sessionIds == null || sessionIds.size() == 0) {
+      return notFound();
+    }
+
+    // Only get resource list from current session id
+    String currentSessionId = sessionIds.get(0);
+    CurrentState resourceCurrentState = accessor
+        .getProperty(accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName));
+    if (resourceCurrentState != null) {
+      return JSONRepresentation(resourceCurrentState.getRecord());
+    }
+
+    return notFound();
+  }
+
+  @GET
+  @Path("{instanceName}/errors")
+  public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+
+    ObjectNode root = JsonNodeFactory.instance.objectNode();
+    root.put(Properties.id.name(), instanceName);
+    ObjectNode errorsNode = JsonNodeFactory.instance.objectNode();
+
+    List<String> sessionIds =
+        accessor.getChildNames(accessor.keyBuilder().errors(instanceName));
+
+    if (sessionIds == null || sessionIds.size() == 0) {
+      return notFound();
+    }
+
+    for (String sessionId : sessionIds) {
+      List<String> resources =
+          accessor.getChildNames(accessor.keyBuilder().errors(instanceName, sessionId));
+      if (resources != null) {
+        ObjectNode resourcesNode = JsonNodeFactory.instance.objectNode();
+        for (String resourceName : resources) {
+          List<String> partitions = accessor
+              .getChildNames(accessor.keyBuilder().errors(instanceName, sessionId, resourceName));
+          if (partitions != null) {
+            ArrayNode partitionsNode = resourcesNode.putArray(resourceName);
+            partitionsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(partitions));
+          }
+        }
+        errorsNode.put(sessionId, resourcesNode);
+      }
+    }
+    root.put(InstanceProperties.errors.name(), errorsNode);
+
+    return JSONRepresentation(root);
+  }
+
+  @GET
+  @Path("{instanceName}/errors/{sessionId}/{resourceName}/{partitionName}")
+  public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName, @PathParam("sessionId") String sessionId,
+      @PathParam("resourceName") String resourceName,
+      @PathParam("partitionName") String partitionName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+    Error error = accessor.getProperty(accessor.keyBuilder()
+        .stateTransitionError(instanceName, sessionId, resourceName, partitionName));
+    if (error != null) {
+      return JSONRepresentation(error.getRecord());
+    }
+
+    return notFound();
+  }
+
+  @GET
+  @Path("{instanceName}/history")
+  public Response getHistoryOnInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+    ParticipantHistory history =
+        accessor.getProperty(accessor.keyBuilder().participantHistory(instanceName));
+    if (history != null) {
+      return JSONRepresentation(history.getRecord());
+    }
+    return notFound();
+  }
+
+  @GET
+  @Path("{instanceName}/messages")
+  public Response getMessagesOnInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+
+    ObjectNode root = JsonNodeFactory.instance.objectNode();
+    root.put(Properties.id.name(), instanceName);
+    ArrayNode newMessages = root.putArray(InstanceProperties.new_messages.name());
+    ArrayNode readMessages = root.putArray(InstanceProperties.read_messages.name());
+
+    List<String> messages =
+        accessor.getChildNames(accessor.keyBuilder().messages(instanceName));
+    if (messages == null || messages.size() == 0) {
+      return notFound();
+    }
+
+    for (String messageName : messages) {
+      Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageName));
+      if (message.getMsgState() == Message.MessageState.NEW) {
+        newMessages.add(messageName);
+      }
+
+      if (message.getMsgState() == Message.MessageState.READ) {
+        readMessages.add(messageName);
+      }
+    }
+
+    root.put(InstanceProperties.total_message_count.name(),
+        newMessages.size() + readMessages.size());
+    root.put(InstanceProperties.read_message_count.name(), readMessages.size());
+
+    return JSONRepresentation(root);
+  }
+
+  @GET
+  @Path("{instanceName}/messages/{messageId}")
+  public Response getMessageOnInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName,
+      @PathParam("messageId") String messageId) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+    Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageId));
+    if (message != null) {
+      return JSONRepresentation(message.getRecord());
+    }
+
+    return notFound();
+  }
+
+  @GET
+  @Path("{instanceName}/healthreports")
+  public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+
+    ObjectNode root = JsonNodeFactory.instance.objectNode();
+    root.put(Properties.id.name(), instanceName);
+    ArrayNode healthReportsNode = root.putArray(InstanceProperties.healthreports.name());
+
+    List<String> healthReports =
+        accessor.getChildNames(accessor.keyBuilder().healthReports(instanceName));
+
+    if (healthReports != null && healthReports.size() > 0) {
+      healthReportsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(healthReports));
+    }
+
+    return JSONRepresentation(root);
+  }
+
+  @GET
+  @Path("{instanceName}/healthreports/{reportName}")
+  public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName,
+      @PathParam("reportName") String reportName) throws IOException {
+    HelixDataAccessor accessor = getDataAccssor(clusterId);
+    HealthStat healthStat =
+        accessor.getProperty(accessor.keyBuilder().healthReport(instanceName, reportName));
+    if (healthStat != null) {
+      return JSONRepresentation(healthStat);
+    }
+
+    return notFound();
+  }
+
+  private boolean validInstance(JsonNode node, String instanceName) {
+    return instanceName.equals(node.get(Properties.id.name()).getValueAsText());
+  }
+}
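
The updateInstances endpoint above reads the target instance names from the
"instances" field of the JSON body. A sketch of a bulk-enable call, under the
same deployment assumptions as the earlier client example (the instance names
are hypothetical):

    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.core.MediaType;

    public class InstanceApiExample {
      public static void main(String[] args) {
        String base = "http://localhost:8100/admin/v2/clusters/TestCluster/instances";

        // POST /...instances?command=enable with {"instances": [...]} enables
        // the listed instances in one call (see updateInstances above).
        String payload = "{\"instances\": [\"host1_12918\", \"host2_12918\"]}";
        ClientBuilder.newClient().target(base).queryParam("command", "enable")
            .request().post(Entity.entity(payload, MediaType.APPLICATION_JSON));
      }
    }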

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
new file mode 100644
index 0000000..2d27f51
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
@@ -0,0 +1,200 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Response;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+
+@Path("/clusters/{clusterId}/workflows/{workflowName}/jobs")
+public class JobAccessor extends AbstractHelixResource {
+  private static final Logger _logger = LoggerFactory.getLogger(JobAccessor.class);
+
+  public enum JobProperties {
+    Jobs,
+    JobConfig,
+    JobContext,
+    TASK_COMMAND
+  }
+
+  @GET
+  public Response getJobs(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowName") String workflowName) {
+    TaskDriver driver = getTaskDriver(clusterId);
+    WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowName);
+    ObjectNode root = JsonNodeFactory.instance.objectNode();
+
+    if (workflowConfig == null) {
+      return badRequest(String.format("Workflow %s not found!", workflowName));
+    }
+
+    Set<String> jobs = workflowConfig.getJobDag().getAllNodes();
+    root.put(Properties.id.name(), JobProperties.Jobs.name());
+    ArrayNode jobsNode = root.putArray(JobProperties.Jobs.name());
+
+    if (jobs != null) {
+      jobsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(jobs));
+    }
+    return JSONRepresentation(root);
+  }
+
+  @GET
+  @Path("{jobName}")
+  public Response getJob(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) {
+    TaskDriver driver = getTaskDriver(clusterId);
+    Map<String, ZNRecord> jobMap = new HashMap<>();
+
+    JobConfig jobConfig = driver.getJobConfig(jobName);
+    if (jobConfig != null) {
+      jobMap.put(JobProperties.JobConfig.name(), jobConfig.getRecord());
+    } else {
+      return badRequest(String.format("Job config for %s does not exist", jobName));
+    }
+
+    JobContext jobContext = driver.getJobContext(jobName);
+    jobMap.put(JobProperties.JobContext.name(), null);
+
+    if (jobContext != null) {
+      jobMap.put(JobProperties.JobContext.name(), jobContext.getRecord());
+    }
+
+    return JSONRepresentation(jobMap);
+  }
+
+  @PUT
+  @Path("{jobName}")
+  public Response addJob(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName,
+      String content) {
+    ZNRecord record;
+    TaskDriver driver = getTaskDriver(clusterId);
+
+    try {
+      record = toZNRecord(content);
+      JobConfig.Builder jobConfig = JobAccessor.getJobConfig(record);
+      driver.enqueueJob(workflowName, jobName, jobConfig);
+    } catch (HelixException e) {
+      return badRequest(
+          String.format("Failed to enqueue job %s for reason : %s", jobName, e.getMessage()));
+    } catch (IOException e) {
+      return badRequest(String.format("Invalid input for Job Config of Job : %s", jobName));
+    }
+
+    return OK();
+  }
+
+  @DELETE
+  @Path("{jobName}")
+  public Response deleteJob(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) {
+    TaskDriver driver = getTaskDriver(clusterId);
+
+    try {
+      driver.deleteJob(workflowName, jobName);
+    } catch (Exception e) {
+      return badRequest(e.getMessage());
+    }
+
+    return OK();
+  }
+
+  @GET
+  @Path("{jobName}/configs")
+  public Response getJobConfig(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) {
+    TaskDriver driver = getTaskDriver(clusterId);
+
+    JobConfig jobConfig = driver.getJobConfig(jobName);
+    if (jobConfig != null) {
+      return JSONRepresentation(jobConfig.getRecord());
+    }
+    return badRequest("Job config for " + jobName + " does not exist");
+  }
+
+  @GET
+  @Path("{jobName}/context")
+  public Response getJobContext(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) {
+    TaskDriver driver = getTaskDriver(clusterId);
+
+    JobContext jobContext = driver.getJobContext(jobName);
+    if (jobContext != null) {
+      return JSONRepresentation(jobContext.getRecord());
+    }
+    return badRequest("Job context for " + jobName + " does not exist");
+  }
+
+  protected static JobConfig.Builder getJobConfig(Map<String, String> cfgMap) {
+    return new JobConfig.Builder().fromMap(cfgMap);
+  }
+
+  protected static JobConfig.Builder getJobConfig(ZNRecord record) {
+    JobConfig.Builder jobConfig = new JobConfig.Builder().fromMap(record.getSimpleFields());
+    jobConfig.addTaskConfigMap(getTaskConfigMap(record.getMapFields()));
+
+    return jobConfig;
+  }
+
+  private static Map<String, TaskConfig> getTaskConfigMap(
+      Map<String, Map<String, String>> taskConfigs) {
+    Map<String, TaskConfig> taskConfigsMap = new HashMap<>();
+    if (taskConfigs == null || taskConfigs.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    for (Map<String, String> taskConfigMap : taskConfigs.values()) {
+      if (!taskConfigMap.containsKey(JobProperties.TASK_COMMAND.name())) {
+        continue;
+      }
+
+      TaskConfig taskConfig =
+          new TaskConfig(taskConfigMap.get(JobProperties.TASK_COMMAND.name()), taskConfigMap);
+      taskConfigsMap.put(taskConfig.getId(), taskConfig);
+    }
+
+    return taskConfigsMap;
+  }
+}
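
addJob() above deserializes the request body into a ZNRecord: the record's
simple fields seed JobConfig.Builder.fromMap(), and every map field carrying a
TASK_COMMAND entry becomes one TaskConfig (see getTaskConfigMap). A sketch of
building such a record; the field values are hypothetical:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.helix.ZNRecord;

    public class JobPayloadExample {
      public static void main(String[] args) {
        ZNRecord record = new ZNRecord("myJob");
        // Simple fields map onto JobConfig.Builder.fromMap().
        record.setSimpleField("WorkflowID", "myWorkflow");
        record.setSimpleField("Command", "Reindex");

        // A map field with a TASK_COMMAND entry becomes a TaskConfig.
        Map<String, String> task = new HashMap<>();
        task.put("TASK_COMMAND", "Reindex");
        record.setMapField("task_1", task);
      }
    }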

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/MetadataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/MetadataAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/MetadataAccessor.java
new file mode 100644
index 0000000..5757760
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/MetadataAccessor.java
@@ -0,0 +1,45 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+import org.apache.helix.rest.common.ContextPropertyKeys;
+import org.apache.helix.rest.common.HelixRestNamespace;
+import org.apache.helix.rest.common.HelixRestUtils;
+import org.apache.helix.rest.server.resources.AbstractResource;
+
+@Path("")
+public class MetadataAccessor extends AbstractResource {
+  @GET
+  public Response getMetadata() {
+    if (HelixRestUtils.isDefaultServlet(_servletRequest.getServletPath())) {
+      // To keep the API endpoints' behavior consistent, return NotFound
+      // when a user calls /admin/v2/ on the default servlet.
+      return notFound();
+    }
+    // This will be the root of all namespaced servlets, and returns
+    // servlet namespace information
+    HelixRestNamespace namespace = (HelixRestNamespace) _application.getProperties().get(
+        ContextPropertyKeys.METADATA.name());
+    return JSONRepresentation(namespace.getRestInfo());
+  }
+}
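
In short, GET on a namespaced root returns that namespace's metadata, while
the default /admin/v2/ root stays NotFound. A client sketch, assuming the
namespaced servlets are mounted under /admin/v2/namespaces/{name} (the
namespace name is hypothetical):

    import javax.ws.rs.client.ClientBuilder;

    public class MetadataApiExample {
      public static void main(String[] args) {
        // Returns the HelixRestNamespace info for "myNamespace" as JSON.
        String info = ClientBuilder.newClient()
            .target("http://localhost:8100/admin/v2/namespaces/myNamespace")
            .request().get(String.class);
        System.out.println(info);
      }
    }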

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
new file mode 100644
index 0000000..04d3536
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
@@ -0,0 +1,278 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+
+@Path("/clusters/{clusterId}/resources")
+public class ResourceAccessor extends AbstractHelixResource {
+  private static final Logger _logger = LoggerFactory.getLogger(ResourceAccessor.class);
+
+  public enum ResourceProperties {
+    idealState,
+    idealStates,
+    externalView,
+    externalViews,
+    resourceConfig,
+  }
+
+  @GET
+  public Response getResources(@PathParam("clusterId") String clusterId) {
+    ObjectNode root = JsonNodeFactory.instance.objectNode();
+    root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
+
+    ZkClient zkClient = getZkClient();
+
+    ArrayNode idealStatesNode = root.putArray(ResourceProperties.idealStates.name());
+    ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name());
+
+    List<String> idealStates = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
+    List<String> externalViews = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId));
+
+    if (idealStates != null) {
+      idealStatesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(idealStates));
+    } else {
+      return notFound();
+    }
+
+    if (externalViews != null) {
+      externalViewsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(externalViews));
+    }
+
+    return JSONRepresentation(root);
+  }
+
+  @GET
+  @Path("{resourceName}")
+  public Response getResource(@PathParam("clusterId") String clusterId,
+      @PathParam("resourceName") String resourceName) throws IOException {
+    ConfigAccessor accessor = getConfigAccessor();
+    HelixAdmin admin = getHelixAdmin();
+
+    ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
+    IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
+    ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
+
+    Map<String, ZNRecord> resourceMap = new HashMap<>();
+    if (idealState != null) {
+      resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord());
+    } else {
+      return notFound();
+    }
+
+    resourceMap.put(ResourceProperties.resourceConfig.name(), null);
+    resourceMap.put(ResourceProperties.externalView.name(), null);
+
+    if (resourceConfig != null) {
+      resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord());
+    }
+
+    if (externalView != null) {
+      resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord());
+    }
+
+    return JSONRepresentation(resourceMap);
+  }
+
+  @PUT
+  @Path("{resourceName}")
+  public Response addResource(@PathParam("clusterId") String clusterId,
+      @PathParam("resourceName") String resourceName,
+      @DefaultValue("-1") @QueryParam("numPartitions") int numPartitions,
+      @DefaultValue("") @QueryParam("stateModelRef") String stateModelRef,
+      @DefaultValue("SEMI_AUTO") @QueryParam("rebalancerMode") String rebalancerMode,
+      @DefaultValue("DEFAULT") @QueryParam("rebalanceStrategy") String rebalanceStrategy,
+      @DefaultValue("0") @QueryParam("bucketSize") int bucketSize,
+      @DefaultValue("-1") @QueryParam("maxPartitionsPerInstance") int maxPartitionsPerInstance,
+      String content) {
+
+    HelixAdmin admin = getHelixAdmin();
+
+    try {
+      if (content.length() != 0) {
+        ZNRecord record;
+        try {
+          record = toZNRecord(content);
+        } catch (IOException e) {
+          _logger.error("Failed to deserialize user's input " + content, e);
+          return badRequest("Input is not a valid ZNRecord!");
+        }
+
+        if (record.getSimpleFields() != null) {
+          admin.addResource(clusterId, resourceName, new IdealState(record));
+        }
+      } else {
+        admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode,
+            rebalanceStrategy, bucketSize, maxPartitionsPerInstance);
+      }
+    } catch (Exception e) {
+      _logger.error("Error in adding a resource: " + resourceName, e);
+      return serverError(e);
+    }
+
+    return OK();
+  }
+
+  @POST
+  @Path("{resourceName}")
+  public Response updateResource(@PathParam("clusterId") String clusterId,
+      @PathParam("resourceName") String resourceName, @QueryParam("command") String command,
+      @DefaultValue("-1") @QueryParam("replicas") int replicas,
+      @DefaultValue("") @QueryParam("keyPrefix") String keyPrefix,
+      @DefaultValue("") @QueryParam("group") String group) {
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command: " + command);
+    }
+
+    HelixAdmin admin = getHelixAdmin();
+    try {
+      switch (cmd) {
+      case enable:
+        admin.enableResource(clusterId, resourceName, true);
+        break;
+      case disable:
+        admin.enableResource(clusterId, resourceName, false);
+        break;
+      case rebalance:
+        if (replicas == -1) {
+          return badRequest("Number of replicas is needed for rebalancing!");
+        }
+        keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix;
+        admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group);
+        break;
+      default:
+        _logger.error("Unsupported command :" + command);
+        return badRequest("Unsupported command :" + command);
+      }
+    } catch (Exception e) {
+      _logger.error("Failed in updating resource : " + resourceName, e);
+      return badRequest(e.getMessage());
+    }
+    return OK();
+  }
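
The command grammar above accepts enable, disable, and rebalance; rebalance additionally requires replicas, and keyPrefix defaults to the resource name when omitted. A sketch under the same endpoint assumptions as the earlier examples:

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;

    public class RebalanceResourceExample {
      public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        // rebalance fails with 400 unless replicas is supplied
        Response r = client
            .target("http://localhost:8100/clusters/myCluster/resources/myResource")
            .queryParam("command", "rebalance")
            .queryParam("replicas", 3)
            .request()
            .post(Entity.entity("", MediaType.APPLICATION_JSON));
        System.out.println(r.getStatus());
        client.close();
      }
    }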
+
+  @DELETE
+  @Path("{resourceName}")
+  public Response deleteResource(@PathParam("clusterId") String clusterId,
+      @PathParam("resourceName") String resourceName) {
+    HelixAdmin admin = getHelixAdmin();
+    try {
+      admin.dropResource(clusterId, resourceName);
+    } catch (Exception e) {
+      _logger.error("Error in deleting a resource: " + resourceName, e);
+      return serverError();
+    }
+    return OK();
+  }
+
+  @GET
+  @Path("{resourceName}/configs")
+  public Response getResourceConfig(@PathParam("clusterId") String clusterId,
+      @PathParam("resourceName") String resourceName) {
+    ConfigAccessor accessor = getConfigAccessor();
+    ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
+    if (resourceConfig != null) {
+      return JSONRepresentation(resourceConfig.getRecord());
+    }
+
+    return notFound();
+  }
+
+  @POST
+  @Path("{resourceName}/configs")
+  public Response updateResourceConfig(@PathParam("clusterId") String clusterId,
+      @PathParam("resourceName") String resourceName, String content) {
+    ZNRecord record;
+    try {
+      record = toZNRecord(content);
+    } catch (IOException e) {
+      _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
+      return badRequest("Input is not a vaild ZNRecord!");
+    }
+    ResourceConfig resourceConfig = new ResourceConfig(record);
+    ConfigAccessor configAccessor = getConfigAccessor();
+    try {
+      configAccessor.updateResourceConfig(clusterId, resourceName, resourceConfig);
+    } catch (HelixException ex) {
+      return notFound(ex.getMessage());
+    } catch (Exception ex) {
+      _logger.error("Failed to update config of resource " + resourceName + " in cluster "
+          + clusterId + ", new config: " + content, ex);
+      return serverError(ex);
+    }
+    return OK();
+  }
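
updateResourceConfig expects the body to deserialize into a ZNRecord. A sketch assuming the usual ZNRecord serialization (id, simpleFields, listFields, mapFields); the config key shown is an illustrative assumption, not a documented field:

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;

    public class UpdateResourceConfigExample {
      public static void main(String[] args) {
        // ZNRecord-shaped payload; the record is applied as an update to the existing config
        String payload = "{\"id\":\"myResource\","
            + "\"simpleFields\":{\"SOME_CONFIG_KEY\":\"true\"},"
            + "\"listFields\":{},\"mapFields\":{}}";
        Client client = ClientBuilder.newClient();
        Response r = client
            .target("http://localhost:8100/clusters/myCluster/resources/myResource/configs")
            .request()
            .post(Entity.entity(payload, MediaType.APPLICATION_JSON));
        System.out.println(r.getStatus());
        client.close();
      }
    }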
+
+  @GET
+  @Path("{resourceName}/idealState")
+  public Response getResourceIdealState(@PathParam("clusterId") String clusterId,
+      @PathParam("resourceName") String resourceName) {
+    HelixAdmin admin = getHelixAdmin();
+    IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
+    if (idealState != null) {
+      return JSONRepresentation(idealState.getRecord());
+    }
+
+    return notFound();
+  }
+
+  @GET
+  @Path("{resourceName}/externalView")
+  public Response getResourceExternalView(@PathParam("clusterId") String clusterId,
+      @PathParam("resourceName") String resourceName) {
+    HelixAdmin admin = getHelixAdmin();
+    ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
+    if (externalView != null) {
+      return JSONRepresentation(externalView.getRecord());
+    }
+
+    return notFound();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
new file mode 100644
index 0000000..ebd04fd
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
@@ -0,0 +1,325 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.type.TypeFactory;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+import org.codehaus.jackson.node.TextNode;
+
+@Path("/clusters/{clusterId}/workflows")
+public class WorkflowAccessor extends AbstractHelixResource {
+  private static Logger _logger = LoggerFactory.getLogger(WorkflowAccessor.class.getName());
+
+  public enum WorkflowProperties {
+    Workflows,
+    WorkflowConfig,
+    WorkflowContext,
+    Jobs,
+    ParentJobs
+  }
+
+  public enum TaskCommand {
+    stop,
+    resume,
+    clean
+  }
+
+  @GET
+  public Response getWorkflows(@PathParam("clusterId") String clusterId) {
+    TaskDriver taskDriver = getTaskDriver(clusterId);
+    Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
+    Map<String, List<String>> dataMap = new HashMap<>();
+    dataMap.put(WorkflowProperties.Workflows.name(), new ArrayList<>(workflowConfigMap.keySet()));
+
+    return JSONRepresentation(dataMap);
+  }
+
+  @GET
+  @Path("{workflowId}")
+  public Response getWorkflow(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId) {
+    TaskDriver taskDriver = getTaskDriver(clusterId);
+    WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflowId);
+    WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowId);
+
+    ObjectNode root = JsonNodeFactory.instance.objectNode();
+    TextNode id = JsonNodeFactory.instance.textNode(workflowId);
+    root.put(Properties.id.name(), id);
+
+    ObjectNode workflowConfigNode = JsonNodeFactory.instance.objectNode();
+    ObjectNode workflowContextNode = JsonNodeFactory.instance.objectNode();
+
+    if (workflowConfig != null) {
+      getWorkflowConfigNode(workflowConfigNode, workflowConfig.getRecord());
+    }
+
+    if (workflowContext != null) {
+      getWorkflowContextNode(workflowContextNode, workflowContext.getRecord());
+    }
+
+    root.put(WorkflowProperties.WorkflowConfig.name(), workflowConfigNode);
+    root.put(WorkflowProperties.WorkflowContext.name(), workflowContextNode);
+
+    if (workflowConfig != null) {
+      // Guard against a NullPointerException when the workflow does not exist
+      JobDag jobDag = workflowConfig.getJobDag();
+      ArrayNode jobs = OBJECT_MAPPER.valueToTree(jobDag.getAllNodes());
+      ObjectNode parentJobs = OBJECT_MAPPER.valueToTree(jobDag.getParentsToChildren());
+      root.put(WorkflowProperties.Jobs.name(), jobs);
+      root.put(WorkflowProperties.ParentJobs.name(), parentJobs);
+    }
+
+    return JSONRepresentation(root);
+  }
+
+  @PUT
+  @Path("{workflowId}")
+  public Response createWorkflow(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId, String content) {
+    TaskDriver driver = getTaskDriver(clusterId);
+    Map<String, String> cfgMap;
+    try {
+      JsonNode root = OBJECT_MAPPER.readTree(content);
+      cfgMap = OBJECT_MAPPER
+          .readValue(root.get(WorkflowProperties.WorkflowConfig.name()).toString(),
+              TypeFactory.defaultInstance()
+                  .constructMapType(HashMap.class, String.class, String.class));
+
+      WorkflowConfig workflowConfig = WorkflowConfig.Builder.fromMap(cfgMap).build();
+
+      // A JobQueue accepts jobs incrementally, so any jobs listed in the payload are
+      // ignored when the workflow is created as a JobQueue
+      if (workflowConfig.isJobQueue()) {
+        driver.start(new JobQueue.Builder(workflowId).setWorkflowConfig(workflowConfig).build());
+        return OK();
+      }
+
+      Workflow.Builder workflow = new Workflow.Builder(workflowId);
+
+      if (root.get(WorkflowProperties.Jobs.name()) != null) {
+        Map<String, JobConfig.Builder> jobConfigs =
+            getJobConfigs((ArrayNode) root.get(WorkflowProperties.Jobs.name()));
+        for (Map.Entry<String, JobConfig.Builder> job : jobConfigs.entrySet()) {
+          workflow.addJob(job.getKey(), job.getValue());
+        }
+      }
+
+      if (root.get(WorkflowProperties.ParentJobs.name()) != null) {
+        Map<String, List<String>> parentJobs = OBJECT_MAPPER
+            .readValue(root.get(WorkflowProperties.ParentJobs.name()).toString(),
+                TypeFactory.defaultInstance()
+                    .constructMapType(HashMap.class, String.class, List.class));
+        for (Map.Entry<String, List<String>> entry : parentJobs.entrySet()) {
+          String parentJob = entry.getKey();
+          for (String childJob : entry.getValue()) {
+            workflow.addParentChildDependency(parentJob, childJob);
+          }
+        }
+      }
+
+      driver.start(workflow.build());
+    } catch (IOException e) {
+      return badRequest(
+          String.format("Invalid input for workflow %s: %s", workflowId, e.getMessage()));
+    } catch (HelixException e) {
+      return badRequest(
+          String.format("Failed to create workflow %s: %s", workflowId, e.getMessage()));
+    }
+    return OK();
+  }
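
The parsing above defines the expected payload shape: a WorkflowConfig map of string fields, an optional Jobs array (each entry needs an id plus either simple fields or a full ZNRecord), and an optional ParentJobs map from a parent job to its list of children. A sketch with illustrative field values (the individual config and job keys are assumptions, not a complete schema):

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;

    public class CreateWorkflowExample {
      public static void main(String[] args) {
        // Top-level keys follow the WorkflowProperties enum above
        String payload = "{"
            + "\"WorkflowConfig\": {\"Expiry\": \"43200000\"},"
            + "\"Jobs\": [{\"id\": \"job1\", \"Command\": \"Reindex\"}],"
            + "\"ParentJobs\": {}"
            + "}";
        Client client = ClientBuilder.newClient();
        Response r = client
            .target("http://localhost:8100/clusters/myCluster/workflows/myWorkflow")
            .request()
            .put(Entity.entity(payload, MediaType.APPLICATION_JSON));
        System.out.println(r.getStatus());
        client.close();
      }
    }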
+
+  @DELETE
+  @Path("{workflowId}")
+  public Response deleteWorkflow(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId) {
+    TaskDriver driver = getTaskDriver(clusterId);
+    try {
+      driver.delete(workflowId);
+    } catch (HelixException e) {
+      return badRequest(
+          String.format("Failed to delete workflow %s: %s", workflowId, e.getMessage()));
+    }
+    return OK();
+  }
+
+  @POST
+  @Path("{workflowId}")
+  public Response updateWorkflow(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId, @QueryParam("command") String command) {
+    TaskDriver driver = getTaskDriver(clusterId);
+
+    try {
+      TaskCommand cmd = TaskCommand.valueOf(command);
+      switch (cmd) {
+      case stop:
+        driver.stop(workflowId);
+        break;
+      case resume:
+        driver.resume(workflowId);
+        break;
+      case clean:
+        driver.cleanupQueue(workflowId);
+        break;
+      default:
+        return badRequest(String.format("Invalid command : %s", command));
+      }
+    } catch (HelixException e) {
+      return badRequest(
+          String.format("Failed to execute command %s: %s", command, e.getMessage()));
+    } catch (Exception e) {
+      return serverError(e);
+    }
+
+    return OK();
+  }
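
Workflow-level commands mirror the TaskCommand enum: stop, resume, and clean (clean applies to job queues). A sketch under the same endpoint assumptions as the earlier examples:

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;

    public class StopWorkflowExample {
      public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        // command must be one of stop, resume, clean
        Response r = client
            .target("http://localhost:8100/clusters/myCluster/workflows/myWorkflow")
            .queryParam("command", "stop")
            .request()
            .post(Entity.entity("", MediaType.APPLICATION_JSON));
        System.out.println(r.getStatus());
        client.close();
      }
    }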
+
+  @GET
+  @Path("{workflowId}/configs")
+  public Response getWorkflowConfig(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId) {
+    TaskDriver taskDriver = getTaskDriver(clusterId);
+    WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflowId);
+    ObjectNode workflowConfigNode = JsonNodeFactory.instance.objectNode();
+    if (workflowConfig != null) {
+      getWorkflowConfigNode(workflowConfigNode, workflowConfig.getRecord());
+    }
+
+    return JSONRepresentation(workflowConfigNode);
+  }
+
+  @POST
+  @Path("{workflowId}/configs")
+  public Response updateWorkflowConfig(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId, String content) {
+    ZNRecord record;
+    TaskDriver driver = getTaskDriver(clusterId);
+
+    try {
+      record = toZNRecord(content);
+
+      WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowId);
+      if (workflowConfig == null) {
+        return badRequest(
+            String.format("WorkflowConfig for workflow %s does not exist!", workflowId));
+      }
+
+      workflowConfig.getRecord().update(record);
+      driver.updateWorkflow(workflowId, workflowConfig);
+    } catch (HelixException e) {
+      return badRequest(String.format(
+          "Failed to update WorkflowConfig for workflow %s: %s", workflowId, e.getMessage()));
+    } catch (Exception e) {
+      return badRequest(String.format("Invalid WorkflowConfig for workflow %s", workflowId));
+    }
+
+    return OK();
+  }
+
+  @GET
+  @Path("{workflowId}/context")
+  public Response getWorkflowContext(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId) {
+    TaskDriver taskDriver = getTaskDriver(clusterId);
+    WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowId);
+    ObjectNode workflowContextNode = JsonNodeFactory.instance.objectNode();
+    if (workflowContext != null) {
+      getWorkflowContextNode(workflowContextNode, workflowContext.getRecord());
+    }
+
+    return JSONRepresentation(workflowContextNode);
+  }
+
+  private void getWorkflowConfigNode(ObjectNode workflowConfigNode, ZNRecord record) {
+    for (Map.Entry<String, String> entry : record.getSimpleFields().entrySet()) {
+      if (!entry.getKey().equals(WorkflowConfig.WorkflowConfigProperty.Dag.name())) {
+        workflowConfigNode.put(entry.getKey(), JsonNodeFactory.instance.textNode(entry.getValue()));
+      }
+    }
+  }
+
+  private void getWorkflowContextNode(ObjectNode workflowContextNode, ZNRecord record) {
+    if (record.getMapFields() != null) {
+      for (String fieldName : record.getMapFields().keySet()) {
+        JsonNode node = OBJECT_MAPPER.valueToTree(record.getMapField(fieldName));
+        workflowContextNode.put(fieldName, node);
+      }
+    }
+
+    if (record.getSimpleFields() != null) {
+      for (Map.Entry<String, String> entry : record.getSimpleFields().entrySet()) {
+        workflowContextNode
+            .put(entry.getKey(), JsonNodeFactory.instance.textNode(entry.getValue()));
+      }
+    }
+  }
+
+  private Map<String, JobConfig.Builder> getJobConfigs(ArrayNode root)
+      throws HelixException, IOException {
+    Map<String, JobConfig.Builder> jobConfigsMap = new HashMap<>();
+    for (Iterator<JsonNode> it = root.getElements(); it.hasNext(); ) {
+      JsonNode job = it.next();
+      ZNRecord record = null;
+
+      try {
+        record = toZNRecord(job.toString());
+      } catch (IOException e) {
+        // Ignore the parse failure; the entry may be a plain field map rather than a full ZNRecord
+      }
+
+      if (record == null || record.getSimpleFields().isEmpty()) {
+        Map<String, String> cfgMap = OBJECT_MAPPER.readValue(job.toString(),
+            TypeFactory.defaultInstance()
+                .constructMapType(HashMap.class, String.class, String.class));
+        jobConfigsMap
+            .put(job.get(Properties.id.name()).getTextValue(), JobAccessor.getJobConfig(cfgMap));
+      } else {
+        jobConfigsMap
+            .put(job.get(Properties.id.name()).getTextValue(), JobAccessor.getJobConfig(record));
+      }
+    }
+
+    return jobConfigsMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadata/NamespacesAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadata/NamespacesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadata/NamespacesAccessor.java
new file mode 100644
index 0000000..dd5c5f9
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadata/NamespacesAccessor.java
@@ -0,0 +1,47 @@
+package org.apache.helix.rest.server.resources.metadata;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+import org.apache.helix.rest.common.ContextPropertyKeys;
+import org.apache.helix.rest.common.HelixRestNamespace;
+import org.apache.helix.rest.server.resources.AbstractResource;
+
+
+@Path("/namespaces")
+public class NamespacesAccessor extends AbstractResource {
+  @GET
+  public Response getHelixRestNamespaces() {
+    @SuppressWarnings("unchecked")
+    List<HelixRestNamespace> allNamespaces =
+        (List<HelixRestNamespace>) _application.getProperties()
+            .get(ContextPropertyKeys.ALL_NAMESPACES.name());
+    List<Map<String, String>> ret = new ArrayList<>();
+    for (HelixRestNamespace namespace : allNamespaces) {
+      ret.add(namespace.getRestInfo());
+    }
+    return JSONRepresentation(ret);
+  }
+}
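
Each entry returned by this endpoint is the map produced by HelixRestNamespace.getRestInfo(). A sketch, assuming a JSON provider is registered with the client (host and port are illustrative):

    import java.util.List;
    import java.util.Map;
    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.core.GenericType;

    public class ListNamespacesExample {
      public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        List<Map<String, String>> namespaces = client
            .target("http://localhost:8100/namespaces")
            .request()
            .get(new GenericType<List<Map<String, String>>>() {});
        // Prints one map per namespace, e.g. its NAME and IS_DEFAULT properties
        System.out.println(namespaces);
        client.close();
      }
    }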

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index 94c5f63..763d3e2 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -42,7 +42,7 @@ import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.rest.common.HelixRestNamespace;
 import org.apache.helix.rest.server.auditlog.AuditLog;
 import org.apache.helix.rest.server.resources.AbstractResource.Command;
-import org.apache.helix.rest.server.resources.ClusterAccessor;
+import org.apache.helix.rest.server.resources.helix.ClusterAccessor;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.testng.Assert;

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java
index e213bd3..00ffdba 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java
@@ -57,8 +57,7 @@ public class TestHelixRestServer extends AbstractTestClass {
     try {
       List<HelixRestNamespace> invalidManifest3 = new ArrayList<>();
       invalidManifest3.add(
-          new HelixRestNamespace("DuplicatedName", HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, ZK_ADDR,
-              true));
+          new HelixRestNamespace("DuplicatedName", HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, ZK_ADDR, true));
       invalidManifest3.add(
           new HelixRestNamespace("DuplicatedName", HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, ZK_ADDR,
               false));
@@ -77,7 +76,7 @@ public class TestHelixRestServer extends AbstractTestClass {
           new HelixRestNamespace("test4-2", HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, ZK_ADDR, true));
       HelixRestServer svr = new HelixRestServer(invalidManifest4, 10250, "/", Collections.<AuditLogger>emptyList());
       Assert.assertFalse(true, "InvalidManifest4 test failed");
-    } catch (IllegalArgumentException e) {
+    } catch (IllegalStateException e) {
       // OK
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
index 947ba49..763f95b 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import javax.ws.rs.client.Entity;
@@ -33,10 +32,9 @@ import javax.ws.rs.core.Response;
 import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.rest.server.resources.AbstractResource;
-import org.apache.helix.rest.server.resources.InstanceAccessor;
+import org.apache.helix.rest.server.resources.helix.InstanceAccessor;
 import org.codehaus.jackson.JsonNode;
 import org.testng.Assert;
 import org.testng.annotations.Test;

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
index eca6836..682039d 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
@@ -27,8 +27,8 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.rest.server.resources.JobAccessor;
-import org.apache.helix.rest.server.resources.WorkflowAccessor;
+import org.apache.helix.rest.server.resources.helix.JobAccessor;
+import org.apache.helix.rest.server.resources.helix.WorkflowAccessor;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskDriver;

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java
index e6f036d..c77f939 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java
@@ -21,15 +21,22 @@ package org.apache.helix.rest.server;
 
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.rest.common.HelixRestNamespace;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestNamespacedAPIAccess extends AbstractTestClass {
+  ObjectMapper _mapper = new ObjectMapper();
+
   @Test
   public void testDefaultNamespaceCompatibility() {
     String testClusterName1 = "testClusterForDefaultNamespaceCompatibility1";
@@ -81,4 +88,52 @@ public class TestNamespacedAPIAccess extends AbstractTestClass {
     get(String.format("/clusters/%s", testClusterName), Response.Status.OK.getStatusCode(), false);
   }
 
+  @Test
+  public void testNamespaceServer() throws IOException {
+    // Default endpoints should not have any namespace information returned
+    get("/", Response.Status.NOT_FOUND.getStatusCode(), false);
+
+    // Getting an invalid namespace should return Not Found
+    get("/namespaces/invalid-namespace", Response.Status.NOT_FOUND.getStatusCode(), false);
+
+    // Listing namespaces should return all configured namespaces
+    String body = get("/namespaces", Response.Status.OK.getStatusCode(), true);
+    List<Map<String, String>> namespaceMaps = _mapper
+        .readValue(body, _mapper.getTypeFactory().constructCollectionType(List.class, Map.class));
+    Assert.assertEquals(namespaceMaps.size(), 2);
+
+    Set<String> expectedNamespaceNames = new HashSet<>();
+    expectedNamespaceNames.add(HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+    expectedNamespaceNames.add(TEST_NAMESPACE);
+
+    for (Map<String, String> namespaceMap : namespaceMaps) {
+      String name = namespaceMap.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name());
+      boolean isDefault = Boolean.parseBoolean(
+          namespaceMap.get(HelixRestNamespace.HelixRestNamespaceProperty.IS_DEFAULT.name()));
+      switch (name) {
+      case HelixRestNamespace.DEFAULT_NAMESPACE_NAME:
+        Assert.assertTrue(isDefault);
+        break;
+      case TEST_NAMESPACE:
+        Assert.assertFalse(isDefault);
+        break;
+      default:
+        Assert.fail("Unexpected namespace: " + name);
+        break;
+      }
+      expectedNamespaceNames.remove(name);
+    }
+    Assert.assertTrue(expectedNamespaceNames.isEmpty());
+
+    // Accessing the root of a namespaced API endpoint should return that namespace's information
+    body = get(String.format("/namespaces/%s", HelixRestNamespace.DEFAULT_NAMESPACE_NAME),
+        Response.Status.OK.getStatusCode(), true);
+    Map<String, String> namespace = _mapper.readValue(body,
+        _mapper.getTypeFactory().constructMapType(Map.class, String.class, String.class));
+    Assert.assertEquals(namespace.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name()),
+        HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+    Assert.assertTrue(Boolean.parseBoolean(
+        namespace.get(HelixRestNamespace.HelixRestNamespaceProperty.IS_DEFAULT.name())));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
index 8e116e9..dea4e06 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
@@ -30,7 +30,7 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.FullAutoModeISBuilder;
-import org.apache.helix.rest.server.resources.ResourceAccessor;
+import org.apache.helix.rest.server.resources.helix.ResourceAccessor;
 import org.codehaus.jackson.JsonNode;
 import org.testng.Assert;
 import org.testng.annotations.Test;

http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
index c41101f..ad8894a 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
@@ -8,7 +8,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.rest.server.resources.WorkflowAccessor;
+import org.apache.helix.rest.server.resources.helix.WorkflowAccessor;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;

