falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srik...@apache.org
Subject [2/3] incubator-falcon git commit: FALCON-256 Create new API for Process dependency graph DAG which captures process connected via feeds. Contributed by Ajay Yadav
Date Fri, 26 Dec 2014 05:32:12 GMT
FALCON-256 Create new API for Process dependency graph DAG which captures process connected
via feeds. Contributed by Ajay Yadav


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/45a7b989
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/45a7b989
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/45a7b989

Branch: refs/heads/master
Commit: 45a7b989bfdad6943dc0090c7cea2e098862c9a9
Parents: 06ffdf9
Author: srikanth.sundarrajan <sriksun@apache.org>
Authored: Fri Dec 26 10:35:32 2014 +0530
Committer: srikanth.sundarrajan <sriksun@apache.org>
Committed: Fri Dec 26 10:35:32 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/falcon/cli/FalconMetadataCLI.java    |  17 +-
 .../org/apache/falcon/client/FalconClient.java  |  17 +-
 .../falcon/resource/LineageGraphResult.java     | 165 +++++++++++++++++++
 docs/src/site/twiki/FalconCLI.twiki             |  17 ++
 docs/src/site/twiki/restapi/EntityLineage.twiki |  39 +++++
 docs/src/site/twiki/restapi/ResourceList.twiki  |   1 +
 pom.xml                                         |   6 +
 prism/pom.xml                                   |   7 +
 .../metadata/LineageMetadataResource.java       | 102 ++++++++++++
 .../metadata/LineageMetadataResourceTest.java   |   8 +
 .../resource/metadata/MetadataTestContext.java  |  18 ++
 .../java/org/apache/falcon/cli/FalconCLIIT.java |  24 +++
 13 files changed, 422 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af4bd9e..5575219 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
   NEW FEATURES
 
   IMPROVEMENTS
