falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [3/8] git commit: FALCON-24 Allow re-run of timedout instances. Contributed by Samarth Gupta
Date Thu, 03 Oct 2013 18:52:59 GMT
FALCON-24 Allow re-run of timedout instances. Contributed by Samarth Gupta


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/f6d8188b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/f6d8188b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/f6d8188b

Branch: refs/heads/FALCON-85
Commit: f6d8188b7d48e1954a075c92997bde6029843e7d
Parents: e86e192
Author: Shwetha GS <shwethags@gmail.com>
Authored: Tue Oct 1 18:09:51 2013 +0530
Committer: Shwetha GS <shwethags@gmail.com>
Committed: Tue Oct 1 18:09:51 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../workflow/engine/OozieWorkflowEngine.java    | 131 ++++++++++++-------
 2 files changed, 88 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/f6d8188b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f6c5278..8226791 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -40,6 +40,9 @@ Trunk (Unreleased)
     Srikanth Sundarrajan)
 
   BUG FIXES
+    FALCON-24 Allow re-run of timedout instances. (Samarth Gupta via 
+    Shwetha GS)
+
     FALCON-128 Feed replication post processing log mover error. (Shwetha GS)
 
     FALCON-127 Fix test issues after FALCON-107. (Shwetha GS via Srikanth

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/f6d8188b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index d9d4124..8f6d466 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -37,6 +37,7 @@ import org.apache.log4j.Logger;
 import org.apache.oozie.client.*;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.rest.RestConstants;
 
 import java.util.*;
 import java.util.Map.Entry;
@@ -60,6 +61,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             .asList(WorkflowJob.Status.SUSPENDED);
     private static final List<WorkflowJob.Status> WF_RERUN_PRECOND = Arrays.asList(WorkflowJob.Status.FAILED,
             WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED);
+    private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND = Arrays
+            .asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED);
 
     private static final List<Job.Status> BUNDLE_ACTIVE_STATUS = Arrays.asList(
             Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED,
@@ -516,31 +519,21 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                         continue;
                     }
                 }
-                String status = mapActionStatus(coordinatorAction.getStatus());
-                WorkflowJob jobInfo = null;
-                if (coordinatorAction.getExternalId() != null) {
-                    jobInfo = getWorkflowInfo(cluster, coordinatorAction.getExternalId());
-                }
+                String status;
                 instanceCount++;
-                if (jobInfo != null) {
-                    status = mapWorkflowStatus(jobInfo.getStatus());
-                    try {
-                        status = performAction(action, props, cluster, status, jobInfo);
-                    } catch (FalconException e) {
-                        LOG.warn("Unable to perform action " + action + " on cluster ", e);
-                        status = WorkflowStatus.ERROR.name();
-                        overallStatus = APIResult.Status.PARTIAL;
-                    }
-                }
-                if (action != OozieWorkflowEngine.JobAction.STATUS
-                        && coordinatorAction.getExternalId() != null) {
-                    jobInfo = getWorkflowInfo(cluster, coordinatorAction.getExternalId());
+                try {
+                    status = performAction(cluster, action, coordinatorAction, props);
+                } catch (FalconException e) {
+                    LOG.warn("Unable to perform action " + action + " on cluster ", e);
+                    status = WorkflowStatus.ERROR.name();
+                    overallStatus = APIResult.Status.PARTIAL;
                 }
 
                 String nominalTimeStr = SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime());
                 InstancesResult.Instance instance = new InstancesResult.Instance(
                         cluster, nominalTimeStr, WorkflowStatus.valueOf(status));
