falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [1/5] falcon git commit: FALCON-1723 Rerun with skip fail actions won't work in few cases (By Pavan Kolamuri)
Date Tue, 19 Jan 2016 17:14:39 GMT
Repository: falcon
Updated Branches:
  refs/heads/0.9 8dee7c9ea -> cc41b2070


FALCON-1723 Rerun with skip fail actions won't work in few cases (By Pavan Kolamuri)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/a4fd2c1f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/a4fd2c1f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/a4fd2c1f

Branch: refs/heads/0.9
Commit: a4fd2c1fae4397fd1ef57660ce060f94654ea43c
Parents: 14e209d
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Tue Jan 19 21:45:55 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Tue Jan 19 21:45:55 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 docs/src/site/twiki/FalconCLI.twiki             |  2 +-
 .../workflow/engine/OozieWorkflowEngine.java    | 51 +++++++++++++++-----
 .../engine/OozieWorkflowEngine.java.rej         | 32 ++++++++++++
 .../workflow/engine/FalconWorkflowEngine.java   |  3 ++
 5 files changed, 76 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a200b9c..1916c9a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -113,6 +113,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1723 Rerun with skip fail actions won't work in few cases (Pavan Kolamuri via
Pallavi Rao)
+
     FALCON-1538 Prism status gives wrong info(Praveen Adlakha via Ajay Yadava)
 
     FALCON-1715 IllegalStateException in MetadataMappingService when entity is scheduled
via native scheduler (Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 9f91143..8beb473 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -238,7 +238,7 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name
<<name>> -continue
 
 Rerun option is used to rerun instances of a given process. On issuing a rerun, by default
the execution resumes from the last failed node in the workflow. This option is valid only
for process instances in terminal state, i.e. SUCCEEDED, KILLED or FAILED.
 If one wants to forcefully rerun the entire workflow, -force should be passed along with
-rerun
-Additionally, you can also specify properties to override via a properties file.
+Additionally, you can specify properties to override via a properties file; if these properties
conflict with the -force option, the properties from the file take precedence.
 
 Usage:
 $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>>
-rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-force] [-file <<properties
file>>]

http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index e3f0831..0b5346b 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -559,10 +559,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     public InstancesResult reRunInstances(Entity entity, Date start, Date end,
                                           Properties props, List<LifeCycle> lifeCycles,
                                           Boolean isForced) throws FalconException {
-        if (isForced != null && isForced) {
-            props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+        if (isForced == null) {
+            isForced = false;
         }
-        return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles);
+        return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, isForced);
     }
 
     @Override
@@ -628,8 +628,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+
     private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date
end,
-                                        Properties props, List<LifeCycle> lifeCycles)
throws FalconException {
+                                        Properties props, List<LifeCycle> lifeCycles,
+                                        boolean isForced) throws FalconException {
         Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity,
start, end, lifeCycles);
         List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
         List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS);
@@ -659,7 +662,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                         new InstancesResult.Instance(cluster, nominalTimeStr, null);
                 instance.sourceCluster = sourceCluster;
                 try {
-                    performAction(cluster, action, coordinatorAction, props, instance);
+                    performAction(cluster, action, coordinatorAction, props, instance, isForced);
                 } catch (FalconException e) {
                     LOG.warn("Unable to perform action {} on cluster", action, e);
                     instance.status = WorkflowStatus.ERROR;
@@ -678,6 +681,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return instancesResult;
     }
 
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date
end, Properties props,
+                                        List<LifeCycle> lifeCycles) throws FalconException
{
+        return doJobAction(action, entity, start, end, props, lifeCycles, false);
+    }
+
     private InstancesSummaryResult doSummaryJobAction(Entity entity, Date start,
                                                       Date end, Properties props,
                                                       List<LifeCycle> lifeCycles) throws