+   FALCON-256 Create new API for Process dependency graph DAG which captures 
+   process connected via feeds. (Ajay Yadav via Srikanth Sundarrajan)
+
    FALCON-823 Add path matching ability to the radix tree (Ajay Yadav
    via Srikanth Sundarrajan) 
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
index 63af415..515d328 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
@@ -54,6 +54,8 @@ public class FalconMetadataCLI {
     public static final String VERTEX_CMD = "vertex";
     public static final String VERTICES_CMD = "vertices";
     public static final String VERTEX_EDGES_CMD = "edges";
+    public static final String PIPELINE_OPT = "pipeline";
+
 
     public static final String EDGE_CMD = "edge";
     public static final String ID_OPT = "id";
@@ -78,8 +80,12 @@ public class FalconMetadataCLI {
         String key = commandLine.getOptionValue(KEY_OPT);
         String value = commandLine.getOptionValue(VALUE_OPT);
         String direction = commandLine.getOptionValue(DIRECTION_OPT);
+        String pipeline = commandLine.getOptionValue(PIPELINE_OPT);
 
-        if (optionsList.contains(LIST_OPT)) {
+        if (optionsList.contains(LINEAGE_OPT)) {
+            validatePipelineName(pipeline);
+            result = client.getEntityLineageGraph(pipeline).getDotNotation();
+        } else if (optionsList.contains(LIST_OPT)) {
             validateDimensionType(dimensionType.toUpperCase());
             result = client.getDimensionList(dimensionType, cluster);
         } else if (optionsList.contains(RELATIONS_OPT)) {
@@ -105,6 +111,12 @@ public class FalconMetadataCLI {
         OUT.get().println(result);
     }
 
+    private void validatePipelineName(String pipeline) throws FalconCLIException {
+        if (StringUtils.isEmpty(pipeline)) {
+            throw new FalconCLIException("Invalid value for pipeline");
+        }
+    }
+
     private void validateDimensionType(String dimensionType) throws FalconCLIException {
         if (StringUtils.isEmpty(dimensionType)
                 ||  dimensionType.contains("INSTANCE")) {
@@ -157,6 +169,8 @@ public class FalconMetadataCLI {
         Option lineage = new Option(LINEAGE_OPT, false, "Get falcon metadata lineage information");
         group.addOption(discovery);
         group.addOption(lineage);
+        Option pipeline = new Option(PIPELINE_OPT, true,
+                "Get lineage graph for the entities in a pipeline");
         metadataOptions.addOptionGroup(group);
 
         // Add discovery options
@@ -172,6 +186,7 @@ public class FalconMetadataCLI {
         Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
 
         // Add lineage options
+        metadataOptions.addOption(pipeline);
 
         metadataOptions.addOption(url);
         metadataOptions.addOption(type);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 5c476ae..a748c58 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -38,6 +38,7 @@ import org.apache.falcon.resource.EntitySummaryResult;
 import org.apache.falcon.resource.FeedInstanceResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.resource.LineageGraphResult;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
 import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
@@ -210,7 +211,8 @@ public class FalconClient {
         LIST("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         RELATIONS("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         VERTICES("api/metadata/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON),
-        EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON);
+        EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON),
+        LINEAGE("api/metadata/lineage/entities", HttpMethod.GET, MediaType.APPLICATION_JSON);
 
         private String path;
         private String method;
@@ -507,6 +509,19 @@ public class FalconClient {
         return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null,
cluster);
     }
 
+    public LineageGraphResult getEntityLineageGraph(String pipelineName) throws FalconCLIException
{
+        MetadataOperations operation = MetadataOperations.LINEAGE;
+        WebResource resource = service.path(operation.path)
+                .queryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName);
+
+        ClientResponse clientResponse = resource
+            .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+            .accept(operation.mimeType).type(operation.mimeType)
+            .method(operation.method, ClientResponse.class);
+        checkIfSuccessful(clientResponse);
+        return clientResponse.getEntity(LineageGraphResult.class);
+    }
+
     public String getDimensionRelations(String dimensionType, String dimensionName) throws
FalconCLIException {
         return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType,
dimensionName, null);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
new file mode 100644
index 0000000..acf5d11
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.commons.lang.StringUtils;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * LineageGraphResult is the output returned by all the apis returning a DAG.
+ */
+@XmlRootElement(name = "result")
+@XmlAccessorType (XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class LineageGraphResult {
+
+    private String[] vertices;
+
+    @XmlElement(name="edges")
+    private Edge[] edges;
+
+    private static final JAXBContext JAXB_CONTEXT;
+
+    static {
+        try {
+            JAXB_CONTEXT = JAXBContext.newInstance(LineageGraphResult.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public LineageGraphResult() {
+        // default constructor for JAXB
+    }
+
+    /**
+     * A class to represent an edge in a DAG.
+     */
+    @XmlRootElement(name = "edge")
+    @XmlAccessorType(XmlAccessType.FIELD)
+    public static class Edge {
+        @XmlElement
+        private String from;
+        @XmlElement
+        private String to;
+        @XmlElement
+        private String label;
+
+        public Edge() {
+
+        }
+
+        public Edge(String from, String to, String label) {
+            this.from = from;
+            this.to = to;
+            this.label = label;
+        }
+
+        public String getFrom() {
+            return from;
+        }
+
+        public void setFrom(String from) {
+            this.from = from;
+        }
+
+        public String getTo() {
+            return to;
+        }
+
+        public void setTo(String to) {
+            this.to = to;
+        }
+
+        public String getLabel() {
+            return label;
+        }
+
+        public void setLabel(String label) {
+            this.label = label;
+        }
+
+        public String getDotNotation() {
+            StringBuilder result = new StringBuilder();
+            if (StringUtils.isNotBlank(this.from) && StringUtils.isNotBlank(this.to)
+                    && StringUtils.isNotBlank(this.label)) {
+                result.append("\"" + this.from +"\"");
+                result.append(" -> ");
+                result.append("\"" + this.to + "\"");
+                result.append(" [ label = \"" + this.label + "\" ] \n");
+            }
+            return result.toString();
+        }
+
+        @Override
+        public String toString() {
+            return getDotNotation();
+        }
+
+    }
+
+
+    public String getDotNotation() {
+        StringBuilder result = new StringBuilder();
+        result.append("digraph g{ \n");
+        if (this.vertices != null) {
+            for (String v : this.vertices) {
+                result.append("\"" + v + "\"");
+                result.append("\n");
+            }
+        }
+
+        if (this.edges != null) {
+            for (Edge e : this.edges) {
+                result.append(e.getDotNotation());
+            }
+        }
+        result.append("}\n");
+        return result.toString();
+    }
+
+    public String[] getVertices() {
+        return vertices;
+    }
+
+    public void setVertices(String[] vertices) {
+        this.vertices = vertices;
+    }
+
+    public Edge[] getEdges() {
+        return edges;
+    }
+
+    public void setEdges(Edge[] edges) {
+        this.edges = edges;
+    }
+
+
+    @Override
+    public String toString() {
+        return getDotNotation();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index d8199dd..d37cf8c 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -56,6 +56,9 @@ Optional Args : -fields <<field1,field2>> -filterBy <<field1:value1,field2:value
 
 <a href="./Restapi/EntityList.html">Optional params described here.</a>
 
+
+
+
 ---+++Summary
 
 Summary of entities of a particular type and a cluster will be listed. Entity summary has
N most recent instances of entity.
@@ -255,6 +258,20 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name
<<name>> -params -
 
 ---++ Metadata Lineage Options
 
+---+++Lineage
+
+Returns the relationship between processes and feeds in a given pipeline in <a href="http://www.graphviz.org/content/dot-language">dot</a>
format.
+You can use the output and view a graphical representation of DAG using an online graphviz
viewer like <a href="http://graphviz-dev.appspot.com/">this</a>.
+
+
+Usage:
+
+$FALCON_HOME/bin/falcon metadata -lineage -pipeline my-pipeline
+
+pipeline is a mandatory option.
+
+
+
 ---+++ Vertex
 
 Get the vertex with the specified id.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/EntityLineage.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityLineage.twiki b/docs/src/site/twiki/restapi/EntityLineage.twiki
new file mode 100644
index 0000000..ea747b1
--- /dev/null
+++ b/docs/src/site/twiki/restapi/EntityLineage.twiki
@@ -0,0 +1,39 @@
+---++  GET api/metadata/lineage/entities?pipeline=:pipeline
+   * <a href="#Description">Description</a>
+   * <a href="#Parameters">Parameters</a>
+   * <a href="#Results">Results</a>
+   * <a href="#Examples">Examples</a>
+
+---++ Description
+It returns the graph depicting the relationship between the various processes and feeds in
a given pipeline.
+
+---++ Parameters
+   * :pipeline is the name of the pipeline
+
+---++ Results
+It returns a json graph
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/metadata/lineage/entities?pipeline=my-pipeline
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    "vertices": ["my-minutely-process", "my-hourly-process"],
+    "edges":
+    [
+        {
+         "from"  : "my-minutely-process",
+         "to"    : "my-hourly-process",
+         "label" : "my-minutely-feed"
+        },
+        {
+         "from"  : "my-hourly-process",
+         "to"    : "my-minutely-process",
+         "label" : "my-hourly-feedback"
+        }
+    ]
+}
+</verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index a87818b..2368631 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -77,6 +77,7 @@ See also: [[../Security.twiki][Security in Falcon]]
 | GET         | [[AdjacentVertices][api/metadata/lineage/vertices/:id/:direction]]      
              | get the adjacent vertices or edges of the vertex with the specified direction
|
 | GET         | [[AllEdges][api/metadata/lineage/edges/all]]                            
              | get all edges                                                            
    |
 | GET         | [[Edge][api/metadata/lineage/edges/:id]]                                
              | get the edge with the specified id                                       
    |
+| GET         | [[EntityLineage][api/metadata/lineage/entities?pipeline=:name]]         
              | Get lineage graph for processes and feeds in the specified pipeline      
    |
 
 ---++ REST Call on Metadata Discovery Resource
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5a6c095..1b3a6c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -625,6 +625,12 @@
             </dependency>
 
             <dependency>
+                <groupId>com.tinkerpop.gremlin</groupId>
+                <artifactId>gremlin-java</artifactId>
+                <version>2.6.0</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.springframework</groupId>
                 <artifactId>spring-beans</artifactId>
                 <version>3.0.3.RELEASE</version>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/pom.xml
----------------------------------------------------------------------
diff --git a/prism/pom.xml b/prism/pom.xml
index 26e577d..43cc4b4 100644
--- a/prism/pom.xml
+++ b/prism/pom.xml
@@ -61,6 +61,13 @@
         </dependency>
 
         <dependency>
+            <groupId>com.tinkerpop.gremlin</groupId>
+            <artifactId>gremlin-java</artifactId>
+        </dependency>
+
+
+
+        <dependency>
             <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-test-util</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
index 2404be4..0c6b2b6 100644
--- a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.resource.metadata;
 
+import com.google.common.collect.Sets;
 import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Element;
@@ -25,12 +26,22 @@ import com.tinkerpop.blueprints.Vertex;
 import com.tinkerpop.blueprints.VertexQuery;
 import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
 import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
+import com.tinkerpop.gremlin.java.GremlinPipeline;
 import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.metadata.GraphUtils;
 import org.apache.falcon.metadata.RelationshipLabel;
 import org.apache.falcon.metadata.RelationshipProperty;
 import org.apache.falcon.metadata.RelationshipType;
+import org.apache.falcon.monitors.Dimension;
+import org.apache.falcon.monitors.Monitored;
+import org.apache.falcon.resource.LineageGraphResult;
 import org.apache.falcon.util.StartupProperties;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
@@ -48,7 +59,10 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Jersey Resource for lineage metadata operations.
@@ -81,6 +95,37 @@ public class LineageMetadataResource extends AbstractMetadataResource {
         }
     }
 
+
+    @GET
+    @Path("/entities")
+    @Produces({MediaType.APPLICATION_JSON})
+    @Monitored(event = "entity-lineage")
+    public Response getEntityLineageGraph(@Dimension("pipeline") @QueryParam("pipeline")
final String pipeline) {
+        LOG.info("Get lineage Graph for pipeline:({})", pipeline);
+
+        try {
+            Iterable<Vertex> processes;
+            if (StringUtils.isNotBlank(pipeline)) {
+                Iterable<Vertex> pipelineNode = getGraph().getVertices(RelationshipProperty.NAME.getName(),
+                        pipeline);
+                if (!pipelineNode.iterator().hasNext()) {
+                    throw FalconWebException.newException("No pipelines found for " + pipeline,
+                            Response.Status.BAD_REQUEST);
+                }
+                Vertex v = pipelineNode.iterator().next(); // pipeline names are unique
+                processes = new GremlinPipeline(v).in(RelationshipLabel.PIPELINES.getName())
+                        .has(RelationshipProperty.TYPE.getName(), RelationshipType.PROCESS_ENTITY.getName());
+                return Response.ok(buildJSONGraph(processes)).build();
+            }
+            throw FalconWebException.newException("Pipeline name can not be blank",
+                    Response.Status.INTERNAL_SERVER_ERROR);
+
+        } catch (Exception e) {
+            LOG.error("Error while fetching entity lineage: ", e);
+            throw FalconWebException.newException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
     /**
      * Get all vertices.
      *
@@ -392,6 +437,63 @@ public class LineageMetadataResource extends AbstractMetadataResource
{
         return response;
     }
 
+    private LineageGraphResult buildJSONGraph(Iterable<Vertex> processes) throws  FalconException
{
+        LineageGraphResult result = new LineageGraphResult();
+
+        List<String> vertexArray = new LinkedList<String>();
+        List<LineageGraphResult.Edge> edgeArray = new LinkedList<LineageGraphResult.Edge>();
+
+        Map<String, String> feedProducerMap = new HashMap<String, String>();
+        Map<String, List<String>> feedConsumerMap = new HashMap<String, List<String>>();
+
+        if (processes != null) {
+            for (Vertex process : processes) {
+                String processName = process.getProperty(RelationshipProperty.NAME.getName());
+                vertexArray.add(processName);
+                Process producer = ConfigurationStore.get().get(EntityType.PROCESS, processName);
+
+                if (producer != null) {
+                    if (producer.getOutputs() != null) {
+                        //put all produced feeds in feedProducerMap
+                        for (Output output : producer.getOutputs().getOutputs()) {
+                            feedProducerMap.put(output.getFeed(), processName);
+                        }
+                    }
+                    if (producer.getInputs() != null) {
+                        //put all consumed feeds in feedConsumerMap
+                        for (Input input : producer.getInputs().getInputs()) {
+                            //if feed already exists then append it, else insert it with
a list
+                            if (feedConsumerMap.containsKey(input.getFeed())) {
+                                feedConsumerMap.get(input.getFeed()).add(processName);
+                            } else {
+                                List<String> value = new LinkedList<String>();
+                                value.add(processName);
+                                feedConsumerMap.put(input.getFeed(), value);
+                            }
+                        }
+                    }
+                }
+            }
+            LOG.debug("feedProducerMap = {}", feedProducerMap);
+            LOG.debug("feedConsumerMap = {}", feedConsumerMap);
+
+            // discard feeds which aren't edges between two processes
+            Set<String> pipelineFeeds = Sets.intersection(feedProducerMap.keySet(),
feedConsumerMap.keySet());
+            for (String feedName : pipelineFeeds) {
+                String producerProcess = feedProducerMap.get(feedName);
+                // make an edge from producer to all the consumers
+                for (String consumerProcess : feedConsumerMap.get(feedName)) {
+                    edgeArray.add(new LineageGraphResult.Edge(producerProcess, consumerProcess,
feedName));
+                }
+            }
+        }
+
+        result.setEdges(edgeArray.toArray(new LineageGraphResult.Edge[edgeArray.size()]));
+        result.setVertices(vertexArray.toArray(new String[vertexArray.size()]));
+        LOG.debug("result = {}", result);
+        return result;
+    }
+
     private static void validateInputs(String errorMsg, String... inputs) {
         for (String input : inputs) {
             if (StringUtils.isEmpty(input)) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index cabb44c..ac0e51f 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -351,6 +351,14 @@ public class LineageMetadataResourceTest {
         }
     }
 
+    @Test
+    public void testEntityLineage() throws Exception {
+        testContext.addConsumerProcess();
+        LineageMetadataResource resource = new LineageMetadataResource();
+        Response response = resource.getEntityLineageGraph("testPipeline");
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+    }
+
     private void assertBasicVertexProperties(Vertex vertex, Map vertexProperties) {
         RelationshipProperty[] properties = {
             RelationshipProperty.NAME,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
index aaddf62..6f798a8 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
@@ -51,6 +51,7 @@ public class MetadataTestContext {
     public static final String OPERATION = "GENERATE";
 
     public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+    public static final String CHILD_PROCESS_ENTITY_NAME = "sample-child-process";
     public static final String PROCESS_ENTITY_NAME = "sample-process";
     public static final String COLO_NAME = "west-coast";
     public static final String WORKFLOW_NAME = "imp-click-join-workflow";
@@ -171,6 +172,23 @@ public class MetadataTestContext {
         configStore.publish(EntityType.PROCESS, processEntity);
     }
 
+    public void addConsumerProcess() throws Exception {
+        org.apache.falcon.entity.v0.process.Process processEntity =
+                EntityBuilderTestUtil.buildProcess(CHILD_PROCESS_ENTITY_NAME,
+                        clusterEntity, "classified-as=Critical", "testPipeline");
+        EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
+
+        for (Feed inputFeed : inputFeeds) {
+            EntityBuilderTestUtil.addOutput(processEntity, inputFeed);
+        }
+
+        for (Feed outputFeed : outputFeeds) {
+            EntityBuilderTestUtil.addInput(processEntity, outputFeed);
+        }
+
+        configStore.publish(EntityType.PROCESS, processEntity);
+    }
+
     public void addInstance() throws Exception {
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
                 WorkflowExecutionContext.Type.POST_PROCESSING);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 9c6ad80..b50999d 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -487,6 +487,30 @@ public class FalconCLIIT {
                 + " -file " + createTempJobPropertiesFile()), 0);
     }
 
+
+    @Test
+    public void testEntityLineage() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        String filePath;
+        filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(),
overlay);
+        context.setCluster(overlay.get("cluster"));
+        Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath),
0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1,
overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath),
0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2,
overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath),
0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE,
overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type process -file " + filePath),
0);
+
+        Assert.assertEquals(executeWithURL("metadata -lineage -pipeline testPipeline"), 0);
+
+    }
+
     @Test
     public void testEntityPaginationFilterByCommands() throws Exception {
 


Mime
View raw message