falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [6/6] git commit: FALCON-289 Provide REST APIs for discovering lineage metadata over the store. Contributed by Venkatesh Seetharam
Date Thu, 06 Mar 2014 00:49:42 GMT
FALCON-289 Provide REST APIs for discovering lineage metadata over the store. Contributed by
Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b8b2fe45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b8b2fe45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b8b2fe45

Branch: refs/heads/master
Commit: b8b2fe45dc480223af1bc70ed13e70ab82b60053
Parents: ecb919c
Author: Venkatesh Seetharam <venkatesh@hortonworks.com>
Authored: Wed Mar 5 16:47:37 2014 -0800
Committer: Venkatesh Seetharam <venkatesh@hortonworks.com>
Committed: Wed Mar 5 16:47:37 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   5 +
 .../java/org/apache/falcon/cli/FalconCLI.java   | 123 ++++++
 .../org/apache/falcon/client/FalconClient.java  |  74 ++++
 .../metadata/LineageMetadataResource.java       | 433 +++++++++++++++++++
 prism/src/main/webapp/WEB-INF/web.xml           |   2 +-
 process/pom.xml                                 |  10 -
 webapp/src/main/webapp/WEB-INF/embedded/web.xml |   2 +-
 webapp/src/main/webapp/WEB-INF/web.xml          |   2 +-
 .../org/apache/falcon/cli/FalconCLISmokeIT.java |  10 +
 9 files changed, 648 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 931a868..fdb9490 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,11 @@ Trunk (Unreleased)
     FALCON-254 Bootstrap designer module. (Srikanth Sundarrajan via Shwetha GS)
 
     FALCON-238 Support updates at specific time. (Shwetha GS)
