Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E50B210905 for ; Thu, 6 Mar 2014 00:50:11 +0000 (UTC) Received: (qmail 21092 invoked by uid 500); 6 Mar 2014 00:50:11 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 21064 invoked by uid 500); 6 Mar 2014 00:50:11 -0000 Mailing-List: contact commits-help@falcon.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.incubator.apache.org Delivered-To: mailing list commits@falcon.incubator.apache.org Received: (qmail 21054 invoked by uid 99); 6 Mar 2014 00:50:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Mar 2014 00:50:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 06 Mar 2014 00:50:01 +0000 Received: (qmail 20616 invoked by uid 99); 6 Mar 2014 00:49:37 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Mar 2014 00:49:37 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 79FC2938150; Thu, 6 Mar 2014 00:49:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: venkatesh@apache.org To: commits@falcon.incubator.apache.org Date: Thu, 06 Mar 2014 00:49:42 -0000 Message-Id: <167b7ad97471466abfcc255f969e0420@git.apache.org> In-Reply-To: <03847b92861e4caca38f8a70aba9607e@git.apache.org> References: <03847b92861e4caca38f8a70aba9607e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/6] git commit: FALCON-289 Provide REST APIs for discovering lineage metadata over the store. Contributed by Venkatesh Seetharam X-Virus-Checked: Checked by ClamAV on apache.org FALCON-289 Provide REST APIs for discovering lineage metadata over the store. Contributed by Venkatesh Seetharam Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b8b2fe45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b8b2fe45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b8b2fe45 Branch: refs/heads/master Commit: b8b2fe45dc480223af1bc70ed13e70ab82b60053 Parents: ecb919c Author: Venkatesh Seetharam Authored: Wed Mar 5 16:47:37 2014 -0800 Committer: Venkatesh Seetharam Committed: Wed Mar 5 16:47:37 2014 -0800 ---------------------------------------------------------------------- CHANGES.txt | 5 + .../java/org/apache/falcon/cli/FalconCLI.java | 123 ++++++ .../org/apache/falcon/client/FalconClient.java | 74 ++++ .../metadata/LineageMetadataResource.java | 433 +++++++++++++++++++ prism/src/main/webapp/WEB-INF/web.xml | 2 +- process/pom.xml | 10 - webapp/src/main/webapp/WEB-INF/embedded/web.xml | 2 +- webapp/src/main/webapp/WEB-INF/web.xml | 2 +- .../org/apache/falcon/cli/FalconCLISmokeIT.java | 10 + 9 files changed, 648 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 931a868..fdb9490 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -18,6 +18,11 @@ Trunk (Unreleased) FALCON-254 Bootstrap designer module. (Srikanth Sundarrajan via Shwetha GS) FALCON-238 Support updates at specific time. (Shwetha GS) + + FALCON-285 Support Lineage information capture (Venkatesh Seetharam) + + FALCON-289 Provide REST APIs for discovering lineage metadata over the store. + (Venkatesh Seetharam) IMPROVEMENTS FALCON-333 jsp-api dependency is defined twice. (Jean-Baptiste http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index a414e32..1663073 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -89,6 +89,20 @@ public class FalconCLI { public static final String CURRENT_COLO = "current.colo"; public static final String CLIENT_PROPERTIES = "/client.properties"; + // Graph Commands + public static final String GRAPH_CMD = "graph"; + public static final String VERTEX_CMD = "vertex"; + public static final String VERTICES_CMD = "vertices"; + public static final String VERTEX_EDGES_CMD = "edges"; + + // Graph Command Options + public static final String EDGE_CMD = "edge"; + public static final String ID_OPT = "id"; + public static final String NAME_OPT = "name"; + public static final String VALUE_OPT = "value"; + public static final String DIRECTION_OPT = "direction"; + public static final String DUMP_OPT = "all"; + /** * Entry point for the Falcon CLI when invoked from the command line. Upon * completion this method exits the JVM with '0' (success) or '-1' @@ -132,6 +146,7 @@ public class FalconCLI { "", "Process instances operations like running, status, kill, suspend, resume, rerun, logs", instanceOptions(), false); + parser.addCommand(GRAPH_CMD, "", "graph operations", createGraphOptions(), true); try { CLIParser.Command command = parser.parse(args); @@ -149,6 +164,8 @@ public class FalconCLI { entityCommand(commandLine, client); } else if (command.getName().equals(INSTANCE_CMD)) { instanceCommand(commandLine, client); + } else if (command.getName().equals(GRAPH_CMD)) { + graphCommand(commandLine, client); } } @@ -572,6 +589,112 @@ public class FalconCLI { return instanceOptions; } + private Options createGraphOptions() { + Options graphOptions = new Options(); + Option url = new Option(URL_OPTION, true, "Falcon URL"); + graphOptions.addOption(url); + + Option vertex = new Option(VERTEX_CMD, false, "show the vertices"); + Option vertices = new Option(VERTICES_CMD, false, "show the vertices"); + Option vertexEdges = new Option(VERTEX_EDGES_CMD, false, "show the edges for a given vertex"); + Option edges = new Option(EDGE_CMD, false, "show the edges"); + + OptionGroup group = new OptionGroup(); + group.addOption(vertex); + group.addOption(vertices); + group.addOption(vertexEdges); + group.addOption(edges); + graphOptions.addOptionGroup(group); + + Option id = new Option(ID_OPT, true, "vertex or edge id"); + graphOptions.addOption(id); + + Option name = new Option(NAME_OPT, true, "name property"); + graphOptions.addOption(name); + + Option value = new Option(VALUE_OPT, true, "value property"); + graphOptions.addOption(value); + + Option direction = new Option(DIRECTION_OPT, true, "edge direction property"); + graphOptions.addOption(direction); + + Option dump = new Option(DUMP_OPT, false, "dump all elements"); + graphOptions.addOption(dump); + + return graphOptions; + } + + private void graphCommand(CommandLine commandLine, + FalconClient client) throws FalconCLIException { + Set optionsList = new HashSet(); + for (Option option : commandLine.getOptions()) { + optionsList.add(option.getOpt()); + } + + String result; + String id = commandLine.getOptionValue(ID_OPT); + String name = commandLine.getOptionValue(NAME_OPT); + String value = commandLine.getOptionValue(VALUE_OPT); + String direction = commandLine.getOptionValue(DIRECTION_OPT); + String dump = commandLine.getOptionValue(DUMP_OPT); + + if (optionsList.contains(VERTEX_CMD)) { + validateId(id); + result = client.getVertex(id); + } else if (optionsList.contains(VERTICES_CMD)) { + if (isDump(dump)) { + result = client.getVertices(); + } else { + validateVerticesCommand(name, value); + result = client.getVertices(name, value); + } + } else if (optionsList.contains(VERTEX_EDGES_CMD)) { + if (isDump(dump)) { + result = client.getEdges(); + } else { + validateVertexEdgesCommand(id, direction); + result = client.getVertexEdges(id, direction); + } + } else if (optionsList.contains(EDGE_CMD)) { + validateId(id); + result = client.getEdge(id); + } else { + throw new FalconCLIException("Invalid command"); + } + + OUT.get().println(result); + } + + private void validateId(String id) throws FalconCLIException { + if (id == null || id.length() == 0) { + throw new FalconCLIException("Missing argument: id"); + } + } + + private boolean isDump(String dump) { + return dump != null; + } + + private void validateVerticesCommand(String name, String value) throws FalconCLIException { + if (name == null || name.length() == 0) { + throw new FalconCLIException("Missing argument: name"); + } + + if (value == null || value.length() == 0) { + throw new FalconCLIException("Missing argument: value"); + } + } + + private void validateVertexEdgesCommand(String id, String direction) throws FalconCLIException { + if (id == null || id.length() == 0) { + throw new FalconCLIException("Missing argument: id"); + } + + if (direction == null || direction.length() == 0) { + throw new FalconCLIException("Missing argument: direction"); + } + } + protected String getFalconEndpoint(CommandLine commandLine) throws FalconCLIException, IOException { String url = commandLine.getOptionValue(URL_OPTION); if (url == null) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 59b2a3b..f008953 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -736,6 +736,80 @@ public class FalconClient { return sb.toString(); } + protected static enum GraphOperations { + + VERTICES("api/graphs/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON), + EDGES("api/graphs/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON); + + private String path; + private String method; + private String mimeType; + + GraphOperations(String path, String method, String mimeType) { + this.path = path; + this.method = method; + this.mimeType = mimeType; + } + } + + public String getVertex(String id) throws FalconCLIException { + return sendGraphRequest(GraphOperations.VERTICES, id); + } + + public String getVertices(String key, String value) throws FalconCLIException { + return sendGraphRequest(GraphOperations.VERTICES, key, value); + } + + public String getVertexEdges(String id, String direction) throws FalconCLIException { + return sendGraphRequestForEdges(GraphOperations.VERTICES, id, direction); + } + + public String getEdge(String id) throws FalconCLIException { + return sendGraphRequest(GraphOperations.EDGES, id); + } + + public String getVertices() throws FalconCLIException { + return sendGraphRequest(GraphOperations.VERTICES, "all"); + } + + public String getEdges() throws FalconCLIException { + return sendGraphRequest(GraphOperations.EDGES, "all"); + } + + private String sendGraphRequest(GraphOperations job, String id) throws FalconCLIException { + ClientResponse clientResponse = service.path(job.path) + .path(id) + .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(job.mimeType) + .type(job.mimeType) + .method(job.method, ClientResponse.class); + return parseStringResult(clientResponse); + } + + private String sendGraphRequest(GraphOperations job, String name, + String value) throws FalconCLIException { + ClientResponse clientResponse = service.path(job.path) + .queryParam("name", name) + .queryParam("value", value) + .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(job.mimeType) + .type(job.mimeType) + .method(job.method, ClientResponse.class); + return parseStringResult(clientResponse); + } + + private String sendGraphRequestForEdges(GraphOperations job, String id, + String direction) throws FalconCLIException { + ClientResponse clientResponse = service.path(job.path) + .path(id) + .path(direction) + .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(job.mimeType) + .type(job.mimeType) + .method(job.method, ClientResponse.class); + return parseStringResult(clientResponse); + } + private void checkIfSuccessful(ClientResponse clientResponse) throws FalconCLIException { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java new file mode 100644 index 0000000..f140bca --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource.metadata; + +import com.tinkerpop.blueprints.Direction; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.KeyIndexableGraph; +import com.tinkerpop.blueprints.Vertex; +import com.tinkerpop.blueprints.VertexQuery; +import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode; +import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility; +import org.apache.falcon.metadata.GraphUtils; +import org.apache.falcon.metadata.MetadataMappingService; +import org.apache.falcon.service.Services; +import org.apache.falcon.util.StartupProperties; +import org.apache.log4j.Logger; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.Set; + +/** + * Jersey Resource for lineage metadata operations. + */ +@Path("graphs/lineage") +public class LineageMetadataResource { + + private static final Logger LOG = Logger.getLogger(LineageMetadataResource.class); + + public static final String RESULTS = "results"; + public static final String TOTAL_SIZE = "totalSize"; + + private final MetadataMappingService service; + + public LineageMetadataResource() { + if (Services.get().isRegistered(MetadataMappingService.SERVICE_NAME)) { + service = Services.get().getService(MetadataMappingService.SERVICE_NAME); + } else { + service = null; + } + } + + private KeyIndexableGraph getGraph() { + return service.getGraph(); + } + + private Set getVertexIndexedKeys() { + return service.getVertexIndexedKeys(); + } + + private Set getEdgeIndexedKeys() { + return service.getEdgeIndexedKeys(); + } + + /** + * Dump the graph. + * + * GET http://host/graphs/lineage/serialize + * graph.getVertices(); + */ + @GET + @Path("/serialize") + @Produces({MediaType.APPLICATION_JSON}) + public Response serializeGraph() { + checkIfMetadataMappingServiceIsEnabled(); + String file = StartupProperties.get().getProperty("falcon.graph.serialize.path") + + "lineage-graph-" + System.currentTimeMillis() + ".json"; + LOG.info("Serialize Graph to: " + file); + try { + GraphUtils.dump(getGraph(), file); + return Response.ok().build(); + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build()); + } + } + + /** + * Get all vertices. + * + * GET http://host/graphs/lineage/vertices/all + * graph.getVertices(); + */ + @GET + @Path("/vertices/all") + @Produces({MediaType.APPLICATION_JSON}) + public Response getVertices() { + checkIfMetadataMappingServiceIsEnabled(); + LOG.info("Get All Vertices"); + try { + JSONArray vertexArray = new JSONArray(); + long counter = 0; + for (Vertex vertex : getGraph().getVertices()) { + counter++; + vertexArray.put(GraphSONUtility.jsonFromElement( + vertex, getVertexIndexedKeys(), GraphSONMode.NORMAL)); + } + + JSONObject response = new JSONObject(); + response.put(RESULTS, vertexArray); + response.put(TOTAL_SIZE, counter); + return Response.ok(response).build(); + } catch (JSONException e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build()); + } + } + + /** + * Get a single vertex with a unique id. + * + * GET http://host/graphs/lineage/vertices/id + * graph.getVertex(id); + */ + @GET + @Path("/vertices/{id}") + @Produces({MediaType.APPLICATION_JSON}) + public Response getVertex(@PathParam("id") final String vertexId) { + checkIfMetadataMappingServiceIsEnabled(); + LOG.info("Get vertex for vertexId= " + vertexId); + try { + Vertex vertex = getGraph().getVertex(vertexId); + if (vertex == null) { + String message = "Vertex with [" + vertexId + "] cannot be found."; + LOG.info(message); + throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND) + .entity(JSONObject.quote(message)).build()); + } + + JSONObject response = new JSONObject(); + response.put(RESULTS, GraphSONUtility.jsonFromElement( + vertex, getVertexIndexedKeys(), GraphSONMode.NORMAL)); + return Response.ok(response).build(); + } catch (JSONException e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build()); + } + } + + /** + * Get a list of vertices matching a property name and a value. + *

+ * GET http://host/graphs/lineage/vertices?name=&value= + * graph.getVertices(name, value); + */ + @GET + @Path("/vertices") + @Produces({MediaType.APPLICATION_JSON}) + public Response getVertices(@QueryParam("name") final String name, + @QueryParam("value") final String value) { + checkIfMetadataMappingServiceIsEnabled(); + LOG.info("Get vertices for property name= " + name + ", value= " + value); + try { + Iterable vertices = getGraph().getVertices(name, value); + final JSONArray vertexArray = new JSONArray(); + + long counter = 0; + for (Vertex vertex : vertices) { + counter++; + vertexArray.put(GraphSONUtility.jsonFromElement( + vertex, getVertexIndexedKeys(), GraphSONMode.NORMAL)); + } + + JSONObject response = new JSONObject(); + response.put(RESULTS, vertexArray); + response.put(TOTAL_SIZE, counter); + return Response.ok(response).build(); + } catch (JSONException e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build()); + } + } + + /** + * Get a list of adjacent edges with a direction. + * + * GET http://host/graphs/lineage/vertices/id/direction + * graph.getVertex(id).get{Direction}Edges(); + * direction: {(?!outE)(?!bothE)(?!inE)(?!out)(?!both)(?!in)(?!query).+} + */ + @GET + @Path("vertices/{id}/{direction}") + @Produces({MediaType.APPLICATION_JSON}) + public Response getVertexEdges(@PathParam("id") String vertexId, + @PathParam("direction") String direction) { + checkIfMetadataMappingServiceIsEnabled(); + LOG.info("Get vertex edges for vertexId= " + vertexId + ", direction= " + direction); + try { + Vertex vertex = getGraph().getVertex(vertexId); + if (vertex == null) { + String message = "Vertex with [" + vertexId + "] cannot be found."; + LOG.info(message); + throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND) + .entity(JSONObject.quote(message)).build()); + } + + // break out the segment into the return and the direction + VertexQueryArguments queryArguments = new VertexQueryArguments(direction); + // if this is a query and the _return is "count" then we don't bother to send back the result array + boolean countOnly = queryArguments.isCountOnly(); + // what kind of data the calling client wants back (vertices, edges, count, vertex identifiers) + ReturnType returnType = queryArguments.getReturnType(); + // the query direction (both, out, in) + Direction queryDirection = queryArguments.getQueryDirection(); + + VertexQuery query = vertex.query().direction(queryDirection); + + JSONArray elementArray = new JSONArray(); + long counter = 0; + if (returnType == ReturnType.VERTICES || returnType == ReturnType.VERTEX_IDS) { + Iterable vertexQueryResults = query.vertices(); + for (Vertex v : vertexQueryResults) { + if (returnType.equals(ReturnType.VERTICES)) { + elementArray.put(GraphSONUtility.jsonFromElement( + v, getVertexIndexedKeys(), GraphSONMode.NORMAL)); + } else { + elementArray.put(v.getId()); + } + counter++; + } + } else if (returnType == ReturnType.EDGES) { + Iterable edgeQueryResults = query.edges(); + for (Edge e : edgeQueryResults) { + elementArray.put(GraphSONUtility.jsonFromElement( + e, getEdgeIndexedKeys(), GraphSONMode.NORMAL)); + counter++; + } + } else if (returnType == ReturnType.COUNT) { + counter = query.count(); + } + + JSONObject response = new JSONObject(); + if (!countOnly) { + response.put(RESULTS, elementArray); + } + response.put(TOTAL_SIZE, counter); + return Response.ok(response).build(); + } catch (JSONException e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build()); + } + } + + /** + * Get all edges. + * + * GET http://host/graphs/lineage/edges/all + * graph.getEdges(); + */ + @GET + @Path("/edges/all") + @Produces({MediaType.APPLICATION_JSON}) + public Response getEdges() { + checkIfMetadataMappingServiceIsEnabled(); + LOG.info("Get All Edges."); + try { + JSONArray vertexArray = new JSONArray(); + long counter = 0; + for (Edge edge : getGraph().getEdges()) { + counter++; + vertexArray.put(GraphSONUtility.jsonFromElement( + edge, getEdgeIndexedKeys(), GraphSONMode.NORMAL)); + } + + JSONObject response = new JSONObject(); + response.put(RESULTS, vertexArray); + response.put(TOTAL_SIZE, counter); + return Response.ok(response).build(); + } catch (JSONException e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build()); + } + } + + /** + * Get a single edge with a unique id. + * + * GET http://host/graphs/lineage/edges/id + * graph.getEdge(id); + */ + @GET + @Path("/edges/{id}") + @Produces({MediaType.APPLICATION_JSON}) + public Response getEdge(@PathParam("id") final String edgeId) { + checkIfMetadataMappingServiceIsEnabled(); + LOG.info("Get vertex for edgeId= " + edgeId); + try { + Edge edge = getGraph().getEdge(edgeId); + if (edge == null) { + String message = "Edge with [" + edgeId + "] cannot be found."; + LOG.info(message); + throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND) + .entity(JSONObject.quote(message)).build()); + } + + JSONObject response = new JSONObject(); + response.put(RESULTS, GraphSONUtility.jsonFromElement( + edge, getEdgeIndexedKeys(), GraphSONMode.NORMAL)); + return Response.ok(response).build(); + } catch (JSONException e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build()); + } + } + + private void checkIfMetadataMappingServiceIsEnabled() { + if (service == null) { + throw new WebApplicationException( + Response.status(Response.Status.NOT_FOUND) + .entity("Lineage Metadata Service is not enabled.") + .type("text/plain") + .build()); + } + } + + private enum ReturnType {VERTICES, EDGES, COUNT, VERTEX_IDS} + + public static final String OUT_E = "outE"; + public static final String IN_E = "inE"; + public static final String BOTH_E = "bothE"; + public static final String OUT = "out"; + public static final String IN = "in"; + public static final String BOTH = "both"; + public static final String OUT_COUNT = "outCount"; + public static final String IN_COUNT = "inCount"; + public static final String BOTH_COUNT = "bothCount"; + public static final String OUT_IDS = "outIds"; + public static final String IN_IDS = "inIds"; + public static final String BOTH_IDS = "bothIds"; + + /** + * Helper class for query arguments. + */ + public static final class VertexQueryArguments { + + private final Direction queryDirection; + private final ReturnType returnType; + private final boolean countOnly; + + public VertexQueryArguments(String directionSegment) { + if (directionSegment.equals(OUT_E)) { + returnType = ReturnType.EDGES; + queryDirection = Direction.OUT; + countOnly = false; + } else if (directionSegment.equals(IN_E)) { + returnType = ReturnType.EDGES; + queryDirection = Direction.IN; + countOnly = false; + } else if (directionSegment.equals(BOTH_E)) { + returnType = ReturnType.EDGES; + queryDirection = Direction.BOTH; + countOnly = false; + } else if (directionSegment.equals(OUT)) { + returnType = ReturnType.VERTICES; + queryDirection = Direction.OUT; + countOnly = false; + } else if (directionSegment.equals(IN)) { + returnType = ReturnType.VERTICES; + queryDirection = Direction.IN; + countOnly = false; + } else if (directionSegment.equals(BOTH)) { + returnType = ReturnType.VERTICES; + queryDirection = Direction.BOTH; + countOnly = false; + } else if (directionSegment.equals(BOTH_COUNT)) { + returnType = ReturnType.COUNT; + queryDirection = Direction.BOTH; + countOnly = true; + } else if (directionSegment.equals(IN_COUNT)) { + returnType = ReturnType.COUNT; + queryDirection = Direction.IN; + countOnly = true; + } else if (directionSegment.equals(OUT_COUNT)) { + returnType = ReturnType.COUNT; + queryDirection = Direction.OUT; + countOnly = true; + } else if (directionSegment.equals(BOTH_IDS)) { + returnType = ReturnType.VERTEX_IDS; + queryDirection = Direction.BOTH; + countOnly = false; + } else if (directionSegment.equals(IN_IDS)) { + returnType = ReturnType.VERTEX_IDS; + queryDirection = Direction.IN; + countOnly = false; + } else if (directionSegment.equals(OUT_IDS)) { + returnType = ReturnType.VERTEX_IDS; + queryDirection = Direction.OUT; + countOnly = false; + } else { + throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST) + .entity(JSONObject.quote(directionSegment + " segment was invalid.")) + .build()); + } + } + + public Direction getQueryDirection() { + return queryDirection; + } + + public ReturnType getReturnType() { + return returnType; + } + + public boolean isCountOnly() { + return countOnly; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/prism/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git a/prism/src/main/webapp/WEB-INF/web.xml b/prism/src/main/webapp/WEB-INF/web.xml index 00a6c42..e7c6234 100644 --- a/prism/src/main/webapp/WEB-INF/web.xml +++ b/prism/src/main/webapp/WEB-INF/web.xml @@ -49,7 +49,7 @@ com.sun.jersey.config.property.packages - org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy + org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata 1 http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/process/pom.xml ---------------------------------------------------------------------- diff --git a/process/pom.xml b/process/pom.xml index 0ca554a..46f9cf0 100644 --- a/process/pom.xml +++ b/process/pom.xml @@ -106,16 +106,6 @@ - org.springframework - spring-jms - - - - org.springframework - spring-beans - - - org.testng testng http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/webapp/src/main/webapp/WEB-INF/embedded/web.xml ---------------------------------------------------------------------- diff --git a/webapp/src/main/webapp/WEB-INF/embedded/web.xml b/webapp/src/main/webapp/WEB-INF/embedded/web.xml index bc74d91..9dc371f 100644 --- a/webapp/src/main/webapp/WEB-INF/embedded/web.xml +++ b/webapp/src/main/webapp/WEB-INF/embedded/web.xml @@ -49,7 +49,7 @@ com.sun.jersey.config.property.packages - org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy + org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata 1 http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/webapp/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml index 3864789..971fcdd 100644 --- a/webapp/src/main/webapp/WEB-INF/web.xml +++ b/webapp/src/main/webapp/WEB-INF/web.xml @@ -49,7 +49,7 @@ com.sun.jersey.config.property.packages - org.apache.falcon.resource.admin,org.apache.falcon.resource.proxy,org.apache.falcon.resource.provider + org.apache.falcon.resource.admin,org.apache.falcon.resource.proxy,org.apache.falcon.resource.provider,org.apache.falcon.resource.metadata 1 http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b8b2fe45/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java index f946202..f4f72e2 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java @@ -18,6 +18,7 @@ package org.apache.falcon.cli; +import org.apache.falcon.client.FalconClient; import org.apache.falcon.resource.TestContext; import org.apache.falcon.util.OozieTestUtils; import org.apache.falcon.util.StartupProperties; @@ -57,6 +58,15 @@ public class FalconCLISmokeIT { // this is necessary for lineage Assert.assertEquals(0, executeWithURL("entity -submit -type cluster -file " + filePath)); + // verify + Assert.assertEquals(0, executeWithURL("graph -vertices -all")); + Assert.assertEquals(0, executeWithURL("graph -edges -all")); + Assert.assertEquals(0, executeWithURL("graph -vertices -name name -value " + context.getClusterName())); + + // verify lineage + FalconClient client = new FalconClient(TestContext.BASE_URL); + String clusterVertex = client.getVertices("name", context.getClusterName()); + Assert.assertTrue(clusterVertex.contains(context.getClusterName())); filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay); Assert.assertEquals(0,