falcon-commits mailing list archives

From pall...@apache.org
Subject falcon git commit: FALCON-1835 [IMPROVEMENT] Falcon should do coord rerun rather than workflow rerun
Date Wed, 24 Feb 2016 11:55:37 GMT
Repository: falcon
Updated Branches:
  refs/heads/master e270b5c70 -> a058cf2b4


FALCON-1835 [IMPROVEMENT] Falcon should do coord rerun rather than workflow rerun to ensure concurrency

Patch contains the following changes:
1. Upgrade to Oozie client 4.2 (from 4.1).
2. Remove workflow rerun completely and switch to coord rerun.
3. Introduced parentId in WorkflowExecutionContext to store the coord action of a given workflow. It is used during retry and late data rerun.
4. Where parentId cannot be populated (notifications from the post-processing action do not contain it), the rerun method queries Oozie to retrieve the parent id.
5. The rerun method now returns the actual status after rerun, rather than a hardcoded “RUNNING”.
6. In 4.2, Oozie issues a YARN job kill to stop any lingering executions from a previous run of a workflow, and it uses RMClientProxy to do so. Since RPC cannot happen in local mode, LocalFalconRPCClientFactory was introduced to prevent RPC calls in local mode.
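Items 3 and 4 boil down to a small id-resolution rule. The sketch below restates the checks visible in the `reRun` diff (a workflow id ending in `-W` is mapped to its parent coordinator action, and the result must contain `-C@`), combined with the "prefer parentId, else fall back to the workflow id" choice made in `LateRerunConsumer`. `RerunIdResolver` and the `ParentLookup` interface are hypothetical; `ParentLookup` stands in for `OozieClient.getJobInfo(id).getParentId()`, and the ids in `main` are made up.

```java
import java.util.Map;

public final class RerunIdResolver {

    /** Hypothetical stand-in for OozieClient.getJobInfo(wfId).getParentId(). */
    public interface ParentLookup {
        String parentOf(String workflowId);
    }

    private RerunIdResolver() {
    }

    /**
     * Returns the coordinator action id to rerun: the parentId carried by the
     * notification if present, otherwise the parent looked up for a workflow id.
     */
    public static String resolve(String parentId, String wfId, ParentLookup lookup) {
        String id = (parentId == null || parentId.trim().isEmpty()) ? wfId : parentId;
        // Workflow job ids end in "-W"; map them to their parent coord action.
        if (id != null && id.endsWith("-W")) {
            id = lookup.parentOf(id);
        }
        // Coordinator action ids look like "<coord-job-id>-C@<action-number>".
        if (id == null || id.trim().isEmpty() || !id.contains("-C@")) {
            throw new IllegalArgumentException("coord action id supplied for rerun, " + id + ", is not valid.");
        }
        return id;
    }

    public static void main(String[] args) {
        Map<String, String> parents = Map.of("0000001-160224000000000-oozie-oozi-W",
                "0000000-160224000000000-oozie-oozi-C@3");
        ParentLookup lookup = parents::get;
        // parentId present: used directly.
        System.out.println(resolve("0000000-160224000000000-oozie-oozi-C@3", "ignored-W", lookup));
        // parentId missing (e.g. post-processing notification): looked up from the workflow id.
        System.out.println(resolve(null, "0000001-160224000000000-oozie-oozi-W", lookup));
    }
}
```

Both calls in `main` resolve to the same coordinator action; only the second one needs a round trip to the (here simulated) Oozie server.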

Author: Pallavi Rao <pallavi.rao@inmobi.com>

Reviewers: Sandeep Samudrala <sandeep.samudrala@inmobi.com>, Srikanth Sundarrajan <srikath.sundarrajan@inmobi.com>, Pavan Kolamuri <pavan.kolamuri@inmobi.com>

Closes #48 from pallavi-rao/1835 and squashes the following commits:

0e0b317 [Pallavi Rao] FALCON-1835 Clearing old settings from config retrieved
0c0d2c4 [Pallavi Rao] FALCON-1835 Updated doc.
418ac16 [Pallavi Rao] FALCON-1835 Fixed checkstyle issue
96cd509 [Pallavi Rao] FALCON-1835 Updated rerun method to throw exception when both skip.nodes and failnodes are specified
8b0cb1f [Pallavi Rao] FALCON-1835 Updated rerun method to merge user props and job conf.
43df66d [Pallavi Rao] FALCON-1835 Falcon should do coord rerun rather than workflow rerun to ensure concurrency


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/a058cf2b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/a058cf2b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/a058cf2b

Branch: refs/heads/master
Commit: a058cf2b4481769b1eaf96c4a6a45e5d76c1556f
Parents: e270b5c
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Wed Feb 24 17:25:23 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Wed Feb 24 17:25:23 2016 +0530

----------------------------------------------------------------------
 .../falcon/workflow/WorkflowExecutionArgs.java  |   2 +-
 .../workflow/WorkflowExecutionContext.java      |   4 +
 .../WorkflowJobEndNotificationService.java      |   5 -
 .../workflow/engine/AbstractWorkflowEngine.java |   3 +-
 docs/src/site/twiki/EntitySpecification.twiki   |   2 +
 .../falcon/messaging/JMSMessageConsumer.java    |   3 +
 .../workflow/engine/OozieWorkflowEngine.java    | 111 ++++-----
 .../oozie/client/LocalProxyOozieClient.java     |  11 +
 pom.xml                                         |   2 +-
 .../apache/falcon/rerun/event/LaterunEvent.java |   4 +-
 .../apache/falcon/rerun/event/RerunEvent.java   |   8 +-
 .../falcon/rerun/event/RerunEventFactory.java   |   4 +-
 .../apache/falcon/rerun/event/RetryEvent.java   |   4 +-
 .../rerun/handler/AbstractRerunHandler.java     |   2 +-
 .../falcon/rerun/handler/LateRerunConsumer.java |  12 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |   6 +-
 .../falcon/rerun/handler/RetryConsumer.java     |   9 +-
 .../falcon/rerun/handler/RetryHandler.java      |   6 +-
 .../apache/falcon/rerun/queue/ActiveMQTest.java |   2 +-
 .../falcon/rerun/queue/InMemoryQueueTest.java   |   6 +-
 .../workflow/engine/FalconWorkflowEngine.java   |   3 +-
 .../java/org/apache/falcon/unit/FalconUnit.java |   3 +
 .../unit/LocalFalconRPCClientFactory.java       | 241 +++++++++++++++++++
 unit/src/main/resources/yarn-site.xml           |  30 +++
 24 files changed, 398 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 3363e1f..2171092 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -55,6 +55,7 @@ public enum WorkflowExecutionArgs {
     STATUS("status", "status of the user workflow isnstance"),
     WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie", false),
     USER_SUBFLOW_ID("subflowId", "external id of user workflow", false),
+    PARENT_ID("parentId", "The parent of the current workflow, typically coord action", false),
 
     WF_START_TIME("workflowStartTime", "workflow start time", false),
     WF_END_TIME("workflowEndTime", "workflow end time", false),
@@ -89,7 +90,6 @@ public enum WorkflowExecutionArgs {
     CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false),
     COUNTERS("counters", "store job counters", false);
 
-
     private final String name;
     private final String description;
     private final boolean isRequired;

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 5866369..9b1e1f4 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -295,6 +295,10 @@ public class WorkflowExecutionContext {
         return getValue(WorkflowExecutionArgs.WORKFLOW_ID);
     }
 
+    public String getWorkflowParentId() {
+        return getValue(WorkflowExecutionArgs.PARENT_ID);
+    }
+
     public String getUserSubflowId() {
         return getValue(WorkflowExecutionArgs.USER_SUBFLOW_ID);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index faea25c..b692258 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -139,11 +139,6 @@ public class WorkflowJobEndNotificationService implements FalconService {
 
     public void notifyWait(WorkflowExecutionContext context) throws FalconException {
         // Wait notifications can only be from Oozie JMS notifications
-
-        if (!updateContextFromWFConf(context)) {
-            return;
-        }
-
         LOG.debug("Sending workflow wait notification to listeners with context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index b899a58..4d8402a 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -61,7 +61,8 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract String delete(Entity entity, String cluster) throws FalconException;
 
-    public abstract void reRun(String cluster, String wfId, Properties props, boolean isForced) throws FalconException;
+    public abstract String reRun(String cluster, String wfId, Properties props, boolean isForced)
+        throws FalconException;
 
     public abstract void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException;
 

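With `reRun` now returning a `String`, the engine can report the status it actually observed after the rerun instead of a hardcoded "RUNNING". The `assertCoordActionStatus` and `assertStatus` changes in `OozieWorkflowEngine` follow a poll-and-return pattern; the following is a minimal, self-contained sketch of that pattern. `StatusPoller` is a hypothetical name, the `Supplier` stands in for the Oozie status call, and the attempt count and sleep interval are parameters rather than Falcon's actual configuration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public final class StatusPoller {

    private StatusPoller() {
    }

    /**
     * Polls the supplier up to maxAttempts times and returns the first observed
     * status that is one of the expected statuses.
     *
     * @throws IllegalStateException if no expected status is seen, or if interrupted
     */
    @SafeVarargs
    public static <S> S awaitStatus(Supplier<S> current, int maxAttempts, long sleepMillis, S... expected) {
        List<S> wanted = Arrays.asList(expected);
        S actual = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            actual = current.get();          // re-read the status on every attempt
            if (wanted.contains(actual)) {
                return actual;               // report what was actually observed
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for status", e);
            }
        }
        throw new IllegalStateException("actual status: " + actual + ", expected one of: " + wanted);
    }
}
```

Returning the matched value, rather than `void`, is what lets a caller such as `reRun` pass the real status (for example WAITING instead of RUNNING) back to the user.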
http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index b3d80e2..d08c3a3 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -919,6 +919,8 @@ Examples:
 </verbatim>
 The workflow is re-tried after 10 mins, 20 mins and 30 mins. With exponential backoff, the workflow will be re-tried after 10 mins, 20 mins and 40 mins.
 
+*NOTE :* If user does a manual rerun with -force option (using the instance rerun API), then the runId will get reset and user might see more Falcon system retries than configured in the process definition.
+
 To enable retries for instances for feeds, user will have to set the following properties in runtime.properties
 <verbatim>
 falcon.recipe.retry.policy=periodic

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index ccc2cfb..90bbdd3 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -147,6 +147,9 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
                 wfProperties.put(WorkflowExecutionArgs.NOMINAL_TIME,
                         getNominalTimeString(Long.parseLong(json.getString("nominalTime"))));
             }
+            if (!json.isNull("parentId")) {
+                wfProperties.put(WorkflowExecutionArgs.PARENT_ID, json.getString("parentId"));
+            }
             String appName = message.getStringProperty("appName");
             Pair<String, EntityType> entityTypePair = WorkflowNameBuilder.WorkflowName.getEntityNameAndType(appName);
             wfProperties.put(WorkflowExecutionArgs.ENTITY_NAME, entityTypePair.first);

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index ebf23da..ab2dd5a 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -97,11 +97,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 WorkflowJob.Status.FAILED);
     private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING);
     private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED);
-    private static final List<WorkflowJob.Status> WF_RERUN_PRECOND =
-        Arrays.asList(WorkflowJob.Status.FAILED, WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED);
     private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND =
-        Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED);
-
+        Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED,
+                CoordinatorAction.Status.KILLED, CoordinatorAction.Status.SUCCEEDED);
     private static final List<Job.Status> BUNDLE_ACTIVE_STATUS =
         Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED,
             Job.Status.RUNNINGWITHERROR, Job.Status.PAUSED, Status.PREPPAUSED, Status.PAUSEDWITHERROR);
@@ -937,18 +935,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             break;
 
         case RERUN:
-            if (jobInfo == null && COORD_RERUN_PRECOND.contains(coordinatorAction.getStatus())) {
-                //Coord action re-run
-                reRunCoordAction(cluster, coordinatorAction);
-                status = Status.RUNNING.name();
-            } else if (jobInfo != null && WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
-                //wf re-run
-                reRun(cluster, jobInfo.getId(), props, isForced);
-                status = Status.RUNNING.name();
+            if (COORD_RERUN_PRECOND.contains(coordinatorAction.getStatus())) {
+                status = reRunCoordAction(cluster, coordinatorAction, props, isForced).name();
             }
             break;
 
-
         case STATUS:
             if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) {
                 populateInstanceActions(cluster, jobInfo, instance);
@@ -974,30 +965,64 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private void reRunCoordAction(String cluster, CoordinatorAction coordinatorAction) throws FalconException {
+    public CoordinatorAction.Status reRunCoordAction(String cluster, CoordinatorAction coordinatorAction,
+                                                      Properties props, boolean isForced) throws FalconException {
         try {
             OozieClient client = OozieClientFactory.get(cluster);
+            if (props == null) {
+                props = new Properties();
+            }
+            // In case if both props exists, throw an exception.
+            // This case will occur when user runs workflow with skip-nodes property and
+            // try to do force rerun or rerun with fail-nodes property.
+            if (props.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && props.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+                String msg = "Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES
+                        + " are present in workflow params for " + coordinatorAction.getExternalId();
+                LOG.error(msg);
+                throw new FalconException(msg);
+            }
+
+            //if user has set any of these oozie rerun properties then force rerun flag is ignored
+            if (props.containsKey(OozieClient.RERUN_FAIL_NODES)) {
+                isForced = false;
+            }
+            Properties jobprops;
+            // Get conf when workflow is launched.
+            if (coordinatorAction.getExternalId() != null) {
+                WorkflowJob jobInfo = client.getJobInfo(coordinatorAction.getExternalId());
+
+                jobprops = OozieUtils.toProperties(jobInfo.getConf());
+                // Clear the rerun properties from existing configuration
+                jobprops.remove(OozieClient.RERUN_FAIL_NODES);
+                jobprops.remove(OozieClient.RERUN_SKIP_NODES);
+                jobprops.putAll(props);
+                jobprops.remove(OozieClient.BUNDLE_APP_PATH);
+            } else {
+                jobprops = props;
+            }
+
             client.reRunCoord(coordinatorAction.getJobId(), RestConstants.JOB_COORD_SCOPE_ACTION,
-                Integer.toString(coordinatorAction.getActionNumber()), true, true);
-            assertCoordActionStatus(cluster, coordinatorAction.getId(),
-                org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
-                org.apache.oozie.client.CoordinatorAction.Status.WAITING,
-                org.apache.oozie.client.CoordinatorAction.Status.READY);
+                    Integer.toString(coordinatorAction.getActionNumber()), true, true, !isForced, jobprops);
             LOG.info("Rerun job {} on cluster {}", coordinatorAction.getId(), cluster);
+            return assertCoordActionStatus(cluster, coordinatorAction.getId(),
+                    org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
+                    org.apache.oozie.client.CoordinatorAction.Status.WAITING,
+                    org.apache.oozie.client.CoordinatorAction.Status.READY);
         } catch (Exception e) {
             LOG.error("Unable to rerun workflows", e);
             throw new FalconException(e);
         }
     }
 
-    private void assertCoordActionStatus(String cluster, String coordActionId,
+    private CoordinatorAction.Status assertCoordActionStatus(String cluster, String coordActionId,
         org.apache.oozie.client.CoordinatorAction.Status... statuses) throws FalconException, OozieClientException {
         OozieClient client = OozieClientFactory.get(cluster);
         CoordinatorAction actualStatus = client.getCoordActionInfo(coordActionId);
         for (int counter = 0; counter < 3; counter++) {
             for (org.apache.oozie.client.CoordinatorAction.Status status : statuses) {
                 if (status.equals(actualStatus.getStatus())) {
-                    return;
+                    return status;
                 }
             }
             try {
@@ -1494,48 +1519,26 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
-
+    public String reRun(String cluster, String id, Properties props, boolean isForced) throws FalconException {
         OozieClient client = OozieClientFactory.get(cluster);
+        String actionId = id;
         try {
-            WorkflowJob jobInfo = client.getJobInfo(jobId);
-            if (props == null) {
-                props = new Properties();
+            // If a workflow job is supplied, get its parent coord action
+            if (id.endsWith("-W")) {
+                actionId = client.getJobInfo(id).getParentId();
             }
-
-            //if user has set any of these oozie rerun properties then force rerun flag is ignored
-            if (!props.containsKey(OozieClient.RERUN_FAIL_NODES)
-                    && !props.containsKey(OozieClient.RERUN_SKIP_NODES)) {
-                props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+            if (StringUtils.isBlank(actionId) || !actionId.contains("-C@")) {
+                throw new FalconException("coord action id supplied for rerun, " + actionId + ", is not valid.");
             }
-
-            Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
-            jobprops.putAll(props);
-
-            jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
-            jobprops.remove(OozieClient.BUNDLE_APP_PATH);
-
-            // In case if both props exists one should be removed otherwise it will fail.
-            // This case will occur when user runs workflow with skip-nodes property and
-            // try to do force rerun or rerun with fail-nodes property.
-            if (jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
-                    && jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
-                LOG.warn("Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES
-                        + " are present in workflow params removing" + OozieClient.RERUN_SKIP_NODES);
-                jobprops.remove(OozieClient.RERUN_SKIP_NODES);
-            }
-            client.reRun(jobId, jobprops);
-            assertStatus(cluster, jobId, Job.Status.RUNNING);
-            LOG.info("Rerun job {} on cluster {}", jobId, cluster);
+            return reRunCoordAction(cluster, client.getCoordActionInfo(actionId), props, isForced).name();
         } catch (Exception e) {
-            LOG.error("Unable to rerun workflows", e);
+            LOG.error("Unable to rerun action " + actionId, e);
             throw new FalconException(e);
         }
     }
 
 
-    private void assertStatus(String cluster, String jobId, Status... statuses) throws FalconException {
-
+    private String assertStatus(String cluster, String jobId, Status... statuses) throws FalconException {
         String actualStatus = null;
         int retryCount;
         String retry = RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30");
@@ -1554,7 +1557,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                     //ignore
                 }
             } else {
-                return;
+                return actualStatus;
             }
         }
         throw new FalconException("For Job" + jobId + ", actual statuses: " + actualStatus + ", expected statuses: "

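The property handling in the new `reRunCoordAction` can be read as a pure function of the launch-time conf, the user's rerun properties and the force flag. The sketch below restates those rules with `java.util.Properties` so they can be exercised without an Oozie server. The key strings are assumed to equal the values of `OozieClient.RERUN_FAIL_NODES`, `OozieClient.RERUN_SKIP_NODES` and `OozieClient.BUNDLE_APP_PATH`; `RerunPropsMerger` and `MergeResult` are hypothetical names, and `rerunFailedOnly` corresponds to the `!isForced` argument passed to `reRunCoord`.

```java
import java.util.Properties;

public final class RerunPropsMerger {

    // Assumed values of the corresponding OozieClient constants.
    static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes";
    static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes";
    static final String BUNDLE_APP_PATH = "oozie.bundle.application.path";

    /** Hypothetical holder for the merged conf and the "failed" flag given to reRunCoord. */
    public static final class MergeResult {
        public final Properties jobProps;
        public final boolean rerunFailedOnly;

        MergeResult(Properties jobProps, boolean rerunFailedOnly) {
            this.jobProps = jobProps;
            this.rerunFailedOnly = rerunFailedOnly;
        }
    }

    private RerunPropsMerger() {
    }

    /** launchConf may be null when the coord action has no workflow (external id) yet. */
    public static MergeResult merge(Properties launchConf, Properties userProps, boolean isForced) {
        Properties props = (userProps == null) ? new Properties() : userProps;
        // Specifying both fail-nodes and skip-nodes is rejected outright.
        if (props.containsKey(RERUN_FAIL_NODES) && props.containsKey(RERUN_SKIP_NODES)) {
            throw new IllegalArgumentException("Both " + RERUN_SKIP_NODES + " and " + RERUN_FAIL_NODES
                    + " are present");
        }
        // An explicit fail-nodes setting overrides the force flag.
        boolean forced = isForced && !props.containsKey(RERUN_FAIL_NODES);

        Properties jobProps;
        if (launchConf != null) {
            jobProps = new Properties();
            jobProps.putAll(launchConf);
            // Drop rerun settings left over from the previous run, then apply the user's.
            jobProps.remove(RERUN_FAIL_NODES);
            jobProps.remove(RERUN_SKIP_NODES);
            jobProps.putAll(props);
            jobProps.remove(BUNDLE_APP_PATH);
        } else {
            jobProps = props;
        }
        return new MergeResult(jobProps, !forced);
    }
}
```

Copying the launch conf into a fresh `Properties`, rather than editing it in place, keeps the caller's object untouched, which makes these rules easy to unit test in isolation.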
http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index 7bf5c37..3bdc0df 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -90,6 +90,7 @@ public class LocalProxyOozieClient extends OozieClient {
     }
 
     public String run(Properties conf) throws OozieClientException {
+        conf.setProperty("oozie.child.mapreduce.job.tags", "");
         if (conf.getProperty("oozie.wf.application.path") != null) {
             return getLocalOozieClient().run(conf);
         } else if (conf.getProperty("oozie.coord.application.path") != null) {
@@ -213,5 +214,15 @@ public class LocalProxyOozieClient extends OozieClient {
         throw new IllegalStateException("Job logs not supported");
     }
 
+    @Override
+    public void validateWSVersion() throws OozieClientException {
+        // Do nothing as this is local oozie.
+    }
+
+    @Override
+    public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh,
+            boolean noCleanup, boolean failed, Properties props) throws OozieClientException {
+        return getClient(jobId).reRunCoord(jobId, rerunType, scope, refresh, noCleanup, failed, props);
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 27d2fc2..271b477 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
         <include.prism>true</include.prism>
 
         <slf4j.version>1.7.5</slf4j.version>
-        <oozie.version>4.1.0</oozie.version>
+        <oozie.version>4.2.0</oozie.version>
         <oozie.buildversion>${oozie.version}-falcon</oozie.buildversion>
         <oozie.forcebuild>false</oozie.forcebuild>
         <activemq.version>5.12.0</activemq.version>

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index 2b52762..1fbdbcc 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -23,10 +23,10 @@ package org.apache.falcon.rerun.event;
 public class LaterunEvent extends RerunEvent {
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
+    public LaterunEvent(String clusterName, String wfId, String parentId, long msgInsertTime,
                         long delay, String entityType, String entityName,
                         String instance, int runId, String workflowUser) {
-        super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
+        super(clusterName, wfId, parentId, msgInsertTime, delay, entityType, entityName,
                 instance, runId, workflowUser);
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 254f285..b917421 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -38,6 +38,7 @@ public class RerunEvent implements Delayed {
 
     protected String clusterName;
     protected String wfId;
+    protected String parentId;
     protected String workflowUser;
     protected long msgInsertTime;
     protected long delayInMilliSec;
@@ -47,10 +48,11 @@ public class RerunEvent implements Delayed {
     protected int runId;
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public RerunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
+    public RerunEvent(String clusterName, String wfId, String parentId, long msgInsertTime, long delay,
                       String entityType, String entityName, String instance, int runId, String workflowUser) {
         this.clusterName = clusterName;
         this.wfId = wfId;
+        this.parentId = parentId;
         this.workflowUser = workflowUser;
         this.msgInsertTime = msgInsertTime;
         this.delayInMilliSec = delay;
@@ -69,6 +71,10 @@ public class RerunEvent implements Delayed {
         return wfId;
     }
 
+    public String getParentId() {
+        return parentId;
+    }
+
     public String getWorkflowUser() {
         return workflowUser;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index c2a8fe2..c97d259 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -42,7 +42,7 @@ public class RerunEventFactory<T extends RerunEvent> {
     @SuppressWarnings("unchecked")
     private T lateEventFromString(String line) {
         Map<String, String> map = getMap(line);
-        return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
+        return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"), map.get("parentId"),
                 Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
                 map.get("entityType"), map.get("entityName"), map.get("instance"),
                 Integer.parseInt(map.get("runId")), map.get("workflowUser"));
@@ -51,7 +51,7 @@ public class RerunEventFactory<T extends RerunEvent> {
     @SuppressWarnings("unchecked")
     public T retryEventFromString(String line) {
         Map<String, String> map = getMap(line);
-        return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"),
+        return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"), map.get("parentId"),
                 Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
                 map.get("entityType"), map.get("entityName"), map.get("instance"),
                 Integer.parseInt(map.get("runId")), Integer.parseInt(map.get("attempts")),

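Because `parentId` is a new field, a retry or late-rerun event persisted before the upgrade will not carry it, and `map.get("parentId")` in the factory then returns `null`. The fallback to the workflow id in `LateRerunConsumer` covers exactly this case. A minimal sketch of that round trip follows; `PersistedEventParser` is a hypothetical name, and the `k=v` fields separated by `*` are a hypothetical serialization chosen for illustration, since the format produced by `RerunEvent` and parsed by `getMap` is not part of this diff.

```java
import java.util.HashMap;
import java.util.Map;

public final class PersistedEventParser {

    // Hypothetical separators; the real format used by RerunEvent is not shown in this diff.
    private static final String FIELD_SEPARATOR = "\\*";
    private static final String KEY_VALUE_SEPARATOR = "=";

    private PersistedEventParser() {
    }

    /** Splits "k1=v1*k2=v2" into a map; fields without "=" are ignored. */
    public static Map<String, String> toMap(String line) {
        Map<String, String> map = new HashMap<>();
        for (String field : line.split(FIELD_SEPARATOR)) {
            int idx = field.indexOf(KEY_VALUE_SEPARATOR);
            if (idx > 0) {
                map.put(field.substring(0, idx), field.substring(idx + 1));
            }
        }
        return map;
    }

    /** Picks the id to rerun: parentId if the event carried one, else the workflow id. */
    public static String rerunId(Map<String, String> event) {
        String parentId = event.get("parentId");  // null for events queued before the upgrade
        return (parentId == null || parentId.isEmpty()) ? event.get("wfId") : parentId;
    }
}
```

An old event therefore still yields a usable id (the workflow id), which the engine's `reRun` can then map to its parent coordinator action.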
http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
index b5312a6..457445f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
@@ -26,10 +26,10 @@ public class RetryEvent extends RerunEvent {
     private int failRetryCount;
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public RetryEvent(String clusterName, String wfId, long msgInsertTime,
+    public RetryEvent(String clusterName, String wfId, String parentId, long msgInsertTime,
                       long delay, String entityType, String entityName, String instance,
                       int runId, int attempts, int failRetryCount, String workflowUser) {
-        super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
+        super(clusterName, wfId, parentId, msgInsertTime, delay, entityType, entityName,
                 instance, runId, workflowUser);
         this.attempts = attempts;
         this.failRetryCount = failRetryCount;

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 3ec3617..700095e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -57,7 +57,7 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public abstract void handleRerun(String clusterName, String entityType,
                                      String entityName, String nominalTime, String runId,
-                                     String wfId, String workflowUser, long msgReceivedTime);
+                                     String wfId, String parentId, String workflowUser, long msgReceivedTime);
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     public AbstractWorkflowEngine getWfEngine(String entityType, String entityName) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index fa0d6ae..e79f122 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.rerun.handler;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
@@ -72,13 +73,18 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
                         message.getWfId(), SchemaHelper.formatDateUTC(new Date()));
                 handler.handleRerun(clusterName, message.getEntityType(), message.getEntityName(),
                         message.getInstance(), Integer.toString(message.getRunId()),
-                        message.getWfId(), message.getWorkflowUser(), System.currentTimeMillis());
+                        message.getWfId(), message.getParentId(),
+                        message.getWorkflowUser(), System.currentTimeMillis());
                 return;
             }
 
             LOG.info("Late changes detected in the following feeds: {}", detectLate);
-
-            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, true);
+            // Use coord action id for rerun if available
+            String id = message.getParentId();
+            if (StringUtils.isBlank(id)) {
+                id = message.getWfId();
+            }
+            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, true);
             LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
                     message.getWfId(), message.getClusterName());
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 0be6252..02ab792 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -58,7 +58,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
     @Override
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public void handleRerun(String cluster, String entityType, String entityName, String nominalTime,
-                            String runId, String wfId, String workflowUser, long msgReceivedTime) {
+                            String runId, String wfId, String parentId, String workflowUser, long msgReceivedTime) {
         try {
             Entity entity = EntityUtil.getEntity(entityType, entityName);
             int intRunId = Integer.parseInt(runId);
@@ -88,7 +88,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
 
             LOG.debug("Scheduling the late rerun for entity instance: {} ({}): {} And WorkflowId: {}",
                     entityType, entityName, nominalTime, wfId);
-            LaterunEvent event = new LaterunEvent(cluster, wfId, msgInsertTime.getTime(),
+            LaterunEvent event = new LaterunEvent(cluster, wfId, parentId, msgInsertTime.getTime(),
                     wait, entityType, entityName, nominalTime, intRunId, workflowUser);
             offerToQueue(event);
         } catch (Exception e) {
@@ -232,7 +232,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                 && EntityUtil.getLateProcess(entity) != null) {
             handleRerun(context.getClusterName(), context.getEntityType(),
                     context.getEntityName(), context.getNominalTimeAsISO8601(),
-                    context.getWorkflowRunIdString(), context.getWorkflowId(),
+                    context.getWorkflowRunIdString(), context.getWorkflowId(), context.getWorkflowParentId(),
                     context.getWorkflowUser(), context.getExecutionCompletionTime());
         } else {
             LOG.info("Late date handling not applicable for entityType: " + context.getEntityType()

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 9b46713..836a172 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.rerun.handler;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -53,11 +54,17 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
                             + " At time: {}",
                     (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(),
                     message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
-            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, false);
+            // Use coord action id for rerun if available
+            String id = message.getParentId();
+            if (StringUtils.isBlank(id)) {
+                id = message.getWfId();
+            }
+            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, false);
         } catch (Exception e) {
             if (e instanceof EntityNotRegisteredException) {
                 LOG.warn("Entity {} of type {} doesn't exist in config store. So retry "
                         + "cannot be done for workflow ", entityName, entityType, message.getWfId());
+
                 return;
             }
             int maxFailRetryCount = Integer.parseInt(StartupProperties.get()

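LateRerunConsumer and RetryConsumer now choose the id passed to reRun() the same way. They use the coordinator action id (parentId) when it is set and otherwise fall back to the workflow id. The sketch below isolates that rule in a hypothetical helper, RerunIds, that uses only the JDK. Its isBlank() is written to behave like commons-lang StringUtils.isBlank (true for null, empty, or whitespace-only strings). The ids in the usage note are made up.

```java
/**
 * Hypothetical helper (not part of Falcon) that mirrors the id-selection rule
 * added to LateRerunConsumer and RetryConsumer.
 */
final class RerunIds {

    private RerunIds() {
    }

    /** Behaves like commons-lang StringUtils.isBlank: null, empty or whitespace-only. */
    static boolean isBlank(String s) {
        return s == null || s.chars().allMatch(Character::isWhitespace);
    }

    /** Prefers the coordinator action id and falls back to the workflow id when it is blank. */
    static String resolveRerunId(String parentId, String wfId) {
        return isBlank(parentId) ? wfId : parentId;
    }
}
```

For example, resolveRerunId(null, "wf-0001") yields "wf-0001", while a non-blank parentId such as "coord-0001@3" is returned unchanged. The fallback matters for notifications that arrive without a parent id. The commit summary notes that notifications from the post-processing action do not carry one.
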
http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index fac32b3..48d5ce7 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -44,7 +44,7 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
     @Override
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public void handleRerun(String clusterName, String entityType, String entityName, String nominalTime,
-                            String runId, String wfId, String workflowUser, long msgReceivedTime) {
+                            String runId, String wfId, String parentId, String workflowUser, long msgReceivedTime) {
         try {
             Entity entity = EntityUtil.getEntity(entityType, entityName);
             Retry retry = getRetry(entity);
@@ -63,7 +63,7 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
             if (attempts > intRunId) {
                 AbstractRerunPolicy rerunPolicy = RerunPolicyFactory.getRetryPolicy(policy);
                 long delayTime = rerunPolicy.getDelay(delay, Integer.parseInt(runId));
-                RetryEvent event = new RetryEvent(clusterName, wfId,
+                RetryEvent event = new RetryEvent(clusterName, wfId, parentId,
                         msgReceivedTime, delayTime, entityType, entityName,
                         nominalTime, intRunId, attempts, 0, workflowUser);
                 offerToQueue(event);
@@ -122,7 +122,7 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
         }
         handleRerun(context.getClusterName(), context.getEntityType(),
                 context.getEntityName(), context.getNominalTimeAsISO8601(),
-                context.getWorkflowRunIdString(), context.getWorkflowId(),
+                context.getWorkflowRunIdString(), context.getWorkflowId(), context.getWorkflowParentId(),
                 context.getWorkflowUser(), context.getExecutionCompletionTime());
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
index 3c53833..dba4610 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
@@ -50,7 +50,7 @@ public class ActiveMQTest {
                 BROKER_URL, DESTINATION);
         activeMQueue.init();
 
-        RerunEvent event = new LaterunEvent("clusterName", "wfId",
+        RerunEvent event = new LaterunEvent("clusterName", "wfId", "parentId",
                 System.currentTimeMillis(), 60 * 1000, "entityType",
                 "entityName", "instance", 0, FalconTestUtil.TEST_USER_1);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
index 4b179d3..cdaf203 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
@@ -45,7 +45,7 @@ public class InMemoryQueueTest {
             Thread.sleep(30);
             long time = System.currentTimeMillis();
             int delay = ((5 - index) / 2) * 50;
-            MyEvent event = new MyEvent("someCluster", Integer.toString(index),
+            MyEvent event = new MyEvent("someCluster", Integer.toString(index), "parent",
                     time, delay, "someType", "someName", "someInstance", 0, FalconTestUtil.TEST_USER_1);
             queue.offer(event);
             boolean inserted = false;
@@ -72,10 +72,10 @@ public class InMemoryQueueTest {
     private class MyEvent extends RerunEvent {
 
         //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
-        public MyEvent(String clusterName, String wfId,
+        public MyEvent(String clusterName, String wfId, String parentId,
                        long msgInsertTime, long delay, String entityType,
                        String entityName, String instance, int runId, String workflowUser) {
-            super(clusterName, wfId, msgInsertTime, delay,
+            super(clusterName, wfId, parentId, msgInsertTime, delay,
                     entityType, entityName, instance, runId, workflowUser);
         }
         //RESUME CHECKSTYLE CHECK VisibilityModifierCheck

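The two test changes above only thread a parentId through the RerunEvent constructors. They also exercise the queue contract: an event becomes eligible once its insert time plus its delay has passed. Below is a minimal JDK-only sketch of that contract. It uses java.util.concurrent.DelayQueue as a stand-in for Falcon's own InMemoryQueue/ActiveMQueue, whose internals are not shown in this diff and may differ. SketchRerunEvent and RerunQueueSketch are hypothetical names.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Simplified, hypothetical rerun event. Like the updated LaterunEvent/RetryEvent
 * constructors, it carries parentId alongside wfId. It is due at msgInsertTime + delay.
 */
final class SketchRerunEvent implements Delayed {
    final String clusterName;
    final String wfId;
    final String parentId;
    final long msgInsertTime; // epoch millis when the event was queued
    final long delay;         // millis to wait before the rerun is attempted

    SketchRerunEvent(String clusterName, String wfId, String parentId, long msgInsertTime, long delay) {
        this.clusterName = clusterName;
        this.wfId = wfId;
        this.parentId = parentId;
        this.msgInsertTime = msgInsertTime;
        this.delay = delay;
    }

    long dueTime() {
        return msgInsertTime + delay;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining wait: zero or negative means the queue may hand the event out.
        return unit.convert(dueTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Order by absolute due time so the earliest-due event sits at the head.
        if (other instanceof SketchRerunEvent) {
            return Long.compare(dueTime(), ((SketchRerunEvent) other).dueTime());
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

/** Hypothetical convenience for building a queue from events. */
final class RerunQueueSketch {
    private RerunQueueSketch() {
    }

    static DelayQueue<SketchRerunEvent> queueOf(SketchRerunEvent... events) {
        DelayQueue<SketchRerunEvent> queue = new DelayQueue<>();
        for (SketchRerunEvent event : events) {
            queue.offer(event);
        }
        return queue;
    }
}
```

DelayQueue.poll() returns null while the head is still delayed. A consumer therefore sees an event, together with the parentId it needs for a coordinator rerun, only after the configured wait has elapsed.
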
http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 7ce2420..77cb2fa 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -516,11 +516,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
+    public String reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
         InstanceState instanceState = STATE_STORE.getExecutionInstance(jobId);
         ExecutionInstance instance = instanceState.getInstance();
         EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(instance.getEntity(), cluster);
         executor.rerun(instance, props, isForced);
+        return DAGEngineFactory.getDAGEngine(cluster).info(jobId).getStatus().name();
     }
 
     @Override

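With this change, FalconWorkflowEngine.reRun returns a String instead of void: the status the DAG engine reports for the job after the rerun. Callers can therefore report the actual state rather than a hardcoded "RUNNING". The sketch below shows the shape of that contract from the caller's side. SketchWorkflowEngine, FakeWorkflowEngine and RerunReporter are hypothetical. The fake simply returns a preconfigured status instead of contacting any scheduler.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical, trimmed contract: reRun returns the status observed after the rerun. */
interface SketchWorkflowEngine {
    // The real reRun also declares FalconException; it is omitted to keep the sketch small.
    String reRun(String cluster, String jobId, Properties props, boolean isForced);
}

/** In-memory stand-in used only for illustration; not Falcon code. */
final class FakeWorkflowEngine implements SketchWorkflowEngine {
    private final Map<String, String> statusAfterRerun = new HashMap<>();

    FakeWorkflowEngine withStatus(String jobId, String status) {
        statusAfterRerun.put(jobId, status);
        return this;
    }

    @Override
    public String reRun(String cluster, String jobId, Properties props, boolean isForced) {
        // A real engine would trigger the rerun and then look up the job's current state.
        String status = statusAfterRerun.get(jobId);
        if (status == null) {
            throw new IllegalArgumentException("Unknown job: " + jobId);
        }
        return status;
    }
}

/** Hypothetical caller: reports whatever status the engine returned. */
final class RerunReporter {
    private RerunReporter() {
    }

    static String rerunAndDescribe(SketchWorkflowEngine engine, String cluster, String jobId) {
        String status = engine.reRun(cluster, jobId, new Properties(), false);
        return jobId + " on " + cluster + " is " + status + " after rerun";
    }
}
```

For example, a fake configured with status "WAITING" for "coord-0001@3" yields the message "coord-0001@3 on primary is WAITING after rerun".
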
http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
index eebfa2e..e762b31 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.oozie.action.hadoop.LauncherMapper;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XConfiguration;
@@ -162,6 +163,8 @@ public final class FalconUnit {
     private static void cleanUpOozie() throws IOException, FalconException {
         LocalOozie.stop();
         FileUtils.deleteDirectory(new File(OOZIE_HOME_DIR));
+        // Need to explicitly clean this as Oozie Launcher leaves this behind.
+        FileUtils.deleteQuietly(new File(LauncherMapper.PROPAGATION_CONF_XML));
         resetSystemProperties();
         System.setSecurityManager(null);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
new file mode 100644
index 0000000..3070689
--- /dev/null
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RpcClientFactory;
+
+/**
+ * A dummy implementation of RpcClientFactory that does not do RPC.
+ * This is required because OozieClient tries to connect to the RM via RPC to kill jobs, which fails in local mode.
+ */
+public final class LocalFalconRPCClientFactory implements RpcClientFactory {
+
+    private static LocalFalconRPCClientFactory self = new LocalFalconRPCClientFactory();
+
+    @Override
+    public Object getClient(Class<?> aClass, long l, InetSocketAddress inetSocketAddress, Configuration configuration) {
+        return new LocalFalconApplicationClientProtocolImpl();
+    }
+
+    public static LocalFalconRPCClientFactory get() {
+        return self;
+    }
+
+    private LocalFalconRPCClientFactory() {
+    }
+
+
+    @Override
+    public void stopClient(Object o) {
+
+    }
+
+    /**
+     * Dummy implementation of ApplicationClientProtocol that returns an empty list of applications.
+     */
+    public static class LocalFalconApplicationClientProtocolImpl implements ApplicationClientProtocol {
+
+        public LocalFalconApplicationClientProtocolImpl() {
+
+        }
+
+        @Override
+        public GetNewApplicationResponse getNewApplication(GetNewApplicationRequest getNewApplicationRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public SubmitApplicationResponse submitApplication(SubmitApplicationRequest submitApplicationRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public KillApplicationResponse forceKillApplication(KillApplicationRequest killApplicationRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest getClusterMetricsRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest getClusterNodesRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest getQueueInfoRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetQueueUserAclsInfoResponse getQueueUserAcls(GetQueueUserAclsInfoRequest getQueueUserAclsInfoRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
+                moveApplicationAcrossQueuesRequest) throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public ReservationSubmissionResponse submitReservation(ReservationSubmissionRequest
+                reservationSubmissionRequest) throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public ReservationUpdateResponse updateReservation(ReservationUpdateRequest reservationUpdateRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public ReservationDeleteResponse deleteReservation(ReservationDeleteRequest reservationDeleteRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest getNodesToLabelsRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest
+                getClusterNodeLabelsRequest) throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetApplicationReportResponse getApplicationReport(GetApplicationReportRequest
+                getApplicationReportRequest) throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetApplicationsResponse getApplications(GetApplicationsRequest getApplicationsRequest)
+            throws YarnException, IOException {
+            return GetApplicationsResponse.newInstance(new ArrayList<ApplicationReport>());
+        }
+
+        @Override
+        public GetApplicationAttemptReportResponse getApplicationAttemptReport(GetApplicationAttemptReportRequest
+                getApplicationAttemptReportRequest) throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetApplicationAttemptsResponse getApplicationAttempts(GetApplicationAttemptsRequest
+                getApplicationAttemptsRequest) throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetContainerReportResponse getContainerReport(GetContainerReportRequest getContainerReportRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetContainersResponse getContainers(GetContainersRequest getContainersRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetDelegationTokenResponse getDelegationToken(GetDelegationTokenRequest getDelegationTokenRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public RenewDelegationTokenResponse renewDelegationToken(RenewDelegationTokenRequest
+                renewDelegationTokenRequest) throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public CancelDelegationTokenResponse cancelDelegationToken(CancelDelegationTokenRequest
+                cancelDelegationTokenRequest) throws YarnException, IOException {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/unit/src/main/resources/yarn-site.xml
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/yarn-site.xml b/unit/src/main/resources/yarn-site.xml
new file mode 100644
index 0000000..ab89bde
--- /dev/null
+++ b/unit/src/main/resources/yarn-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+    <property>
+        <name>yarn.ipc.client.factory.class</name>
+        <value>org.apache.falcon.unit.LocalFalconRPCClientFactory</value>
+    </property>
+
+</configuration>
\ No newline at end of file

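The yarn-site.xml above points yarn.ipc.client.factory.class at LocalFalconRPCClientFactory. As we understand it, YARN's RpcFactoryProvider reads that key and obtains the factory by reflectively calling a public static get() method on the named class. That would explain why the factory is a singleton with a private constructor and a static get(). The JDK-only sketch below mimics that lookup. SketchClientFactory, LocalSketchFactory, NoOpApplicationClient and FactoryLookup are hypothetical stand-ins, not the Hadoop types. The real getClient also takes a client version, a socket address and a Configuration.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for YARN's RpcClientFactory, reduced to one parameter. */
interface SketchClientFactory {
    Object getClient(Class<?> protocol);
}

/** Stub client: reports no applications, so a caller looking for jobs to kill finds none. */
final class NoOpApplicationClient {
    List<String> getApplications() {
        return Collections.emptyList();
    }
}

/** Singleton factory exposing a public static get(), like LocalFalconRPCClientFactory. */
final class LocalSketchFactory implements SketchClientFactory {
    private static final LocalSketchFactory SELF = new LocalSketchFactory();

    private LocalSketchFactory() {
    }

    public static LocalSketchFactory get() {
        return SELF;
    }

    @Override
    public Object getClient(Class<?> protocol) {
        // Never opens a connection; always hands back the no-op client.
        return new NoOpApplicationClient();
    }
}

/** Resolves the configured factory class and invokes its static get() reflectively. */
final class FactoryLookup {
    static final String KEY = "yarn.ipc.client.factory.class";

    private FactoryLookup() {
    }

    static SketchClientFactory resolve(Map<String, String> conf) {
        String className = conf.get(KEY);
        if (className == null) {
            throw new IllegalStateException("No factory configured under " + KEY);
        }
        try {
            Method get = Class.forName(className).getMethod("get");
            return (SketchClientFactory) get.invoke(null); // static method: no receiver
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

The stub's getApplications returns an empty list rather than null. Code that iterates over the result, presumably Oozie's search for lingering launcher jobs to kill, therefore finds nothing to act on. The other stubbed methods return null, on the assumption that they are never called in local mode.
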