FalconException {
@@ -821,7 +831,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private void performAction(String cluster, JobAction action, CoordinatorAction coordinatorAction,
-        Properties props, InstancesResult.Instance instance) throws FalconException {
+        Properties props, InstancesResult.Instance instance, boolean isForced) throws FalconException
{
         WorkflowJob jobInfo = null;
         String status = coordinatorAction.getStatus().name();
         if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) {
@@ -867,7 +877,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 status = Status.RUNNING.name();
             } else if (jobInfo != null && WF_RERUN_PRECOND.contains(jobInfo.getStatus()))
{
                 //wf re-run
-                reRun(cluster, jobInfo.getId(), props, false);
+                reRun(cluster, jobInfo.getId(), props, isForced);
                 status = Status.RUNNING.name();
             }
             break;
@@ -1423,17 +1433,31 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         OozieClient client = OozieClientFactory.get(cluster);
         try {
             WorkflowJob jobInfo = client.getJobInfo(jobId);
-            Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
-            if (props != null) {
-                jobprops.putAll(props);
+            if (props == null) {
+                props = new Properties();
             }
+
             //if user has set any of these oozie rerun properties then force rerun flag is
ignored
-            if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
-                    && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
-                jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+            if (!props.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && !props.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+                props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
             }
+
+            Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
+            jobprops.putAll(props);
+
             jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
             jobprops.remove(OozieClient.BUNDLE_APP_PATH);
+
+            // In case if both props exists one should be removed otherwise it will fail.
+            // This case will occur when user runs workflow with skip-nodes property and
+            // try to do force rerun or rerun with fail-nodes property.
+            if (jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+                LOG.warn("Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES
+                        + " are present in workflow params removing" + OozieClient.RERUN_SKIP_NODES);
+                jobprops.remove(OozieClient.RERUN_SKIP_NODES);
+            }
             client.reRun(jobId, jobprops);
             assertStatus(cluster, jobId, Job.Status.RUNNING);
             LOG.info("Rerun job {} on cluster {}", jobId, cluster);
@@ -1443,6 +1467,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+
     private void assertStatus(String cluster, String jobId, Status... statuses) throws FalconException
{
 
         String actualStatus = null;

http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java.rej
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java.rej
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java.rej
new file mode 100644
index 0000000..4c08ae3
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java.rej
@@ -0,0 +1,32 @@
+diff a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
(rejected hunks)
+@@ -637,8 +637,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
+         return doJobAction(action, entity, start, end, props, lifeCycles, null);
+     }
+ 
+-    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date
end, Properties props,
+-                                        List<LifeCycle> lifeCycles, Boolean allAttempts)
throws FalconException {
++    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
++    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date
end,
++                                        Properties props, List<LifeCycle> lifeCycles,
++                                        Boolean allAttempts, boolean isForced) throws FalconException
{
+         Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity,
start, end, lifeCycles);
+         List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
+         List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS);
+@@ -669,7 +671,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
+                 instance.sourceCluster = sourceCluster;
+                 if (action.equals(JobAction.STATUS) && Boolean.TRUE.equals(allAttempts))
{
+                     try {
+-                        performAction(cluster, action, coordinatorAction, props, instance);
++                        performAction(cluster, action, coordinatorAction, props, instance,
isForced);
+                         if (instance.getRunId() > 0) {
+                             instanceList = getAllInstances(cluster, coordinatorAction, nominalTimeStr);
+                         } else {
+@@ -687,7 +689,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
+                     }
+                 } else {
+                     try {
+-                        performAction(cluster, action, coordinatorAction, props, instance);
++                        performAction(cluster, action, coordinatorAction, props, instance,
isForced);
+                     } catch (FalconException e) {
+                         LOG.warn("Unable to perform action {} on cluster", action, e);
+                         instance.status = WorkflowStatus.ERROR;

http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index b7379d4..eb39ec0 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -371,6 +371,9 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties
props,
                                           List<LifeCycle> lifeCycles, Boolean isForced)
throws FalconException {
+        if (isForced == null) {
+            isForced = false;
+        }
         return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, isForced);
     }
 


Mime
View raw message