falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-796 Enable users to triage data processing issues through falcon. Contributed by Ajay Yadava
Date Mon, 29 Jun 2015 11:54:45 GMT
Repository: falcon
Updated Branches:
  refs/heads/master a61349d6e -> 98e12502a


FALCON-796 Enable users to triage data processing issues through falcon. Contributed by Ajay
Yadava


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/98e12502
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/98e12502
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/98e12502

Branch: refs/heads/master
Commit: 98e12502a81e3e20b872431d1fdde06ba77c4880
Parents: a61349d
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Mon Jun 29 17:05:42 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Mon Jun 29 17:05:42 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +-
 .../java/org/apache/falcon/ResponseHelper.java  |  19 +-
 .../java/org/apache/falcon/cli/FalconCLI.java   |  15 +-
 .../org/apache/falcon/client/FalconClient.java  |  15 ++
 .../apache/falcon/resource/TriageResult.java    |  87 ++++++++++
 docs/src/site/twiki/FalconCLI.twiki             |   8 +
 docs/src/site/twiki/restapi/ResourceList.twiki  |   1 +
 docs/src/site/twiki/restapi/Triage.twiki        |  44 +++++
 .../resource/AbstractInstanceManager.java       | 174 +++++++++++++++++++
 .../resource/proxy/InstanceManagerProxy.java    |  20 +++
 .../apache/falcon/resource/InstanceManager.java |  13 ++
 11 files changed, 394 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 87589c1..14658f1 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,8 +4,10 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
-    FALCON-1039 Add instance dependency API in falcon(Ajay Yadava)
+    FALCON-1039 Add instance dependency API in falcon (Ajay Yadava)
 
+    FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
+    
   IMPROVEMENTS
     FALCON-1293 Update CHANGES.txt to change 0.6.1 branch to release (Shaik Idris Ali via
Ajay Yadava)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/client/src/main/java/org/apache/falcon/ResponseHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/ResponseHelper.java b/client/src/main/java/org/apache/falcon/ResponseHelper.java
index 78598ba..ec6604d 100644
--- a/client/src/main/java/org/apache/falcon/ResponseHelper.java
+++ b/client/src/main/java/org/apache/falcon/ResponseHelper.java
@@ -18,16 +18,18 @@
 
 package org.apache.falcon;
 
-import java.util.Date;
-import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.resource.EntitySummaryResult;
 import org.apache.falcon.resource.FeedInstanceResult;
 import org.apache.falcon.resource.FeedLookupResult;
 import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