-                if (jobInfo != null) {
+                if (coordinatorAction.getExternalId() != null) {
+                    WorkflowJob jobInfo = getWorkflowInfo(cluster, coordinatorAction.getExternalId());
                     instance.startTime = jobInfo.getStartTime();
                     instance.endTime = jobInfo.getEndTime();
                     instance.logFile = jobInfo.getConsoleUrl();
@@ -558,13 +551,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return instancesResult;
     }
 
-    private String performAction(JobAction action, Properties props,
-                                 String cluster, String status, WorkflowJob jobInfo)
-        throws FalconException {
-
+    private String performAction(String cluster, JobAction action, CoordinatorAction coordinatorAction,
+                                 Properties props) throws FalconException {
+        WorkflowJob jobInfo = null;
+        String status = coordinatorAction.getStatus().name();
+        if (coordinatorAction.getExternalId() != null) {
+            jobInfo = getWorkflowInfo(cluster, coordinatorAction.getExternalId());
+            status = jobInfo.getStatus().name();
+        }
         switch (action) {
         case KILL:
-            if (!WF_KILL_PRECOND.contains(jobInfo.getStatus())) {
+            if (jobInfo == null || !WF_KILL_PRECOND.contains(jobInfo.getStatus())) {
                 break;
             }
 
@@ -573,7 +570,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             break;
 
         case SUSPEND:
-            if (!WF_SUSPEND_PRECOND.contains(jobInfo.getStatus())) {
+            if (jobInfo == null || !WF_SUSPEND_PRECOND.contains(jobInfo.getStatus())) {
                 break;
             }
 
@@ -582,7 +579,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             break;
 
         case RESUME:
-            if (!WF_RESUME_PRECOND.contains(jobInfo.getStatus())) {
+            if (jobInfo == null || !WF_RESUME_PRECOND.contains(jobInfo.getStatus())) {
                 break;
             }
 
@@ -591,19 +588,66 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             break;
 
         case RERUN:
-            if (!WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
-                break;
+            if (jobInfo == null && COORD_RERUN_PRECOND.contains(coordinatorAction.getStatus())) {
+                //Coord action re-run
+                reRunCoordAction(cluster, coordinatorAction);
+                status = Status.RUNNING.name();
+            } else if (jobInfo != null && WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
+                //wf re-run
+                reRun(cluster, jobInfo.getId(), props);
+                status = Status.RUNNING.name();
             }
-
-            reRun(cluster, jobInfo.getId(), props);
-            status = Status.RUNNING.name();
             break;
 
+
         case STATUS:
             break;
+
         default:
+            throw new IllegalArgumentException("Unhandled action " + action);
         }
-        return status;
+
+        return mapActionStatus(status);
+    }
+
+    private void reRunCoordAction(String cluster, CoordinatorAction coordinatorAction) throws FalconException  {
+        try{
+            OozieClient client = OozieClientFactory.get(cluster);
+            client.reRunCoord(coordinatorAction.getJobId(),
+                RestConstants.JOB_COORD_RERUN_ACTION,
+                    Integer.toString(coordinatorAction.getActionNumber()), true, true);
+            assertCoordActionStatus(cluster, coordinatorAction.getId(),
+                org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
+                    org.apache.oozie.client.CoordinatorAction.Status.WAITING,
+                org.apache.oozie.client.CoordinatorAction.Status.READY);
+            LOG.info("Rerun job " + coordinatorAction.getId() + " on cluster " + cluster);
+        }catch (Exception e) {
+            LOG.error("Unable to rerun workflows", e);
+            throw new FalconException(e);
+        }
+    }
+
+    private void assertCoordActionStatus(String cluster, String coordActionId,
+            org.apache.oozie.client.CoordinatorAction.Status... statuses)
+        throws FalconException, OozieClientException {
+        OozieClient client = OozieClientFactory.get(cluster);
+        CoordinatorAction actualStatus = client.getCoordActionInfo(coordActionId);
+        for (int counter = 0; counter < 3; counter++) {
+            for(org.apache.oozie.client.CoordinatorAction.Status status : statuses) {
+                if (status.equals(actualStatus.getStatus())) {
+                    return;
+                }
+            }
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            actualStatus = client.getCoordActionInfo(coordActionId);
+        }
+        throw new FalconException("For Job" + coordActionId + ", actual statuses: "
+            +actualStatus + ", expected statuses: "
+                 + Arrays.toString(statuses));
     }
 
     private String getSourceCluster(String cluster,
@@ -633,24 +677,19 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return clusterList;
     }
 
-    private String mapActionStatus(CoordinatorAction.Status status) {
-        if (status == CoordinatorAction.Status.READY
-                || status == CoordinatorAction.Status.WAITING
-                || status == CoordinatorAction.Status.TIMEDOUT
-                || status == CoordinatorAction.Status.SUBMITTED) {
+    private String mapActionStatus(String status) {
+        if (CoordinatorAction.Status.READY.toString().equals(status)
+                || CoordinatorAction.Status.WAITING.toString().equals(status)
+                || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.WAITING.name();
-        } else if (status == CoordinatorAction.Status.DISCARDED) {
+        } else if (CoordinatorAction.Status.DISCARDED.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.KILLED.name();
-        } else {
-            return status.name();
-        }
-    }
-
-    private String mapWorkflowStatus(WorkflowJob.Status status) {
-        if (status == WorkflowJob.Status.PREP) {
+        } else if (CoordinatorAction.Status.TIMEDOUT.toString().equals(status)) {
+            return InstancesResult.WorkflowStatus.FAILED.name();
+        } else if (WorkflowJob.Status.PREP.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.RUNNING.name();
         } else {
-            return status.name();
+            return status;
         }
     }
 


Mime
View raw message