+
+    FALCON-285 Support Lineage information capture (Venkatesh Seetharam)
+
+    FALCON-289 Provide REST APIs for discovering lineage metadata over the store.
+    (Venkatesh Seetharam)
    
   IMPROVEMENTS
     FALCON-333 jsp-api dependency is defined twice. (Jean-Baptiste

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index a414e32..1663073 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -89,6 +89,20 @@ public class FalconCLI {
     public static final String CURRENT_COLO = "current.colo";
     public static final String CLIENT_PROPERTIES = "/client.properties";
 
+    // Graph Commands
+    public static final String GRAPH_CMD = "graph";
+    public static final String VERTEX_CMD = "vertex";
+    public static final String VERTICES_CMD = "vertices";
+    public static final String VERTEX_EDGES_CMD = "edges";
+
+    // Graph Command Options
+    public static final String EDGE_CMD = "edge";
+    public static final String ID_OPT = "id";
+    public static final String NAME_OPT = "name";
+    public static final String VALUE_OPT = "value";
+    public static final String DIRECTION_OPT = "direction";
+    public static final String DUMP_OPT = "all";
+
     /**
      * Entry point for the Falcon CLI when invoked from the command line. Upon
      * completion this method exits the JVM with '0' (success) or '-1'
@@ -132,6 +146,7 @@ public class FalconCLI {
                 "",
                 "Process instances operations like running, status, kill, suspend, resume,
rerun, logs",
                 instanceOptions(), false);
+        parser.addCommand(GRAPH_CMD, "", "graph operations", createGraphOptions(), true);
 
         try {
             CLIParser.Command command = parser.parse(args);
@@ -149,6 +164,8 @@ public class FalconCLI {
                     entityCommand(commandLine, client);
                 } else if (command.getName().equals(INSTANCE_CMD)) {
                     instanceCommand(commandLine, client);
+                } else if (command.getName().equals(GRAPH_CMD)) {
+                    graphCommand(commandLine, client);
                 }
             }
 
@@ -572,6 +589,112 @@ public class FalconCLI {
         return instanceOptions;
     }
 
+    private Options createGraphOptions() {
+        Options graphOptions = new Options();
+        Option url = new Option(URL_OPTION, true, "Falcon URL");
+        graphOptions.addOption(url);
+
+        Option vertex = new Option(VERTEX_CMD, false, "show the vertices");
+        Option vertices = new Option(VERTICES_CMD, false, "show the vertices");
+        Option vertexEdges = new Option(VERTEX_EDGES_CMD, false, "show the edges for a given
vertex");
+        Option edges = new Option(EDGE_CMD, false, "show the edges");
+
+        OptionGroup group = new OptionGroup();
+        group.addOption(vertex);
+        group.addOption(vertices);
+        group.addOption(vertexEdges);
+        group.addOption(edges);
+        graphOptions.addOptionGroup(group);
+
+        Option id = new Option(ID_OPT, true, "vertex or edge id");
+        graphOptions.addOption(id);
+
+        Option name = new Option(NAME_OPT, true, "name property");
+        graphOptions.addOption(name);
+
+        Option value = new Option(VALUE_OPT, true, "value property");
+        graphOptions.addOption(value);
+
+        Option direction = new Option(DIRECTION_OPT, true, "edge direction property");
+        graphOptions.addOption(direction);
+
+        Option dump = new Option(DUMP_OPT, false, "dump all elements");
+        graphOptions.addOption(dump);
+
+        return graphOptions;
+    }
+
+    private void graphCommand(CommandLine commandLine,
+                              FalconClient client) throws FalconCLIException {
+        Set<String> optionsList = new HashSet<String>();
+        for (Option option : commandLine.getOptions()) {
+            optionsList.add(option.getOpt());
+        }
+
+        String result;
+        String id = commandLine.getOptionValue(ID_OPT);
+        String name = commandLine.getOptionValue(NAME_OPT);
+        String value = commandLine.getOptionValue(VALUE_OPT);
+        String direction = commandLine.getOptionValue(DIRECTION_OPT);
+        String dump = commandLine.getOptionValue(DUMP_OPT);
+
+        if (optionsList.contains(VERTEX_CMD)) {
+            validateId(id);
+            result = client.getVertex(id);
+        } else if (optionsList.contains(VERTICES_CMD)) {
+            if (isDump(dump)) {
+                result = client.getVertices();
+            } else {
+                validateVerticesCommand(name, value);
+                result = client.getVertices(name, value);
+            }
+        } else if (optionsList.contains(VERTEX_EDGES_CMD)) {
+            if (isDump(dump)) {
+                result = client.getEdges();
+            } else {
+                validateVertexEdgesCommand(id, direction);
+                result = client.getVertexEdges(id, direction);
+            }
+        } else if (optionsList.contains(EDGE_CMD)) {
+            validateId(id);
+            result = client.getEdge(id);
+        } else {
+            throw new FalconCLIException("Invalid command");
+        }
+
+        OUT.get().println(result);
+    }
+
+    private void validateId(String id) throws FalconCLIException {
+        if (id == null || id.length() == 0) {
+            throw new FalconCLIException("Missing argument: id");
+        }
+    }
+
+    private boolean isDump(String dump) {
+        return dump != null;
+    }
+
+    private void validateVerticesCommand(String name, String value) throws FalconCLIException
{
+        if (name == null || name.length() == 0) {
+            throw new FalconCLIException("Missing argument: name");
+        }
+
+        if (value == null || value.length() == 0) {
+            throw new FalconCLIException("Missing argument: value");
+        }
+    }
+
+    private void validateVertexEdgesCommand(String id, String direction) throws FalconCLIException
{
+        if (id == null || id.length() == 0) {
+            throw new FalconCLIException("Missing argument: id");
+        }
+
+        if (direction == null || direction.length() == 0) {
+            throw new FalconCLIException("Missing argument: direction");
+        }
+    }
+
     protected String getFalconEndpoint(CommandLine commandLine) throws FalconCLIException,
IOException {
         String url = commandLine.getOptionValue(URL_OPTION);
         if (url == null) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 59b2a3b..f008953 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -736,6 +736,80 @@ public class FalconClient {
         return sb.toString();
     }
 
+    protected static enum GraphOperations {
+
+        VERTICES("api/graphs/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON),
+        EDGES("api/graphs/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON);
+
+        private String path;
+        private String method;
+        private String mimeType;
+
+        GraphOperations(String path, String method, String mimeType) {
+            this.path = path;
+            this.method = method;
+            this.mimeType = mimeType;
+        }
+    }
+
+    public String getVertex(String id) throws FalconCLIException {
+        return sendGraphRequest(GraphOperations.VERTICES, id);
+    }
+
+    public String getVertices(String key, String value) throws FalconCLIException {
+        return sendGraphRequest(GraphOperations.VERTICES, key, value);
+    }
+
+    public String getVertexEdges(String id, String direction) throws FalconCLIException {
+        return sendGraphRequestForEdges(GraphOperations.VERTICES, id, direction);
+    }
+
+    public String getEdge(String id) throws FalconCLIException {
+        return sendGraphRequest(GraphOperations.EDGES, id);
+    }
+
+    public String getVertices() throws FalconCLIException {
+        return sendGraphRequest(GraphOperations.VERTICES, "all");
+    }
+
+    public String getEdges() throws FalconCLIException {
+        return sendGraphRequest(GraphOperations.EDGES, "all");
+    }
+
+    private String sendGraphRequest(GraphOperations job, String id) throws FalconCLIException
{
+        ClientResponse clientResponse = service.path(job.path)
+                .path(id)
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(job.mimeType)
+                .type(job.mimeType)
+                .method(job.method, ClientResponse.class);
+        return parseStringResult(clientResponse);
+    }
+
+    private String sendGraphRequest(GraphOperations job, String name,
+                                    String value) throws FalconCLIException {
+        ClientResponse clientResponse = service.path(job.path)
+                .queryParam("name", name)
+                .queryParam("value", value)
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(job.mimeType)
+                .type(job.mimeType)
+                .method(job.method, ClientResponse.class);
+        return parseStringResult(clientResponse);
+    }
+
+    private String sendGraphRequestForEdges(GraphOperations job, String id,
+                                            String direction) throws FalconCLIException {
+        ClientResponse clientResponse = service.path(job.path)
+                .path(id)
+                .path(direction)
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(job.mimeType)
+                .type(job.mimeType)
+                .method(job.method, ClientResponse.class);
+        return parseStringResult(clientResponse);
+    }
+
     private void checkIfSuccessful(ClientResponse clientResponse)
         throws FalconCLIException {
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
new file mode 100644
index 0000000..f140bca
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.metadata;
+
+import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.KeyIndexableGraph;
+import com.tinkerpop.blueprints.Vertex;
+import com.tinkerpop.blueprints.VertexQuery;
+import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
+import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
+import org.apache.falcon.metadata.GraphUtils;
+import org.apache.falcon.metadata.MetadataMappingService;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.log4j.Logger;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Set;
+
+/**
+ * Jersey Resource for lineage metadata operations.
+ */
+@Path("graphs/lineage")
+public class LineageMetadataResource {
+
+    private static final Logger LOG = Logger.getLogger(LineageMetadataResource.class);
+
+    public static final String RESULTS = "results";
+    public static final String TOTAL_SIZE = "totalSize";
+
+    private final MetadataMappingService service;
+
+    public LineageMetadataResource() {
+        if (Services.get().isRegistered(MetadataMappingService.SERVICE_NAME)) {
+            service = Services.get().getService(MetadataMappingService.SERVICE_NAME);
+        } else {
+            service = null;
+        }
+    }
+
+    private KeyIndexableGraph getGraph() {
+        return service.getGraph();
+    }
+
+    private Set<String> getVertexIndexedKeys() {
+        return service.getVertexIndexedKeys();
+    }
+
+    private Set<String> getEdgeIndexedKeys() {
+        return service.getEdgeIndexedKeys();
+    }
+
+    /**
+     * Dump the graph.
+     *
+     * GET http://host/graphs/lineage/serialize
+     * graph.getVertices();
+     */
+    @GET
+    @Path("/serialize")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response serializeGraph() {
+        checkIfMetadataMappingServiceIsEnabled();
+        String file = StartupProperties.get().getProperty("falcon.graph.serialize.path")
+                + "lineage-graph-" + System.currentTimeMillis() + ".json";
+        LOG.info("Serialize Graph to: " + file);
+        try {
+            GraphUtils.dump(getGraph(), file);
+            return Response.ok().build();
+        } catch (Exception e) {
+            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                    .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
+        }
+    }
+
+    /**
+     * Get all vertices.
+     *
+     * GET http://host/graphs/lineage/vertices/all
+     * graph.getVertices();
+     */
+    @GET
+    @Path("/vertices/all")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getVertices() {
+        checkIfMetadataMappingServiceIsEnabled();
+        LOG.info("Get All Vertices");
+        try {
+            JSONArray vertexArray = new JSONArray();
+            long counter = 0;
+            for (Vertex vertex : getGraph().getVertices()) {
+                counter++;
+                vertexArray.put(GraphSONUtility.jsonFromElement(
+                        vertex, getVertexIndexedKeys(), GraphSONMode.NORMAL));
+            }
+
+            JSONObject response = new JSONObject();
+            response.put(RESULTS, vertexArray);
+            response.put(TOTAL_SIZE, counter);
+            return Response.ok(response).build();
+        } catch (JSONException e) {
+            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                    .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
+        }
+    }
+
+    /**
+     * Get a single vertex with a unique id.
+     *
+     * GET http://host/graphs/lineage/vertices/id
+     * graph.getVertex(id);
+     */
+    @GET
+    @Path("/vertices/{id}")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getVertex(@PathParam("id") final String vertexId) {
+        checkIfMetadataMappingServiceIsEnabled();
+        LOG.info("Get vertex for vertexId= " + vertexId);
+        try {
+            Vertex vertex = getGraph().getVertex(vertexId);
+            if (vertex == null) {
+                String message = "Vertex with [" + vertexId + "] cannot be found.";
+                LOG.info(message);
+                throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+                        .entity(JSONObject.quote(message)).build());
+            }
+
+            JSONObject response = new JSONObject();
+            response.put(RESULTS, GraphSONUtility.jsonFromElement(
+                    vertex, getVertexIndexedKeys(), GraphSONMode.NORMAL));
+            return Response.ok(response).build();
+        } catch (JSONException e) {
+            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                    .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
+        }
+    }
+
+    /**
+     * Get a list of vertices matching a property name and a value.
+     * <p/>
+     * GET http://host/graphs/lineage/vertices?name=<name>&value=<value>
+     * graph.getVertices(name, value);
+     */
+    @GET
+    @Path("/vertices")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getVertices(@QueryParam("name") final String name,
+                                @QueryParam("value") final String value) {
+        checkIfMetadataMappingServiceIsEnabled();
+        LOG.info("Get vertices for property name= " + name + ", value= " + value);
+        try {
+            Iterable<Vertex> vertices = getGraph().getVertices(name, value);
+            final JSONArray vertexArray = new JSONArray();
+
+            long counter = 0;
+            for (Vertex vertex : vertices) {
+                counter++;
+                vertexArray.put(GraphSONUtility.jsonFromElement(
+                        vertex, getVertexIndexedKeys(), GraphSONMode.NORMAL));
+            }
+
+            JSONObject response = new JSONObject();
+            response.put(RESULTS, vertexArray);
+            response.put(TOTAL_SIZE, counter);
+            return Response.ok(response).build();
+        } catch (JSONException e) {
+            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                    .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
+        }
+    }
+
+    /**
+     * Get a list of adjacent edges with a direction.
+     *
+     * GET http://host/graphs/lineage/vertices/id/direction
+     * graph.getVertex(id).get{Direction}Edges();
+     * direction: {(?!outE)(?!bothE)(?!inE)(?!out)(?!both)(?!in)(?!query).+}
+     */
+    @GET
+    @Path("vertices/{id}/{direction}")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getVertexEdges(@PathParam("id") String vertexId,
+                                   @PathParam("direction") String direction) {
+        checkIfMetadataMappingServiceIsEnabled();
+        LOG.info("Get vertex edges for vertexId= " + vertexId + ", direction= " + direction);
+        try {
+            Vertex vertex = getGraph().getVertex(vertexId);
+            if (vertex == null) {
+                String message = "Vertex with [" + vertexId + "] cannot be found.";
+                LOG.info(message);
+                throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+                        .entity(JSONObject.quote(message)).build());
+            }
+
+            // break out the segment into the return and the direction
+            VertexQueryArguments queryArguments = new VertexQueryArguments(direction);
+            // if this is a query and the _return is "count" then we don't bother to send
back the result array
+            boolean countOnly = queryArguments.isCountOnly();
+            // what kind of data the calling client wants back (vertices, edges, count, vertex
identifiers)
+            ReturnType returnType = queryArguments.getReturnType();
+            // the query direction (both, out, in)
+            Direction queryDirection = queryArguments.getQueryDirection();
+
+            VertexQuery query = vertex.query().direction(queryDirection);
+
+            JSONArray elementArray = new JSONArray();
+            long counter = 0;
+            if (returnType == ReturnType.VERTICES || returnType == ReturnType.VERTEX_IDS)
{
+                Iterable<Vertex> vertexQueryResults = query.vertices();
+                for (Vertex v : vertexQueryResults) {
+                    if (returnType.equals(ReturnType.VERTICES)) {
+                        elementArray.put(GraphSONUtility.jsonFromElement(
+                                v, getVertexIndexedKeys(), GraphSONMode.NORMAL));
+                    } else {
+                        elementArray.put(v.getId());
+                    }
+                    counter++;
+                }
+            } else if (returnType == ReturnType.EDGES) {
+                Iterable<Edge> edgeQueryResults = query.edges();
+                for (Edge e : edgeQueryResults) {
+                    elementArray.put(GraphSONUtility.jsonFromElement(
+                            e, getEdgeIndexedKeys(), GraphSONMode.NORMAL));
+                    counter++;
+                }
+            } else if (returnType == ReturnType.COUNT) {
+                counter = query.count();
+            }
+
+            JSONObject response = new JSONObject();
+            if (!countOnly) {
+                response.put(RESULTS, elementArray);
+            }
+            response.put(TOTAL_SIZE, counter);
+            return Response.ok(response).build();
+        } catch (JSONException e) {
+            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                    .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
+        }
+    }
+
+    /**
+     * Get all edges.
+     *
+     * GET http://host/graphs/lineage/edges/all
+     * graph.getEdges();
+     */
+    @GET
+    @Path("/edges/all")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getEdges() {
+        checkIfMetadataMappingServiceIsEnabled();
+        LOG.info("Get All Edges.");
+        try {
+            JSONArray vertexArray = new JSONArray();
+            long counter = 0;
+            for (Edge edge : getGraph().getEdges()) {
+                counter++;
+                vertexArray.put(GraphSONUtility.jsonFromElement(
+                        edge, getEdgeIndexedKeys(), GraphSONMode.NORMAL));
+            }
+
+            JSONObject response = new JSONObject();
+            response.put(RESULTS, vertexArray);
+            response.put(TOTAL_SIZE, counter);
+            return Response.ok(response).build();
+        } catch (JSONException e) {
+            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                    .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
+        }
+    }
+
+    /**
+     * Get a single edge with a unique id.
+     *
+     * GET http://host/graphs/lineage/edges/id
+     * graph.getEdge(id);
+     */
+    @GET
+    @Path("/edges/{id}")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getEdge(@PathParam("id") final String edgeId) {
+        checkIfMetadataMappingServiceIsEnabled();
+        LOG.info("Get vertex for edgeId= " + edgeId);
+        try {
+            Edge edge = getGraph().getEdge(edgeId);
+            if (edge == null) {
+                String message = "Edge with [" + edgeId + "] cannot be found.";
+                LOG.info(message);
+                throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+                        .entity(JSONObject.quote(message)).build());
+            }
+
+            JSONObject response = new JSONObject();
+            response.put(RESULTS, GraphSONUtility.jsonFromElement(
+                    edge, getEdgeIndexedKeys(), GraphSONMode.NORMAL));
+            return Response.ok(response).build();
+        } catch (JSONException e) {
+            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                    .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
+        }
+    }
+
+    private void checkIfMetadataMappingServiceIsEnabled() {
+        if (service == null) {
+            throw new WebApplicationException(
+                    Response.status(Response.Status.NOT_FOUND)
+                            .entity("Lineage Metadata Service is not enabled.")
+                            .type("text/plain")
+                            .build());
+        }
+    }
+
+    private enum ReturnType {VERTICES, EDGES, COUNT, VERTEX_IDS}
+
+    public static final String OUT_E = "outE";
+    public static final String IN_E = "inE";
+    public static final String BOTH_E = "bothE";
+    public static final String OUT = "out";
+    public static final String IN = "in";
+    public static final String BOTH = "both";
+    public static final String OUT_COUNT = "outCount";
+    public static final String IN_COUNT = "inCount";
+    public static final String BOTH_COUNT = "bothCount";
+    public static final String OUT_IDS = "outIds";
+    public static final String IN_IDS = "inIds";
+    public static final String BOTH_IDS = "bothIds";
+
+    /**
+     * Helper class for query arguments.
+     */
+    public static final class VertexQueryArguments {
+
+        private final Direction queryDirection;
+        private final ReturnType returnType;
+        private final boolean countOnly;
+
+        public VertexQueryArguments(String directionSegment) {
+            if (directionSegment.equals(OUT_E)) {
+                returnType = ReturnType.EDGES;
+                queryDirection = Direction.OUT;
+                countOnly = false;
+            } else if (directionSegment.equals(IN_E)) {
+                returnType = ReturnType.EDGES;
+                queryDirection = Direction.IN;
+                countOnly = false;
+            } else if (directionSegment.equals(BOTH_E)) {
+                returnType = ReturnType.EDGES;
+                queryDirection = Direction.BOTH;
+                countOnly = false;
+            } else if (directionSegment.equals(OUT)) {
+                returnType = ReturnType.VERTICES;
+                queryDirection = Direction.OUT;
+                countOnly = false;
+            } else if (directionSegment.equals(IN)) {
+                returnType = ReturnType.VERTICES;
+                queryDirection = Direction.IN;
+                countOnly = false;
+            } else if (directionSegment.equals(BOTH)) {
+                returnType = ReturnType.VERTICES;
+                queryDirection = Direction.BOTH;
+                countOnly = false;
+            } else if (directionSegment.equals(BOTH_COUNT)) {
+                returnType = ReturnType.COUNT;
+                queryDirection = Direction.BOTH;
+                countOnly = true;
+            } else if (directionSegment.equals(IN_COUNT)) {
+                returnType = ReturnType.COUNT;
+                queryDirection = Direction.IN;
+                countOnly = true;
+            } else if (directionSegment.equals(OUT_COUNT)) {
+                returnType = ReturnType.COUNT;
+                queryDirection = Direction.OUT;
+                countOnly = true;
+            } else if (directionSegment.equals(BOTH_IDS)) {
+                returnType = ReturnType.VERTEX_IDS;
+                queryDirection = Direction.BOTH;
+                countOnly = false;
+            } else if (directionSegment.equals(IN_IDS)) {
+                returnType = ReturnType.VERTEX_IDS;
+                queryDirection = Direction.IN;
+                countOnly = false;
+            } else if (directionSegment.equals(OUT_IDS)) {
+                returnType = ReturnType.VERTEX_IDS;
+                queryDirection = Direction.OUT;
+                countOnly = false;
+            } else {
+                throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST)
+                        .entity(JSONObject.quote(directionSegment + " segment was invalid."))
+                        .build());
+            }
+        }
+
+        public Direction getQueryDirection() {
+            return queryDirection;
+        }
+
+        public ReturnType getReturnType() {
+            return returnType;
+        }
+
+        public boolean isCountOnly() {
+            return countOnly;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/prism/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/prism/src/main/webapp/WEB-INF/web.xml b/prism/src/main/webapp/WEB-INF/web.xml
index 00a6c42..e7c6234 100644
--- a/prism/src/main/webapp/WEB-INF/web.xml
+++ b/prism/src/main/webapp/WEB-INF/web.xml
@@ -49,7 +49,7 @@
         <init-param>
             <param-name>com.sun.jersey.config.property.packages</param-name>
             <param-value>
-                org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy
+                org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata
             </param-value>
         </init-param>
         <load-on-startup>1</load-on-startup>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/process/pom.xml
----------------------------------------------------------------------
diff --git a/process/pom.xml b/process/pom.xml
index 0ca554a..46f9cf0 100644
--- a/process/pom.xml
+++ b/process/pom.xml
@@ -106,16 +106,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-jms</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-beans</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/webapp/src/main/webapp/WEB-INF/embedded/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/embedded/web.xml b/webapp/src/main/webapp/WEB-INF/embedded/web.xml
index bc74d91..9dc371f 100644
--- a/webapp/src/main/webapp/WEB-INF/embedded/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/embedded/web.xml
@@ -49,7 +49,7 @@
         <init-param>
             <param-name>com.sun.jersey.config.property.packages</param-name>
             <param-value>
-                org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy
+                org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata
             </param-value>
         </init-param>
         <load-on-startup>1</load-on-startup>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/webapp/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml
index 3864789..971fcdd 100644
--- a/webapp/src/main/webapp/WEB-INF/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/web.xml
@@ -49,7 +49,7 @@
         <init-param>
             <param-name>com.sun.jersey.config.property.packages</param-name>
             <param-value>
-                org.apache.falcon.resource.admin,org.apache.falcon.resource.proxy,org.apache.falcon.resource.provider
+                org.apache.falcon.resource.admin,org.apache.falcon.resource.proxy,org.apache.falcon.resource.provider,org.apache.falcon.resource.metadata
             </param-value>
         </init-param>
         <load-on-startup>1</load-on-startup>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
index f946202..f4f72e2 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.cli;
 
+import org.apache.falcon.client.FalconClient;
 import org.apache.falcon.resource.TestContext;
 import org.apache.falcon.util.OozieTestUtils;
 import org.apache.falcon.util.StartupProperties;
@@ -57,6 +58,15 @@ public class FalconCLISmokeIT {
 
         // this is necessary for lineage
         Assert.assertEquals(0, executeWithURL("entity -submit -type cluster -file " + filePath));
+        // verify
+        Assert.assertEquals(0, executeWithURL("graph -vertices -all"));
+        Assert.assertEquals(0, executeWithURL("graph -edges -all"));
+        Assert.assertEquals(0, executeWithURL("graph -vertices -name name -value " + context.getClusterName()));
+
+        // verify lineage
+        FalconClient client = new FalconClient(TestContext.BASE_URL);
+        String clusterVertex = client.getVertices("name", context.getClusterName());
+        Assert.assertTrue(clusterVertex.contains(context.getClusterName()));
 
         filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1,
overlay);
         Assert.assertEquals(0,


Mime
View raw message