falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suh...@apache.org
Subject git commit: FALCON-813. Expose job id for running jobs in Falcon. Contributed by Suhas Vasu
Date Thu, 06 Nov 2014 09:29:20 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 86e0ccfa0 -> 8cbd38551


FALCON-813. Expose job id for running jobs in Falcon. Contributed by Suhas Vasu


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/8cbd3855
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/8cbd3855
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/8cbd3855

Branch: refs/heads/master
Commit: 8cbd3855152edb809015d6c71d7dd3ece5d95582
Parents: 86e0ccf
Author: Suhas V <suhas.v@inmobi.com>
Authored: Thu Nov 6 14:59:04 2014 +0530
Committer: Suhas V <suhas.v@inmobi.com>
Committed: Thu Nov 6 14:59:04 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/falcon/client/FalconClient.java  |  7 +++
 docs/src/site/twiki/FalconCLI.twiki             |  3 +-
 .../src/site/twiki/restapi/InstanceStatus.twiki | 23 ++++++---
 .../workflow/engine/OozieWorkflowEngine.java    | 54 +++++++++++++++++---
 5 files changed, 71 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6cc4a35..1321734 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -37,6 +37,7 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-813 Expose job id for running jobs in Falcon (Suhas Vasu)
    FALCON-834 Propagate request id in the response to help trace and debug
    failures in merlin (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 6aeb59b..7ac2981 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -961,6 +961,13 @@ public class FalconClient {
                     sb.append("\n");
                 }
 
+                if (instance.actions != null) {
+                    sb.append("actions:\n");
+                    for (InstancesResult.InstanceAction action : instance.actions) {
+                        sb.append(" ").append(action.getAction()).append("\t");
+                        sb.append(action.getStatus()).append("\t").append(action.getLogFile()).append("\n");
+                    }
+                }
             }
         }
         sb.append("\nAdditional Information:\n");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 6470c0a..7bdac5d 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -149,7 +149,8 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -resume -
 ---+++Status
 
 Status option via CLI can be used to get the status of a single or multiple instances.  If the instance is not yet materialized but is within the process validity range, WAITING is returned as the state. Along with the status of the instance time is also returned. Log location gives the oozie workflow url
-If the instance is in WAITING state, missing dependencies are listed
+If the instance is in WAITING state, missing dependencies are listed.
+The job URLs are populated for all actions of the user workflow and for non-succeeded actions of the main workflow, so the user need not go to the underlying scheduler to get the job URLs when debugging an issue in the job.
 
 Example : Suppose a process has 3 instance, one has succeeded,one is in running state and other one is waiting, the expected output is:
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/docs/src/site/twiki/restapi/InstanceStatus.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceStatus.twiki b/docs/src/site/twiki/restapi/InstanceStatus.twiki
index ece8c3f..cebc9c8 100644
--- a/docs/src/site/twiki/restapi/InstanceStatus.twiki
+++ b/docs/src/site/twiki/restapi/InstanceStatus.twiki
@@ -27,12 +27,12 @@ Get status of a specific instance of an entity.
 
    
 ---++ Results
-Status of the specified instance.
+Status of the specified instance, along with job URLs for all actions of the user workflow and for non-succeeded actions of the main workflow.
 
 ---++ Examples
 ---+++ Rest Call
 <verbatim>
-GET http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
+GET https://localhost:15443/api/instance/status/process/WordCount?start=2014-11-04T16:00Z&colo=*
 </verbatim>
 ---+++ Result
 <verbatim>
