Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 57917181D9 for ; Wed, 24 Feb 2016 11:55:53 +0000 (UTC) Received: (qmail 37175 invoked by uid 500); 24 Feb 2016 11:55:37 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 37140 invoked by uid 500); 24 Feb 2016 11:55:37 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 37126 invoked by uid 99); 24 Feb 2016 11:55:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Feb 2016 11:55:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 287F7DFFC0; Wed, 24 Feb 2016 11:55:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: pallavi@apache.org To: commits@falcon.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: FALCON-1835 [IMPROVEMENT] Falcon should do coord rerun rather than workflow rerun Date: Wed, 24 Feb 2016 11:55:37 +0000 (UTC) Repository: falcon Updated Branches: refs/heads/master e270b5c70 -> a058cf2b4 FALCON-1835 [IMPROVEMENT] Falcon should do coord rerun rather than workflow rerun …to ensure concurrency Patch contains the following changes: 1. Upgrade to Oozie client 4.2 (from 4.1). 2. Remove workflow rerun completely and switch to coord rerun. 3. Introduced parentId in WorkflowExecutionContext in order to store the coord action of a given workflow. This will be used during retry and late data rerun. 4. In cases where parentId cannot be populated (notification from Post processing action will not contain this), rerun method queries Oozie to retrieve the parent id. 5. The rerun method now returns the actual status after rerun, rather than hardcoded “RUNNING”. 6. In 4.2, Oozie does a Yarn job kill to kill any lingering executions of previous run of a workflow. It uses RMClientProxy to do the same. Since in local mode, RPC cannot happen, had to introduce LocalFalconRPCClientFactory to prevent RPC in local mode. Author: Pallavi Rao Reviewers: Sandeep Samudrala , Srikanth Sundarrajan, Pavan Kolamuri Closes #48 from pallavi-rao/1835 and squashes the following commits: 0e0b317 [Pallavi Rao] FALCON-1835 Clearing old settings from config retrieved 0c0d2c4 [Pallavi Rao] FALCON-1835 Updated doc. 418ac16 [Pallavi Rao] FALCON-1835 Fixed checkstyle issue 96cd509 [Pallavi Rao] FALCON-1835 Updated rerun method to throw exception when both skip.nodes and failnodes are specified 8b0cb1f [Pallavi Rao] FALCON-1835 Updated rerun method to merge user props and job conf. 43df66d [Pallavi Rao] FALCON-1835 Falcon should do coord rerun rather than workflow rerun to ensure concurrency Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/a058cf2b Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/a058cf2b Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/a058cf2b Branch: refs/heads/master Commit: a058cf2b4481769b1eaf96c4a6a45e5d76c1556f Parents: e270b5c Author: Pallavi Rao Authored: Wed Feb 24 17:25:23 2016 +0530 Committer: Pallavi Rao Committed: Wed Feb 24 17:25:23 2016 +0530 ---------------------------------------------------------------------- .../falcon/workflow/WorkflowExecutionArgs.java | 2 +- .../workflow/WorkflowExecutionContext.java | 4 + .../WorkflowJobEndNotificationService.java | 5 - .../workflow/engine/AbstractWorkflowEngine.java | 3 +- docs/src/site/twiki/EntitySpecification.twiki | 2 + .../falcon/messaging/JMSMessageConsumer.java | 3 + .../workflow/engine/OozieWorkflowEngine.java | 111 ++++----- .../oozie/client/LocalProxyOozieClient.java | 11 + pom.xml | 2 +- .../apache/falcon/rerun/event/LaterunEvent.java | 4 +- .../apache/falcon/rerun/event/RerunEvent.java | 8 +- .../falcon/rerun/event/RerunEventFactory.java | 4 +- .../apache/falcon/rerun/event/RetryEvent.java | 4 +- .../rerun/handler/AbstractRerunHandler.java | 2 +- .../falcon/rerun/handler/LateRerunConsumer.java | 12 +- .../falcon/rerun/handler/LateRerunHandler.java | 6 +- .../falcon/rerun/handler/RetryConsumer.java | 9 +- .../falcon/rerun/handler/RetryHandler.java | 6 +- .../apache/falcon/rerun/queue/ActiveMQTest.java | 2 +- .../falcon/rerun/queue/InMemoryQueueTest.java | 6 +- .../workflow/engine/FalconWorkflowEngine.java | 3 +- .../java/org/apache/falcon/unit/FalconUnit.java | 3 + .../unit/LocalFalconRPCClientFactory.java | 241 +++++++++++++++++++ unit/src/main/resources/yarn-site.xml | 30 +++ 24 files changed, 398 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java index 3363e1f..2171092 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java @@ -55,6 +55,7 @@ public enum WorkflowExecutionArgs { STATUS("status", "status of the user workflow isnstance"), WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie", false), USER_SUBFLOW_ID("subflowId", "external id of user workflow", false), + PARENT_ID("parentId", "The parent of the current workflow, typically coord action", false), WF_START_TIME("workflowStartTime", "workflow start time", false), WF_END_TIME("workflowEndTime", "workflow end time", false), @@ -89,7 +90,6 @@ public enum WorkflowExecutionArgs { CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false), COUNTERS("counters", "store job counters", false); - private final String name; private final String description; private final boolean isRequired; http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index 5866369..9b1e1f4 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -295,6 +295,10 @@ public class WorkflowExecutionContext { return getValue(WorkflowExecutionArgs.WORKFLOW_ID); } + public String getWorkflowParentId() { + return getValue(WorkflowExecutionArgs.PARENT_ID); + } + public String getUserSubflowId() { return getValue(WorkflowExecutionArgs.USER_SUBFLOW_ID); } http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java index faea25c..b692258 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java @@ -139,11 +139,6 @@ public class WorkflowJobEndNotificationService implements FalconService { public void notifyWait(WorkflowExecutionContext context) throws FalconException { // Wait notifications can only be from Oozie JMS notifications - - if (!updateContextFromWFConf(context)) { - return; - } - LOG.debug("Sending workflow wait notification to listeners with context : {} ", context); for (WorkflowExecutionListener listener : listeners) { try { http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index b899a58..4d8402a 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -61,7 +61,8 @@ public abstract class AbstractWorkflowEngine { public abstract String delete(Entity entity, String cluster) throws FalconException; - public abstract void reRun(String cluster, String wfId, Properties props, boolean isForced) throws FalconException; + public abstract String reRun(String cluster, String wfId, Properties props, boolean isForced) + throws FalconException; public abstract void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException; http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/docs/src/site/twiki/EntitySpecification.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki index b3d80e2..d08c3a3 100644 --- a/docs/src/site/twiki/EntitySpecification.twiki +++ b/docs/src/site/twiki/EntitySpecification.twiki @@ -919,6 +919,8 @@ Examples: The workflow is re-tried after 10 mins, 20 mins and 30 mins. With exponential backoff, the workflow will be re-tried after 10 mins, 20 mins and 40 mins. +*NOTE :* If user does a manual rerun with -force option (using the instance rerun API), then the runId will get reset and user might see more Falcon system retries than configured in the process definition. + To enable retries for instances for feeds, user will have to set the following properties in runtime.properties falcon.recipe.retry.policy=periodic http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java index ccc2cfb..90bbdd3 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java @@ -147,6 +147,9 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener { wfProperties.put(WorkflowExecutionArgs.NOMINAL_TIME, getNominalTimeString(Long.parseLong(json.getString("nominalTime")))); } + if (!json.isNull("parentId")) { + wfProperties.put(WorkflowExecutionArgs.PARENT_ID, json.getString("parentId")); + } String appName = message.getStringProperty("appName"); Pair entityTypePair = WorkflowNameBuilder.WorkflowName.getEntityNameAndType(appName); wfProperties.put(WorkflowExecutionArgs.ENTITY_NAME, entityTypePair.first); http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index ebf23da..ab2dd5a 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -97,11 +97,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { WorkflowJob.Status.FAILED); private static final List WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING); private static final List WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED); - private static final List WF_RERUN_PRECOND = - Arrays.asList(WorkflowJob.Status.FAILED, WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED); private static final List COORD_RERUN_PRECOND = - Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED); - + Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED, + CoordinatorAction.Status.KILLED, CoordinatorAction.Status.SUCCEEDED); private static final List BUNDLE_ACTIVE_STATUS = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED, Job.Status.RUNNINGWITHERROR, Job.Status.PAUSED, Status.PREPPAUSED, Status.PAUSEDWITHERROR); @@ -937,18 +935,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { break; case RERUN: - if (jobInfo == null && COORD_RERUN_PRECOND.contains(coordinatorAction.getStatus())) { - //Coord action re-run - reRunCoordAction(cluster, coordinatorAction); - status = Status.RUNNING.name(); - } else if (jobInfo != null && WF_RERUN_PRECOND.contains(jobInfo.getStatus())) { - //wf re-run - reRun(cluster, jobInfo.getId(), props, isForced); - status = Status.RUNNING.name(); + if (COORD_RERUN_PRECOND.contains(coordinatorAction.getStatus())) { + status = reRunCoordAction(cluster, coordinatorAction, props, isForced).name(); } break; - case STATUS: if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) { populateInstanceActions(cluster, jobInfo, instance); @@ -974,30 +965,64 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } - private void reRunCoordAction(String cluster, CoordinatorAction coordinatorAction) throws FalconException { + public CoordinatorAction.Status reRunCoordAction(String cluster, CoordinatorAction coordinatorAction, + Properties props, boolean isForced) throws FalconException { try { OozieClient client = OozieClientFactory.get(cluster); + if (props == null) { + props = new Properties(); + } + // In case if both props exists, throw an exception. + // This case will occur when user runs workflow with skip-nodes property and + // try to do force rerun or rerun with fail-nodes property. + if (props.containsKey(OozieClient.RERUN_FAIL_NODES) + && props.containsKey(OozieClient.RERUN_SKIP_NODES)) { + String msg = "Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES + + " are present in workflow params for " + coordinatorAction.getExternalId(); + LOG.error(msg); + throw new FalconException(msg); + } + + //if user has set any of these oozie rerun properties then force rerun flag is ignored + if (props.containsKey(OozieClient.RERUN_FAIL_NODES)) { + isForced = false; + } + Properties jobprops; + // Get conf when workflow is launched. + if (coordinatorAction.getExternalId() != null) { + WorkflowJob jobInfo = client.getJobInfo(coordinatorAction.getExternalId()); + + jobprops = OozieUtils.toProperties(jobInfo.getConf()); + // Clear the rerun properties from existing configuration + jobprops.remove(OozieClient.RERUN_FAIL_NODES); + jobprops.remove(OozieClient.RERUN_SKIP_NODES); + jobprops.putAll(props); + jobprops.remove(OozieClient.BUNDLE_APP_PATH); + } else { + jobprops = props; + } + client.reRunCoord(coordinatorAction.getJobId(), RestConstants.JOB_COORD_SCOPE_ACTION, - Integer.toString(coordinatorAction.getActionNumber()), true, true); - assertCoordActionStatus(cluster, coordinatorAction.getId(), - org.apache.oozie.client.CoordinatorAction.Status.RUNNING, - org.apache.oozie.client.CoordinatorAction.Status.WAITING, - org.apache.oozie.client.CoordinatorAction.Status.READY); + Integer.toString(coordinatorAction.getActionNumber()), true, true, !isForced, jobprops); LOG.info("Rerun job {} on cluster {}", coordinatorAction.getId(), cluster); + return assertCoordActionStatus(cluster, coordinatorAction.getId(), + org.apache.oozie.client.CoordinatorAction.Status.RUNNING, + org.apache.oozie.client.CoordinatorAction.Status.WAITING, + org.apache.oozie.client.CoordinatorAction.Status.READY); } catch (Exception e) { LOG.error("Unable to rerun workflows", e); throw new FalconException(e); } } - private void assertCoordActionStatus(String cluster, String coordActionId, + private CoordinatorAction.Status assertCoordActionStatus(String cluster, String coordActionId, org.apache.oozie.client.CoordinatorAction.Status... statuses) throws FalconException, OozieClientException { OozieClient client = OozieClientFactory.get(cluster); CoordinatorAction actualStatus = client.getCoordActionInfo(coordActionId); for (int counter = 0; counter < 3; counter++) { for (org.apache.oozie.client.CoordinatorAction.Status status : statuses) { if (status.equals(actualStatus.getStatus())) { - return; + return status; } } try { @@ -1494,48 +1519,26 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } @Override - public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException { - + public String reRun(String cluster, String id, Properties props, boolean isForced) throws FalconException { OozieClient client = OozieClientFactory.get(cluster); + String actionId = id; try { - WorkflowJob jobInfo = client.getJobInfo(jobId); - if (props == null) { - props = new Properties(); + // If a workflow job is supplied, get its parent coord action + if (id.endsWith("-W")) { + actionId = client.getJobInfo(id).getParentId(); } - - //if user has set any of these oozie rerun properties then force rerun flag is ignored - if (!props.containsKey(OozieClient.RERUN_FAIL_NODES) - && !props.containsKey(OozieClient.RERUN_SKIP_NODES)) { - props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); + if (StringUtils.isBlank(actionId) || !actionId.contains("-C@")) { + throw new FalconException("coord action id supplied for rerun, " + actionId + ", is not valid."); } - - Properties jobprops = OozieUtils.toProperties(jobInfo.getConf()); - jobprops.putAll(props); - - jobprops.remove(OozieClient.COORDINATOR_APP_PATH); - jobprops.remove(OozieClient.BUNDLE_APP_PATH); - - // In case if both props exists one should be removed otherwise it will fail. - // This case will occur when user runs workflow with skip-nodes property and - // try to do force rerun or rerun with fail-nodes property. - if (jobprops.containsKey(OozieClient.RERUN_FAIL_NODES) - && jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) { - LOG.warn("Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES - + " are present in workflow params removing" + OozieClient.RERUN_SKIP_NODES); - jobprops.remove(OozieClient.RERUN_SKIP_NODES); - } - client.reRun(jobId, jobprops); - assertStatus(cluster, jobId, Job.Status.RUNNING); - LOG.info("Rerun job {} on cluster {}", jobId, cluster); + return reRunCoordAction(cluster, client.getCoordActionInfo(actionId), props, isForced).name(); } catch (Exception e) { - LOG.error("Unable to rerun workflows", e); + LOG.error("Unable to rerun action " + actionId, e); throw new FalconException(e); } } - private void assertStatus(String cluster, String jobId, Status... statuses) throws FalconException { - + private String assertStatus(String cluster, String jobId, Status... statuses) throws FalconException { String actualStatus = null; int retryCount; String retry = RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30"); @@ -1554,7 +1557,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { //ignore } } else { - return; + return actualStatus; } } throw new FalconException("For Job" + jobId + ", actual statuses: " + actualStatus + ", expected statuses: " http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java index 7bf5c37..3bdc0df 100644 --- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java +++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java @@ -90,6 +90,7 @@ public class LocalProxyOozieClient extends OozieClient { } public String run(Properties conf) throws OozieClientException { + conf.setProperty("oozie.child.mapreduce.job.tags", ""); if (conf.getProperty("oozie.wf.application.path") != null) { return getLocalOozieClient().run(conf); } else if (conf.getProperty("oozie.coord.application.path") != null) { @@ -213,5 +214,15 @@ public class LocalProxyOozieClient extends OozieClient { throw new IllegalStateException("Job logs not supported"); } + @Override + public void validateWSVersion() throws OozieClientException { + // Do nothing as this is local oozie. + } + + @Override + public List reRunCoord(String jobId, String rerunType, String scope, boolean refresh, + boolean noCleanup, boolean failed, Properties props) throws OozieClientException { + return getClient(jobId).reRunCoord(jobId, rerunType, scope, refresh, noCleanup, failed, props); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 27d2fc2..271b477 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ true 1.7.5 - 4.1.0 + 4.2.0 ${oozie.version}-falcon false 5.12.0 http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java index 2b52762..1fbdbcc 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java @@ -23,10 +23,10 @@ package org.apache.falcon.rerun.event; public class LaterunEvent extends RerunEvent { //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck - public LaterunEvent(String clusterName, String wfId, long msgInsertTime, + public LaterunEvent(String clusterName, String wfId, String parentId, long msgInsertTime, long delay, String entityType, String entityName, String instance, int runId, String workflowUser) { - super(clusterName, wfId, msgInsertTime, delay, entityType, entityName, + super(clusterName, wfId, parentId, msgInsertTime, delay, entityType, entityName, instance, runId, workflowUser); } //RESUME CHECKSTYLE CHECK ParameterNumberCheck http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java index 254f285..b917421 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java @@ -38,6 +38,7 @@ public class RerunEvent implements Delayed { protected String clusterName; protected String wfId; + protected String parentId; protected String workflowUser; protected long msgInsertTime; protected long delayInMilliSec; @@ -47,10 +48,11 @@ public class RerunEvent implements Delayed { protected int runId; //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck - public RerunEvent(String clusterName, String wfId, long msgInsertTime, long delay, + public RerunEvent(String clusterName, String wfId, String parentId, long msgInsertTime, long delay, String entityType, String entityName, String instance, int runId, String workflowUser) { this.clusterName = clusterName; this.wfId = wfId; + this.parentId = parentId; this.workflowUser = workflowUser; this.msgInsertTime = msgInsertTime; this.delayInMilliSec = delay; @@ -69,6 +71,10 @@ public class RerunEvent implements Delayed { return wfId; } + public String getParentId() { + return parentId; + } + public String getWorkflowUser() { return workflowUser; } http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java index c2a8fe2..c97d259 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java @@ -42,7 +42,7 @@ public class RerunEventFactory { @SuppressWarnings("unchecked") private T lateEventFromString(String line) { Map map = getMap(line); - return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"), + return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"), map.get("parentId"), Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")), map.get("entityType"), map.get("entityName"), map.get("instance"), Integer.parseInt(map.get("runId")), map.get("workflowUser")); @@ -51,7 +51,7 @@ public class RerunEventFactory { @SuppressWarnings("unchecked") public T retryEventFromString(String line) { Map map = getMap(line); - return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"), + return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"), map.get("parentId"), Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")), map.get("entityType"), map.get("entityName"), map.get("instance"), Integer.parseInt(map.get("runId")), Integer.parseInt(map.get("attempts")), http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java index b5312a6..457445f 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java @@ -26,10 +26,10 @@ public class RetryEvent extends RerunEvent { private int failRetryCount; //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck - public RetryEvent(String clusterName, String wfId, long msgInsertTime, + public RetryEvent(String clusterName, String wfId, String parentId, long msgInsertTime, long delay, String entityType, String entityName, String instance, int runId, int attempts, int failRetryCount, String workflowUser) { - super(clusterName, wfId, msgInsertTime, delay, entityType, entityName, + super(clusterName, wfId, parentId, msgInsertTime, delay, entityType, entityName, instance, runId, workflowUser); this.attempts = attempts; this.failRetryCount = failRetryCount; http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java index 3ec3617..700095e 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java @@ -57,7 +57,7 @@ public abstract class AbstractRerunHandler> extends @Override //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck public void handleRerun(String cluster, String entityType, String entityName, String nominalTime, - String runId, String wfId, String workflowUser, long msgReceivedTime) { + String runId, String wfId, String parentId, String workflowUser, long msgReceivedTime) { try { Entity entity = EntityUtil.getEntity(entityType, entityName); int intRunId = Integer.parseInt(runId); @@ -88,7 +88,7 @@ public class LateRerunHandler> extends LOG.debug("Scheduling the late rerun for entity instance: {} ({}): {} And WorkflowId: {}", entityType, entityName, nominalTime, wfId); - LaterunEvent event = new LaterunEvent(cluster, wfId, msgInsertTime.getTime(), + LaterunEvent event = new LaterunEvent(cluster, wfId, parentId, msgInsertTime.getTime(), wait, entityType, entityName, nominalTime, intRunId, workflowUser); offerToQueue(event); } catch (Exception e) { @@ -232,7 +232,7 @@ public class LateRerunHandler> extends && EntityUtil.getLateProcess(entity) != null) { handleRerun(context.getClusterName(), context.getEntityType(), context.getEntityName(), context.getNominalTimeAsISO8601(), - context.getWorkflowRunIdString(), context.getWorkflowId(), + context.getWorkflowRunIdString(), context.getWorkflowId(), context.getWorkflowParentId(), context.getWorkflowUser(), context.getExecutionCompletionTime()); } else { LOG.info("Late date handling not applicable for entityType: " + context.getEntityType() http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java index 9b46713..836a172 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.rerun.handler; +import org.apache.commons.lang.StringUtils; import org.apache.falcon.aspect.GenericAlert; import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.v0.SchemaHelper; @@ -53,11 +54,17 @@ public class RetryConsumer>> + " At time: {}", (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(), message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis()))); - handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, false); + // Use coord action id for rerun if available + String id = message.getParentId(); + if (StringUtils.isBlank(id)) { + id = message.getWfId(); + } + handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, false); } catch (Exception e) { if (e instanceof EntityNotRegisteredException) { LOG.warn("Entity {} of type {} doesn't exist in config store. So retry " + "cannot be done for workflow ", entityName, entityType, message.getWfId()); + return; } int maxFailRetryCount = Integer.parseInt(StartupProperties.get() http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java index fac32b3..48d5ce7 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java @@ -44,7 +44,7 @@ public class RetryHandler> extends @Override //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck public void handleRerun(String clusterName, String entityType, String entityName, String nominalTime, - String runId, String wfId, String workflowUser, long msgReceivedTime) { + String runId, String wfId, String parentId, String workflowUser, long msgReceivedTime) { try { Entity entity = EntityUtil.getEntity(entityType, entityName); Retry retry = getRetry(entity); @@ -63,7 +63,7 @@ public class RetryHandler> extends if (attempts > intRunId) { AbstractRerunPolicy rerunPolicy = RerunPolicyFactory.getRetryPolicy(policy); long delayTime = rerunPolicy.getDelay(delay, Integer.parseInt(runId)); - RetryEvent event = new RetryEvent(clusterName, wfId, + RetryEvent event = new RetryEvent(clusterName, wfId, parentId, msgReceivedTime, delayTime, entityType, entityName, nominalTime, intRunId, attempts, 0, workflowUser); offerToQueue(event); @@ -122,7 +122,7 @@ public class RetryHandler> extends } handleRerun(context.getClusterName(), context.getEntityType(), context.getEntityName(), context.getNominalTimeAsISO8601(), - context.getWorkflowRunIdString(), context.getWorkflowId(), + context.getWorkflowRunIdString(), context.getWorkflowId(), context.getWorkflowParentId(), context.getWorkflowUser(), context.getExecutionCompletionTime()); } http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java ---------------------------------------------------------------------- diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java index 3c53833..dba4610 100644 --- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java +++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java @@ -50,7 +50,7 @@ public class ActiveMQTest { BROKER_URL, DESTINATION); activeMQueue.init(); - RerunEvent event = new LaterunEvent("clusterName", "wfId", + RerunEvent event = new LaterunEvent("clusterName", "wfId", "parentId", System.currentTimeMillis(), 60 * 1000, "entityType", "entityName", "instance", 0, FalconTestUtil.TEST_USER_1); http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java ---------------------------------------------------------------------- diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java index 4b179d3..cdaf203 100644 --- a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java +++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java @@ -45,7 +45,7 @@ public class InMemoryQueueTest { Thread.sleep(30); long time = System.currentTimeMillis(); int delay = ((5 - index) / 2) * 50; - MyEvent event = new MyEvent("someCluster", Integer.toString(index), + MyEvent event = new MyEvent("someCluster", Integer.toString(index), "parent", time, delay, "someType", "someName", "someInstance", 0, FalconTestUtil.TEST_USER_1); queue.offer(event); boolean inserted = false; @@ -72,10 +72,10 @@ public class InMemoryQueueTest { private class MyEvent extends RerunEvent { //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck - public MyEvent(String clusterName, String wfId, + public MyEvent(String clusterName, String wfId, String parentId, long msgInsertTime, long delay, String entityType, String entityName, String instance, int runId, String workflowUser) { - super(clusterName, wfId, msgInsertTime, delay, + super(clusterName, wfId, parentId, msgInsertTime, delay, entityType, entityName, instance, runId, workflowUser); } //RESUME CHECKSTYLE CHECK VisibilityModifierCheck http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index 7ce2420..77cb2fa 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -516,11 +516,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { } @Override - public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException { + public String reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException { InstanceState instanceState = STATE_STORE.getExecutionInstance(jobId); ExecutionInstance instance = instanceState.getInstance(); EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(instance.getEntity(), cluster); executor.rerun(instance, props, isForced); + return DAGEngineFactory.getDAGEngine(cluster).info(jobId).getStatus().name(); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java index eebfa2e..e762b31 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.oozie.action.hadoop.LauncherMapper; import org.apache.oozie.local.LocalOozie; import org.apache.oozie.service.Services; import org.apache.oozie.util.XConfiguration; @@ -162,6 +163,8 @@ public final class FalconUnit { private static void cleanUpOozie() throws IOException, FalconException { LocalOozie.stop(); FileUtils.deleteDirectory(new File(OOZIE_HOME_DIR)); + // Need to explicitly clean this as Oozie Launcher leaves this behind. + FileUtils.deleteQuietly(new File(LauncherMapper.PROPAGATION_CONF_XML)); resetSystemProperties(); System.setSecurityManager(null); } http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java new file mode 100644 index 0000000..3070689 --- /dev/null +++ b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.unit; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RpcClientFactory; + +/** + * A Dummy implementation of RpcClientFactory that does not do RPC. + * This is required as OozieClient tries to connect to RM via RPC to kill jobs which fails in local mode. + */ +public final class LocalFalconRPCClientFactory implements RpcClientFactory { + + private static LocalFalconRPCClientFactory self = new LocalFalconRPCClientFactory(); + + @Override + public Object getClient(Class aClass, long l, InetSocketAddress inetSocketAddress, Configuration configuration) { + return new LocalFalconApplicationClientProtocolImpl(); + } + + public static LocalFalconRPCClientFactory get() { + return self; + } + + private LocalFalconRPCClientFactory() { + } + + + @Override + public void stopClient(Object o) { + + } + + /** + * Dummy implementation of ApplicationClientProtocol that returns a empty list of applications. + */ + public static class LocalFalconApplicationClientProtocolImpl implements ApplicationClientProtocol { + + public LocalFalconApplicationClientProtocolImpl() { + + } + + @Override + public GetNewApplicationResponse getNewApplication(GetNewApplicationRequest getNewApplicationRequest) + throws YarnException, IOException { + return null; + } + + @Override + public SubmitApplicationResponse submitApplication(SubmitApplicationRequest submitApplicationRequest) + throws YarnException, IOException { + return null; + } + + @Override + public KillApplicationResponse forceKillApplication(KillApplicationRequest killApplicationRequest) + throws YarnException, IOException { + return null; + } + + @Override + public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest getClusterMetricsRequest) + throws YarnException, IOException { + return null; + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest getClusterNodesRequest) + throws YarnException, IOException { + return null; + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest getQueueInfoRequest) + throws YarnException, IOException { + return null; + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls(GetQueueUserAclsInfoRequest getQueueUserAclsInfoRequest) + throws YarnException, IOException { + return null; + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest + moveApplicationAcrossQueuesRequest) throws YarnException, IOException { + return null; + } + + @Override + public ReservationSubmissionResponse submitReservation(ReservationSubmissionRequest + reservationSubmissionRequest) throws YarnException, IOException { + return null; + } + + @Override + public ReservationUpdateResponse updateReservation(ReservationUpdateRequest reservationUpdateRequest) + throws YarnException, IOException { + return null; + } + + @Override + public ReservationDeleteResponse deleteReservation(ReservationDeleteRequest reservationDeleteRequest) + throws YarnException, IOException { + return null; + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest getNodesToLabelsRequest) + throws YarnException, IOException { + return null; + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest + getClusterNodeLabelsRequest) throws YarnException, IOException { + return null; + } + + @Override + public GetApplicationReportResponse getApplicationReport(GetApplicationReportRequest + getApplicationReportRequest) throws YarnException, IOException { + return null; + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest getApplicationsRequest) + throws YarnException, IOException { + return GetApplicationsResponse.newInstance(new ArrayList()); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport(GetApplicationAttemptReportRequest + getApplicationAttemptReportRequest) throws YarnException, IOException { + return null; + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts(GetApplicationAttemptsRequest + getApplicationAttemptsRequest) throws YarnException, IOException { + return null; + } + + @Override + public GetContainerReportResponse getContainerReport(GetContainerReportRequest getContainerReportRequest) + throws YarnException, IOException { + return null; + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest getContainersRequest) + throws YarnException, IOException { + return null; + } + + @Override + public GetDelegationTokenResponse getDelegationToken(GetDelegationTokenRequest getDelegationTokenRequest) + throws YarnException, IOException { + return null; + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken(RenewDelegationTokenRequest + renewDelegationTokenRequest) throws YarnException, IOException { + return null; + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken(CancelDelegationTokenRequest + cancelDelegationTokenRequest) throws YarnException, IOException { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/unit/src/main/resources/yarn-site.xml ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/yarn-site.xml b/unit/src/main/resources/yarn-site.xml new file mode 100644 index 0000000..ab89bde --- /dev/null +++ b/unit/src/main/resources/yarn-site.xml @@ -0,0 +1,30 @@ + + + + + + + + + + yarn.ipc.client.factory.class + org.apache.falcon.unit.LocalFalconRPCClientFactory + + + \ No newline at end of file