falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2113 Falcon retry happens in a few cases in spite of a manual kill from the user. Move it to a different state (ignore) and check for that state before retrying to stop it from retrying.
Date Wed, 24 Aug 2016 07:38:09 GMT
Repository: falcon
Updated Branches:
  refs/heads/master fc27ebb84 -> 470f34313


FALCON-2113 Falcon retry happens in a few cases in spite of a manual kill from the user. Move
it to a different state (ignore) and check for that state before retrying to stop it from retrying.

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao, @PraveenAdlakha

Closes #265 from sandeepSamudrala/FALCON-2113 and squashes the following commits:

2a00bef [sandeep] fixing failures
285c796 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2113
4f585a1 [sandeep] Incorporated review comments. Stopped passing the parent ID into the method
that checks for a manual kill
2d0d9a3 [sandeep] FALCON-2113. Falcon retry happens in a few cases in spite of a manual kill
from the user. Move it to a different state (ignore) and check for that state before retrying
to stop it from retrying.
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding a unit test for the new method that gets the next instance time
with delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/470f3431
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/470f3431
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/470f3431

Branch: refs/heads/master
Commit: 470f34313b0bf8210f95f6969712b12e55a9e169
Parents: fc27ebb
Author: sandeep <sandysmdl@gmail.com>
Authored: Wed Aug 24 13:08:04 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Wed Aug 24 13:08:04 2016 +0530

----------------------------------------------------------------------
 .../apache/falcon/resource/InstancesResult.java |  2 +-
 .../workflow/engine/AbstractWorkflowEngine.java |  3 ++
 .../workflow/engine/OozieWorkflowEngine.java    | 37 ++++++++++++++++++--
 .../resource/AbstractInstanceManager.java       |  4 ++-
 .../workflow/engine/FalconWorkflowEngine.java   |  6 ++++
 5 files changed, 47 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index f8de645..3dc74c4 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -34,7 +34,7 @@ public class InstancesResult extends APIResult {
      * Workflow status as being set in result object.
      */
     public static enum WorkflowStatus {
-        WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, SKIPPED, UNDEFINED,
READY
+        WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, SKIPPED, UNDEFINED,
READY, KILLED_OR_IGNORED
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 0db7e9b..16a1753 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -80,6 +80,9 @@ public abstract class AbstractWorkflowEngine {
     public abstract InstancesResult killInstances(Entity entity, Date start, Date end, Properties
props,
                                                   List<LifeCycle> lifeCycles) throws
FalconException;
 
+    public abstract InstancesResult ignoreInstances(Entity entity, Date start, Date end,
Properties props,
+                                                    List<LifeCycle> lifeCycles) throws
FalconException;
+
     public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties
props,
                                                    List<LifeCycle> lifeCycles, Boolean
isForced) throws FalconException;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 06d0142..c371d69 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -588,6 +588,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
+    public InstancesResult ignoreInstances(Entity entity, Date start, Date end, Properties
props,
+                                              List<LifeCycle> lifeCycles) throws FalconException
{
+        return doJobAction(JobAction.IGNORE, entity, start, end, props, lifeCycles);
+    }
+
+    @Override
     public InstancesResult reRunInstances(Entity entity, Date start, Date end,
                                           Properties props, List<LifeCycle> lifeCycles,
                                           Boolean isForced) throws FalconException {
@@ -649,7 +655,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private static enum JobAction {
-        KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
+        KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS, IGNORE
     }
 
     private WorkflowJob getWorkflowInfo(String cluster, String wfId) throws FalconException
{
@@ -953,6 +959,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             status = Status.KILLED.name();
             break;
 
+        case IGNORE:
+            if (!status.equals(Status.IGNORED.name())) {
+                ignore(cluster, coordinatorAction.getJobId(), coordinatorAction.getActionNumber());
+            }
+            status = mapActionStatus(Status.IGNORED.name());
+            break;
+
         case SUSPEND:
             if (jobInfo == null || !WF_SUSPEND_PRECOND.contains(jobInfo.getStatus())) {
                 break;
@@ -1100,8 +1113,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         } else if (CoordinatorAction.Status.WAITING.toString().equals(status)
             || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.WAITING.name();
-        } else if (CoordinatorAction.Status.IGNORED.toString().equals(status)) {
+        } else if (CoordinatorAction.Status.KILLED.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.KILLED.name();
+        } else if (CoordinatorAction.Status.IGNORED.toString().equals(status)) {
+            return InstancesResult.WorkflowStatus.KILLED_OR_IGNORED.name();
         } else if (CoordinatorAction.Status.TIMEDOUT.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.FAILED.name();
         } else if (WorkflowJob.Status.PREP.toString().equals(status)) {
@@ -1678,6 +1693,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    private void ignore(String cluster, String jobId, int instanceNumber) throws FalconException
{
+        try {
+            OozieClientFactory.get(cluster).ignore(jobId, String.valueOf(instanceNumber));
+            assertStatus(cluster, jobId + "@" + instanceNumber,
+                    Status.IGNORED, Status.FAILED, Status.SUCCEEDED, Status.DONEWITHERROR);
+            LOG.info("Ignored job {} on cluster {}", jobId, cluster);
+        } catch (OozieClientException e) {
+            throw new FalconException(e);
+        }
+    }
+
     private void kill(String cluster, String jobId, String rangeType, String scope) throws
FalconException {
         try {
             OozieClientFactory.get(cluster).kill(jobId, rangeType, scope);
@@ -1791,11 +1817,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException
{
+        // In case of a kill being issued from falcon api, the state will be moved to IGNORE
         // In case of a failure, the Oozie action has an errorCode.
         // In case of no errorCode in any of the actions would mean its killed by user
         try {
-            // Check for error code in all the actions in main workflow
             OozieClient oozieClient = OozieClientFactory.get(cluster);
+            String parentId = oozieClient.getJobInfo(jobId).getParentId();
+            if (oozieClient.getCoordActionInfo(parentId).getStatus().equals(CoordinatorAction.Status.IGNORED))
{
+                return true;
+            }
+            // Check for error code in all the actions in main workflow
             List<WorkflowAction> wfActions = oozieClient.getJobInfo(jobId).getActions();
             for (WorkflowAction subWfAction : wfActions) {
                 if (StringUtils.isNotEmpty(subWfAction.getErrorCode())) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 276e316..f86f097 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -580,7 +580,9 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
                     entityObject, startStr, endStr);
 
             AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject);
-            return wfEngine.killInstances(entityObject,
+            wfEngine.killInstances(entityObject,
+                    startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
+            return wfEngine.ignoreInstances(entityObject,
                     startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
         } catch (Throwable e) {
             LOG.error("Failed to kill instances", e);

http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index fe16443..9ba62a1 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -392,6 +392,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
+    public InstancesResult ignoreInstances(Entity entity, Date start, Date end, Properties
props,
+                                           List<LifeCycle> lifeCycles) throws FalconException{
+        throw new UnsupportedOperationException("Not yet Implemented");
+    }
+
+    @Override
     public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties
props,
                                           List<LifeCycle> lifeCycles, Boolean isForced)
throws FalconException {
         if (isForced == null) {


Mime
View raw message