@@ -40,15 +40,22 @@ GET http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&star
     "instances": [
         {
             "details": "",
-            "endTime": "2013-10-21T14:40:26-07:00",
-            "startTime": "2013-10-21T14:39:56-07:00",
-            "cluster": "primary-cluster",
-            "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W",
+            "endTime": "2014-11-05T16:08:10+05:30",
+            "startTime": "2014-11-05T16:07:29+05:30",
+            "cluster": "local",
+            "logFile": "http:\/\/localhost:11000\/oozie?job=0000011-141105155430303-oozie-oozi-W",
             "status": "SUCCEEDED",
-            "instance": "2012-04-03T07:00Z"
+            "instance": "2014-11-04T16:00Z",
+            "actions": [
+                {
+                    "action": "wordcount-mr",
+                    "status": "SUCCEEDED",
+                    "logFile": "http:\/\/localhost:50030\/jobdetails.jsp?jobid=job_201411051553_0005"
+                }
+            ]
         }
     ],
-    "requestId": "default\/e15bb378-d09f-4911-9df2-5334a45153d2\n",
+    "requestId": "default\/b9fc3cba-1b46-4d1f-8196-52c795ea3580\n",
     "message": "default\/STATUS\n",
     "status": "SUCCEEDED"
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 7032182..89bebe7 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -51,16 +51,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.oozie.client.BundleJob;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.*;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
-import org.apache.oozie.client.Job;
 import org.apache.oozie.client.Job.Status;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
-import org.apache.oozie.client.ProxyOozieClient;
-import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.rest.RestConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,6 +107,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
     private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
 
+    private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList(
+            "pre-processing",
+            "should-record",
+            "succeeded-post-processing",
+            "failed-post-processing"
+    );
+
     private static final String[] BUNDLE_UPDATEABLE_PROPS =
         new String[]{"parallel", "clusters.clusters[\\d+].validity.end", };
 
@@ -591,6 +591,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                     if (action == JobAction.PARAMS) {
                         instance.wfParams = getWFParams(jobInfo);
                     }
+                    if (action == JobAction.STATUS) {
+                        populateInstanceActions(cluster, jobInfo, instance);
+                    }
                 }
                 instance.details = coordinatorAction.getMissingDependencies();
                 instances.add(instance);
@@ -679,6 +682,41 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return instancesSummaryResult;
     }
 
+    private void populateInstanceActions(String cluster, WorkflowJob wfJob, Instance instance)
+        throws FalconException {
+
+        List<InstancesResult.InstanceAction> instanceActions = new ArrayList<InstancesResult.InstanceAction>();
+
+        List<WorkflowAction> wfActions = wfJob.getActions();
+        for (WorkflowAction action : wfActions) {
+            if (action.getType().equalsIgnoreCase("sub-workflow") && StringUtils.isNotEmpty(action.getExternalId()))
{
+                List<WorkflowAction> subWorkFlowActions = getWorkflowInfo(cluster,
action.getExternalId()).getActions();
+                for (WorkflowAction subWfAction : subWorkFlowActions) {
+                    if (!subWfAction.getType().startsWith(":")) {
+                        InstancesResult.InstanceAction instanceAction =
+                                new InstancesResult.InstanceAction(subWfAction.getName(),
+                                        subWfAction.getExternalStatus(), subWfAction.getConsoleUrl());
+                        instanceActions.add(instanceAction);
+                    }
+                }
+            } else if (!action.getType().startsWith(":")) {
+                if (PARENT_WF_ACTION_NAMES.contains(action.getName())
+                        && !Status.SUCCEEDED.toString().equals(action.getExternalStatus()))
{
+                    InstancesResult.InstanceAction instanceAction =
+                            new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+                                    action.getConsoleUrl());
+                    instanceActions.add(instanceAction);
+                } else if (!PARENT_WF_ACTION_NAMES.contains(action.getName())) {
+                    InstancesResult.InstanceAction instanceAction =
+                            new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+                                    action.getConsoleUrl());
+                    instanceActions.add(instanceAction);
+                }
+            }
+        }
+        instance.actions = instanceActions.toArray(new InstancesResult.InstanceAction[instanceActions.size()]);
+    }
+
     private Map<String, String> getWFParams(WorkflowJob jobInfo) {
         Map<String, String> wfParams = new HashMap<String, String>();
         Configuration conf = new Configuration(false);


Mime
View raw message