Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 80E6918F18 for ; Tue, 25 Aug 2015 11:51:21 +0000 (UTC) Received: (qmail 9656 invoked by uid 500); 25 Aug 2015 11:51:21 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 9552 invoked by uid 500); 25 Aug 2015 11:51:21 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 9537 invoked by uid 99); 25 Aug 2015 11:51:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 11:51:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0937AE0F7F; Tue, 25 Aug 2015 11:51:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ajayyadava@apache.org To: commits@falcon.apache.org Date: Tue, 25 Aug 2015 11:51:21 -0000 Message-Id: <7f3e85188acf4bae88532e79514fc804@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] falcon git commit: FALCON-1174 Ability to disable oozie dryrun while scheduling or updating the falcon entity. Contributed by Balu Vellanki. FALCON-1174 Ability to disable oozie dryrun while scheduling or updating the falcon entity. Contributed by Balu Vellanki. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/17e2f71c Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/17e2f71c Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/17e2f71c Branch: refs/heads/0.7 Commit: 17e2f71cd7b1a2681abdec758cc5327dc4c543ea Parents: ba83ad2 Author: Ajay Yadava Authored: Tue Aug 25 12:41:35 2015 +0530 Committer: Ajay Yadava Committed: Tue Aug 25 17:20:45 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/falcon/cli/FalconCLI.java | 33 ++++++++--- .../falcon/client/AbstractFalconClient.java | 3 +- .../org/apache/falcon/client/FalconClient.java | 55 ++++++++++------- .../workflow/engine/AbstractWorkflowEngine.java | 9 +-- .../workflow/engine/OozieWorkflowEngine.java | 62 +++++++++++++------- .../falcon/resource/AbstractEntityManager.java | 9 +-- .../AbstractSchedulableEntityManager.java | 19 +++--- .../proxy/SchedulableEntityManagerProxy.java | 28 +++++---- .../falcon/resource/EntityManagerTest.java | 6 +- .../apache/falcon/unit/FalconUnitClient.java | 9 +-- .../apache/falcon/unit/FalconUnitTestBase.java | 8 +-- .../org/apache/falcon/unit/TestFalconUnit.java | 2 +- .../falcon/resource/ConfigSyncService.java | 5 +- .../resource/SchedulableEntityManager.java | 15 +++-- .../java/org/apache/falcon/cli/FalconCLIIT.java | 27 +++++++++ .../falcon/resource/EntityManagerJerseyIT.java | 41 ++++++++----- .../org/apache/falcon/resource/TestContext.java | 31 ++++++++-- 18 files changed, 245 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f31f839..a1054fe 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -13,6 +13,8 @@ Trunk (Unreleased) FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava) IMPROVEMENTS + FALCON-1174 Ability to disable oozie dryrun while scheduling or updating the falcon entity(Balu Vellanki via Ajay Yadava) + FALCON-1374 Remove the cap on numResults(Pragya Mittal via Ajay Yadava) FALCON-1379 Doc describes retention incorrectly(Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index 11f6bff..11dfe72 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -90,6 +90,7 @@ public class FalconCLI { public static final String PATH_OPT = "path"; public static final String LIST_OPT = "list"; public static final String TOUCH_OPT = "touch"; + public static final String SKIPDRYRUN_OPT = "skipDryRun"; public static final String FIELDS_OPT = "fields"; public static final String FILTER_BY_OPT = "filterBy"; @@ -429,6 +430,11 @@ public class FalconCLI { Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT), null, "numResults"); Integer numInstances = parseIntegerInput(commandLine.getOptionValue(NUM_INSTANCES_OPT), 7, "numInstances"); + Boolean skipDryRun = null; + if (optionsList.contains(SKIPDRYRUN_OPT)) { + skipDryRun = true; + } + EntityType entityTypeEnum = null; if (optionsList.contains(LIST_OPT)) { if (entityType == null) { @@ -460,20 +466,19 @@ public class FalconCLI { validateNotEmpty(filePath, "file"); validateColo(optionsList); validateNotEmpty(entityName, ENTITY_NAME_OPT); - result = client.update(entityType, entityName, filePath).getMessage(); + result = client.update(entityType, entityName, filePath, skipDryRun).getMessage(); } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); - result = - client.submitAndSchedule(entityType, filePath).getMessage(); + result = client.submitAndSchedule(entityType, filePath, skipDryRun).getMessage(); } else if (optionsList.contains(VALIDATE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); - result = client.validate(entityType, filePath).getMessage(); + result = client.validate(entityType, filePath, skipDryRun).getMessage(); } else if (optionsList.contains(SCHEDULE_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); - result = client.schedule(entityTypeEnum, entityName, colo).getMessage(); + result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun).getMessage(); } else if (optionsList.contains(SUSPEND_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); @@ -522,7 +527,7 @@ public class FalconCLI { } else if (optionsList.contains(TOUCH_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); - result = client.touch(entityType, entityName, colo).getMessage(); + result = client.touch(entityType, entityName, colo, skipDryRun).getMessage(); } else if (optionsList.contains(HELP_CMD)) { OUT.get().println("Falcon Help"); } else { @@ -742,6 +747,7 @@ public class FalconCLI { Option numInstances = new Option(NUM_INSTANCES_OPT, true, "Number of instances to return per entity summary request"); Option path = new Option(PATH_OPT, true, "Path for a feed's instance"); + Option skipDryRun = new Option(SKIPDRYRUN_OPT, false, "skip dry run in workflow engine"); entityOptions.addOption(url); entityOptions.addOption(path); @@ -763,6 +769,7 @@ public class FalconCLI { entityOptions.addOption(offset); entityOptions.addOption(numResults); entityOptions.addOption(numInstances); + entityOptions.addOption(skipDryRun); return entityOptions; } @@ -926,6 +933,9 @@ public class FalconCLI { Option recipeOperation = new Option(RECIPE_OPERATION, true, "recipe operation"); recipeOptions.addOption(recipeOperation); + Option skipDryRunOperation = new Option(SKIPDRYRUN_OPT, false, "skip dryrun operation"); + recipeOptions.addOption(skipDryRunOperation); + return recipeOptions; } @@ -1015,6 +1025,11 @@ public class FalconCLI { } private void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException { + Set optionsList = new HashSet(); + for (Option option : commandLine.getOptions()) { + optionsList.add(option.getOpt()); + } + String recipeName = commandLine.getOptionValue(RECIPE_NAME); String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME); String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION); @@ -1022,8 +1037,12 @@ public class FalconCLI { validateNotEmpty(recipeName, RECIPE_NAME); validateNotEmpty(recipeOperation, RECIPE_OPERATION); validateRecipeOperations(recipeOperation); + Boolean skipDryRun = null; + if (optionsList.contains(SKIPDRYRUN_OPT)) { + skipDryRun = true; + } - String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation).toString(); + String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation, skipDryRun).toString(); OUT.get().println(result); } http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index bb6d8c9..282b41b 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -48,6 +48,7 @@ public abstract class AbstractFalconClient { * @return * @throws FalconCLIException */ - public abstract APIResult schedule(EntityType entityType, String entityName, String colo) throws FalconCLIException; + public abstract APIResult schedule(EntityType entityType, String entityName, + String colo, Boolean skipDryRun) throws FalconCLIException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index d9bdf64..44436d2 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -285,41 +285,41 @@ public class FalconClient extends AbstractFalconClient { return str; } - public APIResult schedule(EntityType entityType, String entityName, String colo) + public APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun) throws FalconCLIException { return sendEntityRequest(Entities.SCHEDULE, entityType, entityName, - colo); + colo, skipDryRun); } public APIResult suspend(EntityType entityType, String entityName, String colo) throws FalconCLIException { - return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo); + return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null); } public APIResult resume(EntityType entityType, String entityName, String colo) throws FalconCLIException { - return sendEntityRequest(Entities.RESUME, entityType, entityName, colo); + return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null); } public APIResult delete(EntityType entityType, String entityName) throws FalconCLIException { - return sendEntityRequest(Entities.DELETE, entityType, entityName, null); + return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null); } - public APIResult validate(String entityType, String filePath) + public APIResult validate(String entityType, String filePath, Boolean skipDryRun) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); return sendEntityRequestWithObject(Entities.VALIDATE, entityType, - entityStream, null); + entityStream, null, skipDryRun); } public APIResult submit(String entityType, String filePath) @@ -327,14 +327,17 @@ public class FalconClient extends AbstractFalconClient { InputStream entityStream = getServletInputStream(filePath); return sendEntityRequestWithObject(Entities.SUBMIT, entityType, - entityStream, null); + entityStream, null, null); } - public APIResult update(String entityType, String entityName, String filePath) + public APIResult update(String entityType, String entityName, String filePath, Boolean skipDryRun) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); Entities operation = Entities.UPDATE; WebResource resource = service.path(operation.path).path(entityType).path(entityName); + if (null != skipDryRun) { + resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); + } ClientResponse clientResponse = resource .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) .accept(operation.mimeType).type(MediaType.TEXT_XML) @@ -343,18 +346,18 @@ public class FalconClient extends AbstractFalconClient { return parseAPIResult(clientResponse); } - public APIResult submitAndSchedule(String entityType, String filePath) + public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); return sendEntityRequestWithObject(Entities.SUBMITandSCHEDULE, - entityType, entityStream, null); + entityType, entityStream, null, skipDryRun); } public APIResult getStatus(EntityType entityType, String entityName, String colo) throws FalconCLIException { - return sendEntityRequest(Entities.STATUS, entityType, entityName, colo); + return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null); } public Entity getDefinition(String entityType, String entityName) @@ -400,12 +403,16 @@ public class FalconClient extends AbstractFalconClient { orderBy, sortOrder, offset, numResults, numInstances); } - public APIResult touch(String entityType, String entityName, String colo) throws FalconCLIException { + public APIResult touch(String entityType, String entityName, + String colo, Boolean skipDryRun) throws FalconCLIException { Entities operation = Entities.TOUCH; WebResource resource = service.path(operation.path).path(entityType).path(entityName); if (colo != null) { resource = resource.queryParam("colo", colo); } + if (null != skipDryRun) { + resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); + } ClientResponse clientResponse = resource .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) .accept(operation.mimeType).type(MediaType.TEXT_XML) @@ -604,13 +611,17 @@ public class FalconClient extends AbstractFalconClient { } private APIResult sendEntityRequest(Entities entities, EntityType entityType, - String entityName, String colo) throws FalconCLIException { + String entityName, String colo, Boolean skipDryRun) throws FalconCLIException { WebResource resource = service.path(entities.path) .path(entityType.toString().toLowerCase()).path(entityName); if (colo != null) { resource = resource.queryParam("colo", colo); } + if (null != skipDryRun) { + resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); + } + ClientResponse clientResponse = resource .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) .accept(entities.mimeType).type(MediaType.TEXT_XML) @@ -731,13 +742,16 @@ public class FalconClient extends AbstractFalconClient { return parseEntityList(clientResponse); } - private APIResult sendEntityRequestWithObject(Entities entities, String entityType, - Object requestObject, String colo) throws FalconCLIException { + private APIResult sendEntityRequestWithObject(Entities entities, String entityType, Object requestObject, + String colo, Boolean skipDryRun) throws FalconCLIException { WebResource resource = service.path(entities.path) .path(entityType); if (colo != null) { resource = resource.queryParam("colo", colo); } + if (null != skipDryRun) { + resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); + } ClientResponse clientResponse = resource .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) .accept(entities.mimeType).type(MediaType.TEXT_XML) @@ -962,9 +976,8 @@ public class FalconClient extends AbstractFalconClient { return sendMetadataLineageRequest(MetadataOperations.EDGES, id); } - public APIResult submitRecipe(String recipeName, - String recipeToolClassName, - final String recipeOperation) throws FalconCLIException { + public APIResult submitRecipe(String recipeName, String recipeToolClassName, + final String recipeOperation, Boolean skipDryRun) throws FalconCLIException { String recipePath = clientProperties.getProperty("falcon.recipe.path"); if (StringUtils.isEmpty(recipePath)) { @@ -1010,8 +1023,8 @@ public class FalconClient extends AbstractFalconClient { } else { RecipeTool.main(args); } - validate(EntityType.PROCESS.toString(), processFile); - return submitAndSchedule(EntityType.PROCESS.toString(), processFile); + validate(EntityType.PROCESS.toString(), processFile, skipDryRun); + return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun); } catch (Exception e) { throw new FalconCLIException(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index 07fafb5..4d45cc7 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -49,7 +49,7 @@ public abstract class AbstractWorkflowEngine { public abstract boolean isAlive(Cluster cluster) throws FalconException; - public abstract void schedule(Entity entity) throws FalconException; + public abstract void schedule(Entity entity, Boolean skipDryRun) throws FalconException; public abstract String suspend(Entity entity) throws FalconException; @@ -61,7 +61,7 @@ public abstract class AbstractWorkflowEngine { public abstract void reRun(String cluster, String wfId, Properties props, boolean isForced) throws FalconException; - public abstract void dryRun(Entity entity, String clusterName) throws FalconException; + public abstract void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException; public abstract boolean isActive(Entity entity) throws FalconException; @@ -88,9 +88,10 @@ public abstract class AbstractWorkflowEngine { public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end, List lifeCycles) throws FalconException; - public abstract String update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException; + public abstract String update(Entity oldEntity, Entity newEntity, + String cluster, Boolean skipDryRun) throws FalconException; - public abstract String touch(Entity entity, String cluster) throws FalconException; + public abstract String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException; public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException; http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 2f3dc6f..7e6cd6c 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -112,6 +112,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { Arrays.asList(Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED); private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters"; + private static final String FALCON_SKIP_DRYRUN = "falcon.skip.dryrun"; private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100; // milliseconds private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count"; @@ -142,7 +143,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } @Override - public void schedule(Entity entity) throws FalconException { + public void schedule(Entity entity, Boolean skipDryRun) throws FalconException { Map bundleMap = findLatestBundle(entity); List schedClusters = new ArrayList(); for (Map.Entry entry : bundleMap.entrySet()) { @@ -168,7 +169,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } //Do dryRun of coords before schedule as schedule is asynchronous - dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH))); + dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)), skipDryRun); scheduleEntity(clusterName, properties, entity); } } @@ -196,18 +197,28 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } @Override - public void dryRun(Entity entity, String clusterName) throws FalconException { + public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException { OozieEntityBuilder builder = OozieEntityBuilder.get(entity); Path buildPath = new Path("/tmp", "falcon" + entity.getName() + System.currentTimeMillis()); Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName); Properties props = builder.build(cluster, buildPath); if (props != null) { - dryRunInternal(cluster, new Path(props.getProperty(OozieEntityBuilder.ENTITY_PATH))); + dryRunInternal(cluster, new Path(props.getProperty(OozieEntityBuilder.ENTITY_PATH)), skipDryRun); } } + private void dryRunInternal(Cluster cluster, Path buildPath, Boolean skipDryRun) throws FalconException { + if (null != skipDryRun && skipDryRun) { + LOG.info("Skipping dryrun as directed by param in cli/RestApi."); + return; + } else { + String skipDryRunStr = RuntimeProperties.get().getProperty(FALCON_SKIP_DRYRUN, "false").toLowerCase(); + if (Boolean.valueOf(skipDryRunStr)) { + LOG.info("Skipping dryrun as directed by Runtime properties."); + return; + } + } - private void dryRunInternal(Cluster cluster, Path buildPath) throws FalconException { BUNDLEAPP bundle = OozieBundleBuilder.unmarshal(cluster, buildPath); OozieClient client = OozieClientFactory.get(cluster.getName()); for (COORDINATOR coord : bundle.getCoordinator()) { @@ -322,7 +333,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } return Collections.max(bundles, new Comparator() { - @Override public int compare(BundleJob o1, BundleJob o2) { + @Override + public int compare(BundleJob o1, BundleJob o2) { return o1.getCreatedTime().compareTo(o2.getCreatedTime()); } }); @@ -683,7 +695,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { for (WorkflowAction action : wfActions) { if (action.getType().equalsIgnoreCase("sub-workflow") && StringUtils.isNotEmpty(action.getExternalId())) { // if the action is sub-workflow, get job urls of all actions within the sub-workflow - List subWorkFlowActions = getWorkflowInfo(cluster, action.getExternalId()).getActions(); + List subWorkFlowActions = getWorkflowInfo(cluster, + action.getExternalId()).getActions(); for (WorkflowAction subWfAction : subWorkFlowActions) { if (!subWfAction.getType().startsWith(":")) { InstancesResult.InstanceAction instanceAction = @@ -1039,7 +1052,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } @Override - public String update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException { + public String update(Entity oldEntity, Entity newEntity, + String cluster, Boolean skipDryRun) throws FalconException { BundleJob bundle = findLatestBundle(oldEntity, cluster); boolean entityUpdated = false; @@ -1068,26 +1082,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return getUpdateString(newEntity, new Date(), bundle, bundle); } - LOG.debug("Going to update! : {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster, bundle - .getId()); + LOG.debug("Going to update! : {} for cluster {}, bundle: {}", + newEntity.toShortString(), cluster, bundle.getId()); result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle, - CurrentUser.getUser())).append("\n"); + CurrentUser.getUser(), skipDryRun)).append("\n"); LOG.info("Entity update complete: {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster, bundle.getId()); } - result.append(updateDependents(clusterEntity, oldEntity, newEntity)); + result.append(updateDependents(clusterEntity, oldEntity, newEntity, skipDryRun)); return result.toString(); } @Override - public String touch(Entity entity, String cluster) throws FalconException { + public String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException { BundleJob bundle = findLatestBundle(entity, cluster); Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster); StringBuilder result = new StringBuilder(); if (bundle != MISSING) { - LOG.info("Updating entity {} for cluster: {}, bundle: {}", entity.toShortString(), cluster, bundle.getId()); - String output = updateInternal(entity, entity, clusterEntity, bundle, CurrentUser.getUser()); + LOG.info("Updating entity {} for cluster: {}, bundle: {}", + entity.toShortString(), cluster, bundle.getId()); + String output = updateInternal(entity, entity, clusterEntity, bundle, CurrentUser.getUser(), skipDryRun); result.append(output).append("\n"); LOG.info("Entity update complete: {} for cluster {}, bundle: {}", entity.toShortString(), cluster, bundle.getId()); @@ -1124,7 +1139,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return builder.toString(); } - private String updateDependents(Cluster cluster, Entity oldEntity, Entity newEntity) throws FalconException { + private String updateDependents(Cluster cluster, Entity oldEntity, + Entity newEntity, Boolean skipDryRun) throws FalconException { //Update affected entities Set affectedEntities = EntityGraph.get().getDependents(oldEntity); StringBuilder result = new StringBuilder(); @@ -1147,7 +1163,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { LOG.info("Triggering update for {}, {}", cluster, affectedProcBundle.getId()); result.append(updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle, - affectedProcBundle.getUser())).append("\n"); + affectedProcBundle.getUser(), skipDryRun)).append("\n"); LOG.info("Entity update complete: {} for cluster {}, bundle: {}", affectedEntity.toShortString(), cluster, affectedProcBundle.getId()); } @@ -1178,7 +1194,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return null; } - private void updateCoords(String cluster, BundleJob bundle, int concurrency, Date endTime) throws FalconException { + private void updateCoords(String cluster, BundleJob bundle, + int concurrency, Date endTime) throws FalconException { if (endTime.compareTo(now()) <= 0) { throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past"); } @@ -1217,14 +1234,14 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle, - String user) throws FalconException { + String user, Boolean skipDryRun) throws FalconException { String clusterName = cluster.getName(); Date effectiveTime = getEffectiveTime(cluster, newEntity); LOG.info("Effective time " + effectiveTime); //Validate that new entity can be scheduled - dryRunForUpdate(cluster, newEntity, effectiveTime); + dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun); boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus()); @@ -1256,11 +1273,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime); } - private void dryRunForUpdate(Cluster cluster, Entity entity, Date startTime) throws FalconException { + private void dryRunForUpdate(Cluster cluster, Entity entity, Date startTime, + Boolean skipDryRun) throws FalconException { Entity clone = entity.copy(); EntityUtil.setStartDate(clone, cluster.getName(), startTime); try { - dryRun(clone, cluster.getName()); + dryRun(clone, cluster.getName(), skipDryRun); } catch (FalconException e) { throw new FalconException("The new entity " + entity.toShortString() + " can't be scheduled", e); } http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index f2f9826..78964dd 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -206,7 +206,7 @@ public abstract class AbstractEntityManager { * @param type entity type * @return APIResule -Succeeded or Failed */ - public APIResult validate(HttpServletRequest request, String type) { + public APIResult validate(HttpServletRequest request, String type, Boolean skipDryRun) { try { EntityType entityType = EntityType.getEnum(type); Entity entity = deserializeEntity(request, entityType); @@ -217,7 +217,7 @@ public abstract class AbstractEntityManager { Set clusters = EntityUtil.getClustersDefinedInColos(entity); for (String cluster : clusters) { try { - getWorkflowEngine().dryRun(entity, cluster); + getWorkflowEngine().dryRun(entity, cluster, skipDryRun); } catch (FalconException e) { throw new FalconException("dryRun failed on cluster " + cluster, e); } @@ -267,7 +267,8 @@ public abstract class AbstractEntityManager { } } - public APIResult update(HttpServletRequest request, String type, String entityName, String colo) { + public APIResult update(HttpServletRequest request, String type, String entityName, + String colo, Boolean skipDryRun) { checkColo(colo); List tokenList = null; try { @@ -292,7 +293,7 @@ public abstract class AbstractEntityManager { oldClusters.removeAll(newClusters); //deleted clusters for (String cluster : newClusters) { - result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster)); + result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster, skipDryRun)); } for (String cluster : oldClusters) { getWorkflowEngine().delete(oldEntity, cluster); http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index e38749a..5b415a2 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -66,10 +66,11 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM public APIResult schedule( @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type, @Dimension("entityName") @PathParam("entity") String entity, - @Dimension("colo") @PathParam("colo") String colo) { + @Dimension("colo") @PathParam("colo") String colo, + @QueryParam("skipDryRun") Boolean skipDryRun) { checkColo(colo); try { - scheduleInternal(type, entity); + scheduleInternal(type, entity, skipDryRun); return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") scheduled successfully"); } catch (Throwable e) { LOG.error("Unable to schedule workflow", e); @@ -77,7 +78,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM } } - private synchronized void scheduleInternal(String type, String entity) + private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun) throws FalconException, AuthorizationException { checkSchedulableEntity(type); @@ -90,7 +91,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM + entityObj.toShortString()); } LOG.info("Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName()); - getWorkflowEngine().schedule(entityObj); + getWorkflowEngine().schedule(entityObj, skipDryRun); } catch (Exception e) { throw new FalconException("Entity schedule failed for " + type + ": " + entity, e); } finally { @@ -110,12 +111,13 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM */ public APIResult submitAndSchedule( @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type, - @Dimension("colo") @PathParam("colo") String colo) { + @Dimension("colo") @PathParam("colo") String colo, + @QueryParam("skipDryRun") Boolean skipDryRun) { checkColo(colo); try { checkSchedulableEntity(type); Entity entity = submitInternal(request, type); - scheduleInternal(type, entity.getName()); + scheduleInternal(type, entity.getName(), skipDryRun); return new APIResult(APIResult.Status.SUCCEEDED, entity.getName() + "(" + type + ") scheduled successfully"); } catch (Throwable e) { @@ -265,14 +267,15 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM */ public APIResult touch(@Dimension("entityType") @PathParam("type") String type, @Dimension("entityName") @PathParam("entity") String entityName, - @Dimension("colo") @QueryParam("colo") String colo) { + @Dimension("colo") @QueryParam("colo") String colo, + @QueryParam("skipDryRun") Boolean skipDryRun) { checkColo(colo); StringBuilder result = new StringBuilder(); try { Entity entity = EntityUtil.getEntity(type, entityName); Set clusters = EntityUtil.getClustersDefinedInColos(entity); for (String cluster : clusters) { - result.append(getWorkflowEngine().touch(entity, cluster)); + result.append(getWorkflowEngine().touch(entity, cluster, skipDryRun)); } } catch (Throwable e) { LOG.error("Touch failed", e); http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index d22e8a3..ceabb06 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -174,7 +174,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) @Override - public APIResult validate(@Context final HttpServletRequest request, @PathParam("type") final String type) { + public APIResult validate(@Context final HttpServletRequest request, @PathParam("type") final String type, + @QueryParam("skipDryRun") final Boolean skipDryRun) { final HttpServletRequest bufferedRequest = getBufferedRequest(request); EntityType entityType = EntityType.getEnum(type); final Entity entity; @@ -192,7 +193,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("validate", bufferedRequest, type); + return getEntityManager(colo).invoke("validate", bufferedRequest, type, skipDryRun); } }.execute(); } @@ -245,7 +246,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana public APIResult update( @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type, @Dimension("entityName") @PathParam("entity") final String entityName, - @Dimension("colo") @QueryParam("colo") String ignore) { + @Dimension("colo") @QueryParam("colo") String ignore, + @QueryParam("skipDryRun") final Boolean skipDryRun) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); final Set oldColos = getApplicableColos(type, entityName); @@ -281,7 +283,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, colo); + return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, + colo, skipDryRun); } }.execute()); } @@ -308,7 +311,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana // update only if all are updated if (!embeddedMode && result) { - results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo)); + results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo, skipDryRun)); } return consolidateResult(results, APIResult.class); @@ -322,7 +325,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana public APIResult touch( @Dimension("entityType") @PathParam("type") final String type, @Dimension("entityName") @PathParam("entity") final String entityName, - @Dimension("colo") @QueryParam("colo") final String coloExpr) { + @Dimension("colo") @QueryParam("colo") final String coloExpr, + @QueryParam("skipDryRun") final Boolean skipDryRun) { final Set colosFromExp = getColosFromExpression(coloExpr, type, entityName); return new EntityProxy(type, entityName) { @Override @@ -332,7 +336,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("touch", type, entityName, colo); + return getEntityManager(colo).invoke("touch", type, entityName, colo, skipDryRun); } }.execute(); } @@ -384,7 +388,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana public APIResult schedule(@Context final HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type, @Dimension("entityName") @PathParam("entity") final String entity, - @Dimension("colo") @QueryParam("colo") final String coloExpr) { + @Dimension("colo") @QueryParam("colo") final String coloExpr, + @QueryParam("skipDryRun") final Boolean skipDryRun) { final HttpServletRequest bufferedRequest = getBufferedRequest(request); return new EntityProxy(type, entity) { @@ -395,7 +400,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo); + return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo, skipDryRun); } }.execute(); } @@ -408,12 +413,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override public APIResult submitAndSchedule( @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type, - @Dimension("colo") @QueryParam("colo") String coloExpr) { + @Dimension("colo") @QueryParam("colo") String coloExpr, + @QueryParam("skipDryRun") Boolean skipDryRun) { BufferedRequest bufferedRequest = new BufferedRequest(request); String entity = getEntity(bufferedRequest, type).getName(); Map results = new HashMap(); results.put("submit", submit(bufferedRequest, type, coloExpr)); - results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr)); + results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr, skipDryRun)); return consolidateResult(results, APIResult.class); } http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java index be1fe1f..cce8737 100644 --- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java +++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java @@ -95,8 +95,7 @@ public class EntityManagerTest extends AbstractEntityManager { invalidProcessXML); try { - validate(mockHttpServletRequest, - EntityType.PROCESS.name()); + validate(mockHttpServletRequest, EntityType.PROCESS.name(), false); Assert.fail("Invalid entity type was accepted by the system"); } catch (FalconWebException ignore) { // ignore @@ -110,8 +109,7 @@ public class EntityManagerTest extends AbstractEntityManager { invalidProcessXML); try { - validate(mockHttpServletRequest, - "InvalidEntityType"); + validate(mockHttpServletRequest, "InvalidEntityType", false); Assert.fail("Invalid entity type was accepted by the system"); } catch (FalconWebException ignore) { // ignore http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index e898fc3..eb65cb3 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -118,8 +118,9 @@ public class FalconUnitClient extends AbstractFalconClient { * @throws FalconException */ @Override - public APIResult schedule(EntityType entityType, String entityName, String cluster) throws FalconCLIException { - return schedule(entityType, entityName, null, 0, cluster); + public APIResult schedule(EntityType entityType, String entityName, + String cluster, Boolean skipDryRun) throws FalconCLIException { + return schedule(entityType, entityName, null, 0, cluster, skipDryRun); } @@ -133,7 +134,7 @@ public class FalconUnitClient extends AbstractFalconClient { * @return boolean */ public APIResult schedule(EntityType entityType, String entityName, String startTime, int numInstances, - String cluster) throws FalconCLIException { + String cluster, Boolean skipDryRun) throws FalconCLIException { try { FalconUnitHelper.checkSchedulableEntity(entityType.toString()); Entity entity = EntityUtil.getEntity(entityType, entityName); @@ -146,7 +147,7 @@ public class FalconUnitClient extends AbstractFalconClient { if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS) { updateStartAndEndTime((Process) entity, startTime, numInstances, cluster); } - workflowEngine.schedule(entity); + workflowEngine.schedule(entity, skipDryRun); LOG.info(entityName + " is scheduled successfully"); return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + "PROCESS" + ") scheduled successfully"); } catch (FalconException e) { http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 9f00d94..997b301 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -147,7 +147,7 @@ public class FalconUnitTestBase { } public APIResult scheduleProcess(String processName, String startTime, int numInstances, - String cluster, String localWfPath) throws FalconException, + String cluster, String localWfPath, Boolean skipDryRun) throws FalconException, IOException, FalconCLIException { Process processEntity = configStore.get(EntityType.PROCESS, processName); if (processEntity == null) { @@ -155,16 +155,16 @@ public class FalconUnitTestBase { } String workflowPath = processEntity.getWorkflow().getPath(); fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath)); - return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster); + return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, skipDryRun); } public APIResult scheduleProcess(String processName, String startTime, int numInstances, - String cluster) throws FalconException, FalconCLIException { + String cluster, Boolean skipDryRun) throws FalconException, FalconCLIException { Process processEntity = configStore.get(EntityType.PROCESS, processName); if (processEntity == null) { throw new FalconException("Process not found " + processName); } - return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster); + return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, skipDryRun); } private Map updateColoAndCluster(String colo, String cluster, Map props) { http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 57b7b1b..498f50e 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -44,7 +44,7 @@ public class TestFalconUnit extends FalconUnitTestBase { createData("in", "local", scheduleTime, "input.txt"); result = submitProcess(getAbsolutePath("/process.xml"), "/app/oozie-mr"); assertStatus(result); - result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml")); + result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"), true); assertStatus(result); waitForStatus(EntityType.PROCESS, "process", scheduleTime); InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(EntityType.PROCESS, http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java index 3bd625c..bf538dc 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java +++ b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java @@ -70,7 +70,8 @@ public class ConfigSyncService extends AbstractEntityManager { public APIResult update(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type, @Dimension("entityName") @PathParam("entity") String entityName, - @Dimension("colo") @QueryParam("colo") String colo) { - return super.update(request, type, entityName, colo); + @Dimension("colo") @QueryParam("colo") String colo, + @QueryParam("skipDryRun") Boolean skipDryRun) { + return super.update(request, type, entityName, colo, skipDryRun); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java index a2af0cd..1f8cc1b 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java @@ -126,8 +126,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { public APIResult schedule(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type, @Dimension("entityName") @PathParam("entity") String entity, - @Dimension("colo") @QueryParam("colo") String colo) { - return super.schedule(request, type, entity, colo); + @Dimension("colo") @QueryParam("colo") String colo, + @QueryParam("skipDryRun") Boolean skipDryRun) { + return super.schedule(request, type, entity, colo, skipDryRun); } @POST @@ -160,8 +161,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) @Monitored(event = "validate") @Override - public APIResult validate(@Context HttpServletRequest request, @PathParam("type") String type) { - return super.validate(request, type); + public APIResult validate(@Context HttpServletRequest request, @PathParam("type") String type, + @QueryParam("skipDryRun") Boolean skipDryRun) { + return super.validate(request, type, skipDryRun); } @POST @@ -171,8 +173,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { @Override public APIResult touch(@Dimension("entityType") @PathParam("type") String type, @Dimension("entityName") @PathParam("entity") String entityName, - @Dimension("colo") @QueryParam("colo") String colo) { - return super.touch(type, entityName, colo); + @Dimension("colo") @QueryParam("colo") String colo, + @QueryParam("skipDryRun") Boolean skipDryRun) { + return super.touch(type, entityName, colo, skipDryRun); } @GET http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java index e328d69..0062070 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java @@ -184,6 +184,33 @@ public class FalconCLIIT { } + public void testSkipDryRunValidCommands() throws Exception { + TestContext context = new TestContext(); + Map overlay = context.getUniqueOverlay(); + submitTestFiles(context, overlay); + + Assert.assertEquals( + executeWithURL("entity -schedule -skipDryRun -type cluster -name " + overlay.get("cluster")), -1); + + Assert.assertEquals( + executeWithURL("entity -schedule -type feed -name " + overlay.get("outputFeedName")), 0); + + Assert.assertEquals( + executeWithURL("entity -schedule -type process -skipDryRun -name " + overlay.get("processName")), 0); + + Assert.assertEquals(0, + executeWithURL("entity -touch -skipDryRun -name " + overlay.get("processName") + " -type process")); + + String filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay); + Assert.assertEquals( + executeWithURL("entity -submitAndSchedule -skipDryRun -type feed -file " + filePath), 0); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); + Assert.assertEquals( + executeWithURL("entity -validate -skipDryRun -type process -file " + filePath), 0); + + } + public void testSuspendResumeStatusEntityValidCommands() throws Exception { TestContext context = new TestContext(); http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java index c602ffb..f0cee61 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java @@ -131,7 +131,8 @@ public class EntityManagerJerseyIT { context.assertSuccessful(response); } - private ClientResponse update(TestContext context, Entity entity, Date endTime) throws Exception { + private ClientResponse update(TestContext context, Entity entity, + Date endTime, Boolean skipDryRun) throws Exception { File tmpFile = TestContext.getTempFile(); entity.getEntityType().getMarshaller().marshal(entity, tmpFile); WebResource resource = context.service.path("api/entities/update/" @@ -139,19 +140,24 @@ public class EntityManagerJerseyIT { if (endTime != null) { resource = resource.queryParam("effective", SchemaHelper.formatDateUTC(endTime)); } + if (null != skipDryRun) { + resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); + } return resource.header("Cookie", context.getAuthenticationToken()) .accept(MediaType.TEXT_XML) .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath())); } - private ClientResponse touch(TestContext context, Entity entity) { + private ClientResponse touch(TestContext context, Entity entity, Boolean skipDryRun) { WebResource resource = context.service.path("api/entities/touch/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName()); - ClientResponse clientResponse = resource + if (null != skipDryRun) { + resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); + } + return resource .header("Cookie", context.getAuthenticationToken()) .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) .post(ClientResponse.class); - return clientResponse; } @Test @@ -174,7 +180,7 @@ public class EntityManagerJerseyIT { //change output feed path and update feed as another user feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}"); - ClientResponse response = update(context, feed, null); + ClientResponse response = update(context, feed, null, false); context.assertSuccessful(response); bundles = OozieTestUtils.getBundles(context); @@ -222,7 +228,7 @@ public class EntityManagerJerseyIT { } public void testDryRun() throws Exception { - //Schedule of invalid process should fail because of dryRun + //Schedule of invalid process should fail because of dryRun, and should pass when dryrun is skipped TestContext context = newContext(); Map overlay = context.getUniqueOverlay(); String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); @@ -237,7 +243,7 @@ public class EntityManagerJerseyIT { ClientResponse response = context.validate(tmpFile.getAbsolutePath(), overlay, EntityType.PROCESS); context.assertFailure(response); - context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, false); + context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, false, null); //Fix the process and then submitAndSchedule should succeed Iterator itr = process.getProperties().getProperties().iterator(); @@ -256,8 +262,13 @@ public class EntityManagerJerseyIT { //Update with invalid property should fail again process.getProperties().getProperties().add(prop); updateEndtime(process); - response = update(context, process, null); + response = update(context, process, null, null); context.assertFailure(response); + + // update where dryrun is disabled should succeed. + response = update(context, process, null, true); + context.assertSuccessful(response); + } @Test @@ -274,7 +285,7 @@ public class EntityManagerJerseyIT { process.getProperties().getProperties().get(0).setName("newprop"); Date endTime = getEndTime(); process.getClusters().getClusters().get(0).getValidity().setEnd(endTime); - response = update(context, process, endTime); + response = update(context, process, endTime, null); context.assertSuccessful(response); //Since the process endtime = update effective time, it shouldn't create new bundle @@ -315,7 +326,7 @@ public class EntityManagerJerseyIT { updateEndtime(process); Date endTime = getEndTime(); - response = update(context, process, endTime); + response = update(context, process, endTime, null); context.assertSuccessful(response); //Assert that update creates new bundle and old coord is running @@ -349,7 +360,7 @@ public class EntityManagerJerseyIT { Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName); updateEndtime(process); - ClientResponse response = update(context, process, null); + ClientResponse response = update(context, process, null, null); context.assertSuccessful(response); //Assert that update does not create new bundle @@ -371,13 +382,13 @@ public class EntityManagerJerseyIT { //Update end time of process required for touch Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName); updateEndtime(process); - ClientResponse response = update(context, process, null); + ClientResponse response = update(context, process, null, null); context.assertSuccessful(response); bundles = OozieTestUtils.getBundles(context); Assert.assertEquals(bundles.size(), 1); //Calling force update - response = touch(context, process); + response = touch(context, process, true); context.assertSuccessful(response); OozieTestUtils.waitForBundleStart(context, Status.PREP, Status.RUNNING); @@ -871,7 +882,7 @@ public class EntityManagerJerseyIT { Date endTime = getEndTime(); ExecutorService service = Executors.newSingleThreadExecutor(); Future future = service.submit(new UpdateCommand(context, process, endTime)); - response = update(context, process, endTime); + response = update(context, process, endTime, false); ClientResponse duplicateUpdateThreadResponse = future.get(); // since there are duplicate threads for updates, there is no guarantee which request will succeed @@ -918,7 +929,7 @@ public class EntityManagerJerseyIT { @Override public ClientResponse call() throws Exception { - return update(context, process, endTime); + return update(context, process, endTime, false); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/test/java/org/apache/falcon/resource/TestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java index 7b227b3..4a25b88 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java @@ -230,10 +230,11 @@ public class TestContext { } public void scheduleProcess(String processTemplate, Map overlay) throws Exception { - scheduleProcess(processTemplate, overlay, true); + scheduleProcess(processTemplate, overlay, true, null); } - public void scheduleProcess(String processTemplate, Map overlay, boolean succeed) throws Exception { + public void scheduleProcess(String processTemplate, Map overlay, + boolean succeed, Boolean skipDryRun) throws Exception { ClientResponse response = submitToFalcon(CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); assertSuccessful(response); @@ -243,7 +244,7 @@ public class TestContext { response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED); assertSuccessful(response); - response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS); + response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS, skipDryRun); if (succeed) { assertSuccessful(response); } else { @@ -278,10 +279,20 @@ public class TestContext { public ClientResponse submitAndSchedule(String template, Map overlay, EntityType entityType) throws Exception { + return submitAndSchedule(template, overlay, entityType, null); + } + + public ClientResponse submitAndSchedule(String template, Map overlay, + EntityType entityType, Boolean skipDryRun) + throws Exception { String tmpFile = overlayParametersOverTemplate(template, overlay); ServletInputStream rawlogStream = getServletInputStream(tmpFile); - return this.service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase()) + WebResource resource = service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase()); + if (null != skipDryRun) { + resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); + } + return resource .header("Cookie", getAuthenticationToken()) .accept(MediaType.TEXT_XML) .type(MediaType.TEXT_XML) @@ -290,10 +301,20 @@ public class TestContext { public ClientResponse validate(String template, Map overlay, EntityType entityType) throws Exception { + return validate(template, overlay, entityType, null); + } + + public ClientResponse validate(String template, Map overlay, + EntityType entityType, Boolean skipDryRun) + throws Exception { String tmpFile = overlayParametersOverTemplate(template, overlay); ServletInputStream rawlogStream = getServletInputStream(tmpFile); - return this.service.path("api/entities/validate/" + entityType.name().toLowerCase()) + WebResource resource = service.path("api/entities/validate/" + entityType.name().toLowerCase()); + if (null != skipDryRun) { + resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); + } + return resource .header("Cookie", getAuthenticationToken()) .accept(MediaType.TEXT_XML) .type(MediaType.TEXT_XML)