-import org.apache.falcon.resource.EntitySummaryResult;
+import org.apache.falcon.resource.TriageResult;
+
+import java.util.Date;
+import java.util.Map;
 
 /**
  * Helpers for response object to string conversion.
@@ -266,6 +268,17 @@ public final class ResponseHelper {
         return sb.toString();
     }
 
+    public static String getString(TriageResult triageResult) {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(triageResult.toString());
+        sb.append("\nAdditional Information:\n");
+        sb.append("Response: ").append(triageResult.getMessage());
+        sb.append("Request Id: ").append(triageResult.getRequestId());
+
+        return sb.toString();
+    }
+
     public static String getString(FeedLookupResult feedLookupResult) {
         StringBuilder sb = new StringBuilder();
         String results = feedLookupResult.toString();

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index f169917..cc041c0 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -118,6 +118,7 @@ public class FalconCLI {
     public static final String LIFECYCLE_OPT = "lifecycle";
     public static final String PARARMS_OPT = "params";
     public static final String LISTING_OPT = "listing";
+    public static final String TRIAGE_OPT = "triage";
 
     // Recipe Command
     public static final String RECIPE_CMD = "recipe";
@@ -252,8 +253,14 @@ public class FalconCLI {
         validateSortOrder(sortOrder);
         validateInstanceCommands(optionsList, entity, type, colo);
 
-
-        if (optionsList.contains(DEPENDENCY_OPT)) {
+        if (optionsList.contains(TRIAGE_OPT)) {
+            validateNotEmpty(colo, COLO_OPT);
+            validateNotEmpty(start, START_OPT);
+            validateNotEmpty(type, ENTITY_TYPE_OPT);
+            validateEntityTypeForSummary(type);
+            validateNotEmpty(entity, ENTITY_NAME_OPT);
+            result = client.triage(type, entity, start, colo).toString();
+        } else if (optionsList.contains(DEPENDENCY_OPT)) {
             validateNotEmpty(instanceTime, INSTANCE_TIME_OPT);
             InstanceDependencyResult response = client.getInstanceDependencies(type, entity,
instanceTime, colo);
             result = ResponseHelper.getString(response);
@@ -798,6 +805,9 @@ public class FalconCLI {
                 false,
                 "Displays dependent instances for a specified instance.");
 
+        Option triage = new Option(TRIAGE_OPT, false,
+                "Triage a feed or process instance and find the failures in it's lineage.");
+
         OptionGroup group = new OptionGroup();
         group.addOption(running);
         group.addOption(list);
@@ -812,6 +822,7 @@ public class FalconCLI {
         group.addOption(params);
         group.addOption(listing);
         group.addOption(dependency);
+        group.addOption(triage);
 
         Option url = new Option(URL_OPTION, true, "Falcon URL");
         Option start = new Option(START_OPT, true,

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 20c32e4..5df8626 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -41,6 +41,7 @@ import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.falcon.resource.LineageGraphResult;
+import org.apache.falcon.resource.TriageResult;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
 import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
@@ -244,6 +245,7 @@ public class FalconClient {
         SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         DEPENDENCY("api/instance/dependencies/", HttpMethod.GET, MediaType.APPLICATION_JSON),
+        TRIAGE("api/instance/triage/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         LISTING("api/instance/listing/", HttpMethod.GET, MediaType.APPLICATION_JSON);
 
         private String path;
@@ -370,6 +372,19 @@ public class FalconClient {
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
 
+    public TriageResult triage(String entityType, String entityName, String instanceTime,
String colo)
+        throws FalconCLIException {
+        ClientResponse clientResponse = service
+                .path(Instances.TRIAGE.path).path(entityType).path(entityName)
+                .queryParam("start", instanceTime).queryParam("colo", colo)
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(Instances.TRIAGE.mimeType).type(MediaType.TEXT_XML)
+                .method(Instances.TRIAGE.method, ClientResponse.class);
+
+        checkIfSuccessful(clientResponse);
+        return clientResponse.getEntity(TriageResult.class);
+    }
+
     public EntityList getEntityList(String entityType, String fields, String nameSubsequence,
String tagKeywords,
                                     String filterBy, String filterTags, String orderBy, String
sortOrder,
                                     Integer offset, Integer numResults) throws FalconCLIException
{

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/client/src/main/java/org/apache/falcon/resource/TriageResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/TriageResult.java b/client/src/main/java/org/apache/falcon/resource/TriageResult.java
new file mode 100644
index 0000000..a5ffe74
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/TriageResult.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Result of an instance triage.
+ */
+@XmlRootElement(name = "result")
+@XmlAccessorType(XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class TriageResult extends APIResult {
+
+    @XmlElement(name = "triageGraphs")
+    private LineageGraphResult[] triageGraphs;
+
+    //For JAXB
+    private TriageResult() {
+        super();
+    }
+
+    public TriageResult(Status status, String message) {
+        super(status, message);
+    }
+
+
+
+    public LineageGraphResult[] getTriageGraphs() {
+        return triageGraphs;
+    }
+
+    public void setTriageGraphs(LineageGraphResult[] triageGraphs) {
+        this.triageGraphs = triageGraphs;
+    }
+
+
+    @Override
+    public Object[] getCollection() {
+        return getTriageGraphs();
+    }
+
+
+    @Override
+    public void setCollection(Object[] items) {
+        if (items == null) {
+            setTriageGraphs(new LineageGraphResult[0]);
+        } else {
+            LineageGraphResult[] graphs = new LineageGraphResult[items.length];
+            for (int index = 0; index < items.length; index++) {
+                graphs[index] = (LineageGraphResult)items[index];
+            }
+            setTriageGraphs(graphs);
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        if (triageGraphs != null) {
+            for (LineageGraphResult graph : triageGraphs) {
+                buffer.append(graph.getDotNotation());
+                buffer.append("\n\n");
+            }
+        }
+        return buffer.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 233e4a6..aeab8f8 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -272,6 +272,14 @@ This can be used with instance management options. Default values are
replicatio
 Usage:
 $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>>
-status -lifecycle <<lifecycletype>> -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'"
 
+---+++Triage
+
+Given a feed/process instance, this command traces its ancestors to find which of them
have failed. It is useful when
+a lot of instances are failing in a pipeline, as it finds the root cause of the pipeline
being stuck.
+
+Usage:
+$FALCON_HOME/bin/falcon instance -triage -type <<feed/process>> -name <<name>>
-start "yyyy-MM-dd'T'HH:mm'Z'"
+
 ---+++Params
 
 Displays the workflow params of a given instance. Where start time is considered as nominal
time of that instance and end time won't be considered.

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index 49dddb7..0094c39 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -66,6 +66,7 @@ See also: [[../Security.twiki][Security in Falcon]]
 | POST        | [[InstanceResume][api/instance/resume/:entity-type/:entity-name]]       
   | Resume a given instance      |
 | POST        | [[InstanceRerun][api/instance/rerun/:entity-type/:entity-name]]         
   | Rerun a given instance       |
 | GET         | [[InstanceLogs][api/instance/logs/:entity-type/:entity-name]]           
   | Get logs of a given instance |
+| GET         | [[Triage][api/instance/triage/:entity-type/:entity-name]]               
   | Triage an instance to see its stuck lineage |
 | GET         | [[InstanceSummary][api/instance/summary/:entity-type/:entity-name]]     
   | Return summary of instances for an entity |
 | GET         | [[InstanceDependency][api/instance/dependency/:entity-type/:entity-name]]
  | Return dependent instances for a given instance |
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/docs/src/site/twiki/restapi/Triage.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/Triage.twiki b/docs/src/site/twiki/restapi/Triage.twiki
new file mode 100644
index 0000000..5646902
--- /dev/null
+++ b/docs/src/site/twiki/restapi/Triage.twiki
@@ -0,0 +1,44 @@
+---++  GET api/instance/triage/:entity-type/:entity-name
+   * <a href="#Description">Description</a>
+   * <a href="#Parameters">Parameters</a>
+   * <a href="#Results">Results</a>
+   * <a href="#Examples">Examples</a>
+
+---++ Description
+Given a feed/process instance, this command traces its ancestors to find which of them
have failed. It is useful when
+a lot of instances are failing in a pipeline, as it finds the root cause of the pipeline
being stuck.
+
+
+---++ Parameters
+   * :entity-type type of entity(feed/process).
+   * :entity-name name of the feed/process.
+   * :start instance time of the entity instance.
+   * :colo <optional param> name of the colo on which you want to triage
+
+---++ Results
+It returns a JSON graph.
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/triage/feed/my-feed?start=2015-03-02T00:00Z&colo=local
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    "vertices": ["(FEED) my-feed (2015-03-02T00:00Z) [Unavailable]", "(PROCESS) producer-process
(2015-03-01T10:00Z) [TIMEDOUT]", "(FEED) input-feed-for-producer (2015-03-01T00:00Z) [Available]"],
+    "edges":
+    [
+        {
+         "from"  : "(PROCESS) producer-process (2015-03-01T10:00Z) [TIMEDOUT]",
+         "to"    : "(FEED) my-feed (2015-03-02T00:00Z) [Unavailable]",
+         "label" : "produces"
+        },
+        {
+         "from"  : "(FEED) input-feed-for-producer (2015-03-01T00:00Z) [Available]",
+         "to"    : "(PROCESS) producer-process (2015-03-01T10:00Z) [TIMEDOUT]",
+         "label" : "consumed by"
+        }
+    ]
+}
+</verbatim>

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 1e813d2..13e0c82 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -23,16 +23,21 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Pair;
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.FeedInstanceStatus;
 import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.logging.LogProvider;
 import org.apache.falcon.resource.InstancesResult.Instance;
@@ -51,10 +56,13 @@ import java.util.Calendar;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.Set;
 
 /**
@@ -523,6 +531,172 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
     }
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    /**
+     * Triage method returns the graph of the ancestors in "UNSUCCESSFUL" state.
+     *
+     * It will traverse all the ancestor feed instances and process instances in the current
instance's lineage.
+     * It stops traversing a lineage line once it encounters a "SUCCESSFUL" instance as this
feature is intended
+     * to find the root cause of a pipeline failure.
+     *
+     * @param entityType type of the entity. Only feed and process are valid entity types
for triage.
+     * @param entityName name of the entity.
+     * @param instanceTime time of the instance which should be used to triage.
+     * @return Returns a list of ancestor entity instances which have failed.
+     */
+    public TriageResult triageInstance(String entityType, String entityName, String instanceTime,
String colo) {
+
+        checkColo(colo);
+        checkType(entityType); // should be only process/feed
+        checkName(entityName);
+        try {
+            EntityType type = EntityType.valueOf(entityType.toUpperCase());
+            Entity entity = ConfigurationStore.get().get(type, entityName);
+            TriageResult result = new TriageResult(APIResult.Status.SUCCEEDED, "Success");
+            List<LineageGraphResult> triageGraphs = new LinkedList<>();
+            for (String clusterName : DeploymentUtil.getCurrentClusters()) {
+                Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
+                triageGraphs.add(triage(type, entity, instanceTime, cluster));
+            }
+            LineageGraphResult[] triageGraphsArray = new LineageGraphResult[triageGraphs.size()];
+            result.setTriageGraphs(triageGraphs.toArray(triageGraphsArray));
+            return result;
+        } catch (IllegalArgumentException e) { // bad entityType
+            LOG.error("Bad Entity Type: {}", entityType);
+            throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
+        } catch (EntityNotRegisteredException e) { // bad entityName
+            LOG.error("Bad Entity Name : {}", entityName);
+            throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
+        } catch (Throwable e) {
+            LOG.error("Failed to triage", e);
+            throw FalconWebException.newInstanceException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    private void checkName(String entityName) {
+        if (StringUtils.isBlank(entityName)) {
+            throw FalconWebException.newInstanceException("Instance name is mandatory and
shouldn't be blank",
+                    Response.Status.BAD_REQUEST);
+        }
+    }
+
+    private LineageGraphResult triage(EntityType entityType, Entity entity, String instanceTime,
Cluster cluster)
+        throws FalconException {
+
+        Date instanceDate = SchemaHelper.parseDateUTC(instanceTime);
+        LineageGraphResult result = new LineageGraphResult();
+        Set<String> vertices = new HashSet<>();
+        Set<LineageGraphResult.Edge> edges = new HashSet<>();
+        Map<String, String> instanceStatusMap = new HashMap<>();
+
+        // queue containing all instances which need to be triaged
+        Queue<SchedulableEntityInstance> remainingInstances = new LinkedList<>();
+        SchedulableEntityInstance currentInstance = new SchedulableEntityInstance(entity.getName(),
cluster.getName(),
+                instanceDate, entityType);
+        remainingInstances.add(currentInstance);
+
+        while (!remainingInstances.isEmpty()) {
+            currentInstance = remainingInstances.remove();
+            if (currentInstance.getEntityType() == EntityType.FEED) {
+                Feed feed = ConfigurationStore.get().get(EntityType.FEED, currentInstance.getEntityName());
+                FeedInstanceStatus.AvailabilityStatus status = getFeedInstanceStatus(feed,
+                        currentInstance.getInstanceTime(), cluster);
+
+                // add vertex to the graph
+                vertices.add(currentInstance.toString());
+                instanceStatusMap.put(currentInstance.toString(), "[" + status.name() + "]");
+                if (status == FeedInstanceStatus.AvailabilityStatus.AVAILABLE) {
+                    continue;
+                }
+
+                // find producer process instance and add it to the queue
+                SchedulableEntityInstance producerInstance = FeedHelper.getProducerInstance(feed,
+                        currentInstance.getInstanceTime(), cluster);
+                if (producerInstance != null) {
+                    remainingInstances.add(producerInstance);
+
+                    //add edge from producerProcessInstance to the feedInstance
+                    LineageGraphResult.Edge edge = new LineageGraphResult.Edge(producerInstance.toString(),
+                            currentInstance.toString(), "produces");
+                    edges.add(edge);
+                }
+            } else { // entity type is PROCESS
+                Process process = ConfigurationStore.get().get(EntityType.PROCESS, currentInstance.getEntityName());
+                InstancesResult.WorkflowStatus status = getProcessInstanceStatus(process,
+                        currentInstance.getInstanceTime());
+
+                // add current process instance as a vertex
+                vertices.add(currentInstance.toString());
+                if (status == null) {
+                    instanceStatusMap.put(currentInstance.toString(), "[ Not Available ]");
+                } else {
+                    instanceStatusMap.put(currentInstance.toString(), "[" + status.name()
+ "]");
+                    if (status == InstancesResult.WorkflowStatus.SUCCEEDED) {
+                        continue;
+                    }
+                }
+
+                // find list of input feed instances - only mandatory ones and not optional
ones
+                Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process,
+                        currentInstance.getInstanceTime(), cluster, false);
+                for (SchedulableEntityInstance inputFeedInstance : inputFeedInstances) {
+                    remainingInstances.add(inputFeedInstance);
+
+                    //Add edge from inputFeedInstance to consumer processInstance
+                    LineageGraphResult.Edge edge = new LineageGraphResult.Edge(inputFeedInstance.toString(),
+                            currentInstance.toString(), "consumed by");
+                    edges.add(edge);
+                }
+            }
+        }
+
+        // append status to each vertex
+        Set<String> relabeledVertices = new HashSet<>();
+        for (String instance : vertices) {
+            String status = instanceStatusMap.get(instance);
+            relabeledVertices.add(instance + status);
+        }
+
+        // append status to each edge
+        for (LineageGraphResult.Edge edge : edges) {
+            String oldTo = edge.getTo();
+            String oldFrom = edge.getFrom();
+
+            String newFrom = oldFrom + instanceStatusMap.get(oldFrom);
+            String newTo = oldTo + instanceStatusMap.get(oldTo);
+
+            edge.setFrom(newFrom);
+            edge.setTo(newTo);
+        }
+
+        result.setEdges(edges.toArray(new LineageGraphResult.Edge[0]));
+        result.setVertices(relabeledVertices.toArray(new String[0]));
+        return result;
+    }
+
+    private FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, Date instanceTime,
Cluster cluster)
+        throws FalconException {
+        Storage storage = FeedHelper.createStorage(cluster, feed);
+        Date endRange = new Date(instanceTime.getTime() + 200);
+        List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster.getName(),
LocationType.DATA,
+                instanceTime, endRange);
+        return feedListing.get(0).getStatus();
+    }
+
+    private InstancesResult.WorkflowStatus getProcessInstanceStatus(Process process, Date
instanceTime)
+        throws FalconException {
+        AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+        List<LifeCycle> lifeCycles = new ArrayList<LifeCycle>();
+        lifeCycles.add(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
+        Date endRange = new Date(instanceTime.getTime() + 200);
+        Instance[] response = wfEngine.getStatus(process, instanceTime, endRange, lifeCycles).getInstances();
+        if (response.length > 0) {
+            return response[0].getStatus();
+        }
+        LOG.warn("No instances were found for the given process: {} & instanceTime: {}",
process, instanceTime);
+        return null;
+    }
+
+
     public InstancesResult reRunInstance(String type, String entity, String startStr,
                                          String endStr, HttpServletRequest request,
                                          String colo, List<LifeCycle> lifeCycles, Boolean
isForced) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 757fda8..0d59c22 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -30,6 +30,7 @@ import org.apache.falcon.resource.FeedInstanceResult;
 import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.resource.TriageResult;
 import org.apache.falcon.resource.channel.Channel;
 import org.apache.falcon.resource.channel.ChannelFactory;
 import org.slf4j.Logger;
@@ -375,6 +376,25 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
         }.execute(colo, entityType, entityName);
     }
 
+    @GET
+    @Path("triage/{type}/{name}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Monitored(event = "triage-instance")
+    @Override
+    public TriageResult triageInstance(
+            @Dimension("type") @PathParam("type") final String entityType,
+            @Dimension("name") @PathParam("name") final String entityName,
+            @Dimension("instanceTime") @QueryParam("start") final String instanceTime,
+            @Dimension("colo") @QueryParam("colo") String colo) {
+        return new InstanceProxy<TriageResult>(TriageResult.class) {
+            @Override
+            protected TriageResult doExecute(String colo) throws FalconException {
+                return getInstanceManager(colo).invoke("triageInstance", entityType, entityName,
instanceTime, colo);
+            }
+        }.execute(colo, entityType, entityName);
+    }
+
+
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     private abstract class InstanceProxy<T extends APIResult> {

http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index c2ac5b2..7249ba4 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -218,6 +218,19 @@ public class InstanceManager extends AbstractInstanceManager {
         return super.resumeInstance(request, type, entity, startStr, endStr, colo, lifeCycles);
     }
 
+    @GET
+    @Path("triage/{type}/{name}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Monitored(event = "triage-instance")
+    @Override
+    public TriageResult triageInstance(
+            @Dimension("type") @PathParam("type") String entityType,
+            @Dimension("name") @PathParam("name") String entityName,
+            @Dimension("instanceTime") @QueryParam("start") String instanceTime,
+            @Dimension("colo") @QueryParam("colo") String colo) {
+        return super.triageInstance(entityType, entityName, instanceTime, colo);
+    }
+
     @POST
     @Path("rerun/{type}/{entity}")
     @Produces(MediaType.APPLICATION_JSON)


Mime
View raw message