From: pallavi@apache.org
To: commits@falcon.apache.org
Subject: falcon git commit: FALCON-2039 Move falcon post processing to falcon server and remove post processing
Date: Fri, 5 Aug 2016 04:25:55 +0000 (UTC)

Repository: falcon
Updated Branches:
  refs/heads/master 723f1f7f1 -> 7d9687bcb


FALCON-2039 Move falcon post processing to falcon server and remove post processing

Author: Praveen Adlakha
Reviewers: @pallavi-rao, @vrangan

Closes #244 from PraveenAdlakha/2039 and squashes the following commits:

b71290a [Praveen Adlakha]
process removed
26ddc02 [Praveen Adlakha] comments addressed
d4f4cf8 [Praveen Adlakha] fixed test cases
cdf9ae1 [Praveen Adlakha] documentation added
9aee018 [Praveen Adlakha] multithread added
b3115de [Praveen Adlakha] Agebased boolean check removed
22c25a3 [Praveen Adlakha] Startup properties
1bc278f [Praveen Adlakha] minor fixes done
71b8d1a [Praveen Adlakha] check added in service
dd58642 [Praveen Adlakha] FALCON-2039 Move falcon post processing to falcon server and remove post processing action from falcon workflow


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7d9687bc
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7d9687bc
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7d9687bc

Branch: refs/heads/master
Commit: 7d9687bcbc8a978ad361498084f3e473fd69fc9b
Parents: 723f1f7
Author: Praveen Adlakha
Authored: Fri Aug 5 09:55:49 2016 +0530
Committer: Pallavi Rao
Committed: Fri Aug 5 09:55:49 2016 +0530

----------------------------------------------------------------------
 common/src/main/resources/runtime.properties    |   2 +-
 common/src/main/resources/startup.properties    |   7 ++
 docs/src/site/twiki/Configuration.twiki         |  14 +++
 .../retention/AgeBasedWorkflowBuilder.java      |  32 ++++--
 .../engine/oozie/utils/OozieBuilderUtils.java   |   2 +
 oozie/pom.xml                                   |   1 -
 .../org/apache/falcon/logging/JobLogMover.java  |  14 +++
 .../oozie/DatabaseExportWorkflowBuilder.java    |  13 +--
 .../oozie/DatabaseImportWorkflowBuilder.java    |  13 +--
 .../OozieOrchestrationWorkflowBuilder.java      |  30 +++++
 .../feed/FSReplicationWorkflowBuilder.java      |  15 +--
 .../feed/FeedRetentionWorkflowBuilder.java      |  13 +--
 .../feed/HCatReplicationWorkflowBuilder.java    |  15 +--
 .../ProcessExecutionWorkflowBuilder.java        |  12 +-
 .../apache/falcon/service/LogMoverService.java  | 111 +++++++++++++++++++
 .../falcon/workflow/FalconPostProcessing.java   |  18 +--
 .../feed/OozieFeedWorkflowBuilderTest.java      |  48 ++++++++
 .../OozieProcessWorkflowBuilderTest.java        |  34 ++++++
 src/conf/startup.properties                     |   7 ++
 19 files changed, 295 insertions(+), 106 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index 643559e..ba4c055 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -51,4 +51,4 @@
 *.falcon.service.ProxyUserService.proxyuser.#USER#.groups=*
-######### Proxyuser Configuration End #########
\ No newline at end of file
+######### Proxyuser Configuration End #########

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 4b692a2..9207b25 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -83,6 +83,7 @@
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=
+#org.apache.falcon.service.LogMoverService
 ######### Implementation classes #########
@@ -336,3 +337,9 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 # Backlog Metric Properties
 #*.falcon.backlog.metricservice.emit.interval.millisecs=60000
 #*.falcon.backlog.metricservice.recheck.interval.millisecs=600000
+
+# Property to remove postProcessing
+*.falcon.postprocessing.enable=true
+
+### LogMoveService Thread count
+*.falcon.logMoveService.threadCount=50

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index ce32019..c686d48 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -317,6 +317,20 @@ su - $OOZIE_USER
 Where $OOZIE_USER is the Oozie user. For example, oozie.
+---+++Disabling Falcon Post Processing
+Falcon post processing performs two tasks:
+It sends user notifications to ActiveMQ.
+It moves Oozie executor logs once the workflow finishes.
+
+If post processing fails for any reason, the user might end up with a backlog in the pipeline; that is why it has been made optional.
+
+To disable post processing, set the following properties in startup.properties:
+
+*.falcon.postprocessing.enable=false
+*.workflow.execution.listeners=org.apache.falcon.service.LogMoverService
+
+*NOTE: Make sure Oozie JMS notifications are enabled, as LogMoverService depends on them.*
+
 ---+++Enabling Falcon Native Scheudler
 You can either choose to schedule entities using Oozie's coordinator or using Falcon's native scheduler.
 To be able to

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
index 34b186e..dd0c6d2 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
@@ -59,19 +59,27 @@ public final class AgeBasedWorkflowBuilder {
         //Add eviction action
         ACTION eviction = OozieBuilderUtils.unmarshalAction(EVICTION_ACTION_TEMPLATE);
-        OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
-                OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(eviction);
-
-        //Add post-processing actions
-        ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
-        OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
-        OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
+        if (!Boolean.parseBoolean(OozieBuilderUtils.ENABLE_POSTPROCESSING)){
+            OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.OK_ACTION_NAME,
+                    OozieBuilderUtils.FAIL_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(eviction);
+        } else {
+            OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
+                    OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(eviction);
+
+            //Add post-processing actions
+            ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
+            OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME,
+                    OozieBuilderUtils.FAIL_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(success);
+
+            ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
+            OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME,
+                    OozieBuilderUtils.FAIL_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(fail);
+        }
         OozieBuilderUtils.decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
         OozieBuilderUtils.addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION, EntityType.FEED);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
index 8f1b53b..7d51c9a 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
@@ -102,6 +102,8 @@ public final class OozieBuilderUtils {
     public static final String ENTITY_PATH = "ENTITY_PATH";
     public static final String ENTITY_NAME = "ENTITY_NAME";
     public static final String IGNORE = "IGNORE";
+    public static final String ENABLE_POSTPROCESSING = StartupProperties.get().
+            getProperty("falcon.postprocessing.enable");
     static {

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 7bfb086..2adbba3 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -105,7 +105,6 @@
             org.apache.commons
             commons-lang3
-

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 64596c6..6ec2a20 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
@@ -68,6 +69,19 @@ public class JobLogMover {
         return conf == null ? new Configuration(): conf;
     }
+    public void moveLog(WorkflowExecutionContext context){
+        if (UserGroupInformation.isSecurityEnabled()) {
+            LOG.info("Unable to move logs as security is enabled.");
+            return;
+        }
+        try {
+            run(context);
+        } catch (Exception ignored) {
+            // Mask exception, a failed log mover will not fail the user workflow
+            LOG.error("Exception in job log mover:", ignored);
+        }
+    }
+
     public int run(WorkflowExecutionContext context) {
         try {
             OozieClient client = new OozieClient(context.getWorkflowEngineUrl());

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
index 6468415..b86afaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
@@ -67,18 +67,7 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
         ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath, sqoopExport);
         OozieUtils.marshalSqoopAction(action, actionJaxbElement);
-        addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(action);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, action);
         decorateWorkflow(workflow, workflow.getName(), EXPORT_ACTION_NAME);
         addLibExtensionsToWorkflow(cluster, workflow, Tag.EXPORT);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
index 44562f2..5c95162 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
@@ -69,18 +69,7 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
         ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath, sqoopImport);
         OozieUtils.marshalSqoopAction(action, actionJaxbElement);
-        addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(action);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, action);
         decorateWorkflow(workflow, workflow.getName(), IMPORT_ACTION_NAME);
         addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 5ad3d03..9683e62 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -80,6 +80,13 @@ import java.util.Set;
 public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
     public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+    public String getEnablePostProcessing() {
+        return enablePostprocessing;
+    }
+
+    private String enablePostprocessing = StartupProperties.get().
+            getProperty("falcon.postprocessing.enable");
+
     protected static final String USER_ACTION_NAME = "user-action";
     protected static final String PREPROCESS_ACTION_NAME = "pre-processing";
     protected static final String SUCCESS_POSTPROCESS_ACTION_NAME = "succeeded-post-processing";
@@ -130,6 +137,10 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         return get(entity, cluster, lifecycle, Scheduler.OOZIE);
     }
+    public Boolean isPostProcessingEnabled(){
+        return Boolean.parseBoolean(getEnablePostProcessing());
+    }
+
     public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle,
                                                         Scheduler scheduler)
         throws FalconException {
@@ -219,6 +230,25 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         wf.getDecisionOrForkOrJoin().add(kill);
     }
+    protected void addPostProcessing(WORKFLOWAPP workflow, ACTION action) throws FalconException{
+        if (!isPostProcessingEnabled()){
+            addTransition(action, OK_ACTION_NAME, FAIL_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(action);
+        }else{
+            addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(action);
+
+            //Add post-processing actions
+            ACTION success = getSuccessPostProcessAction();
+            addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(success);
+
+            ACTION fail = getFailPostProcessAction();
+            addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(fail);
+        }
+    }
+
     protected ACTION getSuccessPostProcessAction() throws FalconException {
         ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
         decorateWithOozieRetries(action);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index cfcc698..598cf6f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -59,20 +59,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
         addAdditionalReplicationProperties(replication);
         enableCounters(replication);
         enableTDE(replication);
-        addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(replication);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addHDFSServersConfig(success, src, target);
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addHDFSServersConfig(fail, src, target);
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, replication);
         decorateWorkflow(workflow, wfName, start);
         return workflow;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index b9e3848..fd51ed0 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -51,21 +51,10 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
     @Override
     public Properties build(Cluster cluster, Path buildPath) throws FalconException {
         WORKFLOWAPP workflow = new WORKFLOWAPP();
         String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
-
         //Add eviction action
         ACTION eviction = unmarshalAction(EVICTION_ACTION_TEMPLATE);
-        addTransition(eviction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(eviction);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
+        addPostProcessing(workflow, eviction);
         decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
         addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 3da97d3..f4eecb7 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -127,20 +127,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
         //Add cleanup action
         ACTION cleanup = unmarshalAction(CLEANUP_ACTION_TEMPLATE);
-        addTransition(cleanup, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(cleanup);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addHDFSServersConfig(success, src, target);
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addHDFSServersConfig(fail, src, target);
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, cleanup);
         decorateWorkflow(workflow, wfName, start);
         setupHiveCredentials(src, target, workflow);
         return workflow;

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 5d2c43e..c31b4ee 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -83,20 +83,10 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
             wfApp.getDecisionOrForkOrJoin().add(preProcessAction);
             startAction = PREPROCESS_ACTION_NAME;
         }
-
         //Add user action
         ACTION userAction = getUserAction(cluster, buildPath);
-        addTransition(userAction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        wfApp.getDecisionOrForkOrJoin().add(userAction);
-
-        //Add post-processing
-        ACTION success = getSuccessPostProcessAction();
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        wfApp.getDecisionOrForkOrJoin().add(success);
-        ACTION fail = getFailPostProcessAction();
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        wfApp.getDecisionOrForkOrJoin().add(fail);
+        addPostProcessing(wfApp, userAction);
         decorateWorkflow(wfApp, wfName, startAction);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
new file mode 100644
index 0000000..1f3d0a0
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.logging.JobLogMover;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+
+/**
+ * Moves Falcon logs.
+ */
+public class LogMoverService implements WorkflowExecutionListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogMoverService.class);
+
+    public static final String ENABLE_POSTPROCESSING = StartupProperties.get().
+            getProperty("falcon.postprocessing.enable");
+
+    private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(50);
+    private ExecutorService executorService = new ThreadPoolExecutor(20, getThreadCount(), 120,
+            TimeUnit.SECONDS, blockingQueue);
+    public int getThreadCount() {
+        try{
+            return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount"));
+        } catch (NumberFormatException e){
+            LOG.error("Exception in LogMoverService", e);
+            return 50;
+        }
+    }
+
+    @Override
+    public void onSuccess(WorkflowExecutionContext context) throws FalconException{
+        onEnd(context);
+    }
+
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException{
+        onEnd(context);
+    }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException{
+        //Do Nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException{
+        //DO Nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException{
+        //DO Nothing
+    }
+
+    private void onEnd(WorkflowExecutionContext context){
+        if (Boolean.parseBoolean(ENABLE_POSTPROCESSING)) {
+            return;
+        }
+        while(0 coords = bundle.getCoordinator();
+        COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
+
+        WORKFLOWAPP workflow = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
+
+        Boolean foundUserAction = false;
+        Boolean foundPostProcessing = false;
+        Iterator<COORDINATOR> coordIterator = coords.iterator();
+
+        while(coordIterator.hasNext()){
+            COORDINATORAPP coord1 = getCoordinator(trgMiniDFS, coordIterator.next().getAppPath());
+            WORKFLOWAPP workflow1 = getWorkflowapp(trgMiniDFS.getFileSystem(), coord1);
+            Iterator<Object> workflowIterator = workflow1.getDecisionOrForkOrJoin().iterator();
+            while (workflowIterator.hasNext()){
+                Object object = workflowIterator.next();
+                if (ACTION.class.isAssignableFrom(object.getClass())){
+                    ACTION action = (ACTION) object;
+                    if (action.getName().equals("eviction") || action.getName().equals("replication")){
+                        foundUserAction = true;
+                    }
+                    if (action.getName().contains("post")){
+                        foundPostProcessing = true;
+                    }
+                }
+            }
+        }
+
+        assertTrue(foundUserAction);
+        assertFalse(foundPostProcessing);
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "true");
+    }
+
+    @Test
     public void testReplicationCoordsForFSStorage() throws Exception {
         OozieEntityBuilder builder = OozieEntityBuilder.get(feed);
         Path bundlePath = new Path("/projects/falcon/");
@@ -697,6 +740,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Configuration conf = fs.getConf();
         conf.set("fs.permissions.umask-mode", umask);
+        OozieEntityBuilder feedBuilder = OozieEntityBuilder.get(feed);
+        Path bundlePath = new Path("/projects/falcon/");
+        feedBuilder.build(trgCluster, bundlePath);
+
         // ClusterHelper constructs new fs Conf. Add it to cluster properties so that it gets added to FS conf
         setUmaskInFsConf(srcCluster, umask);
@@ -759,6 +806,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     @Test (dataProvider = "secureOptions")
     public void testRetentionCoordsForTable(String secureOption) throws Exception {
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "true");
         StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
         final String umask = "000";

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index a692d0c..05b513e 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -78,6 +78,7 @@ import java.util.Map;
 import java.util.Properties;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 /**
@@ -115,6 +116,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         storeEntity(EntityType.PROCESS, "clicksummary", PROCESS_XML);
         storeEntity(EntityType.PROCESS, "pig-process", PIG_PROCESS_XML);
+
         ConfigurationStore store = ConfigurationStore.get();
         cluster = store.get(EntityType.CLUSTER, "corp");
         org.apache.falcon.entity.v0.cluster.Property property =
@@ -785,6 +787,38 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         assertAction(parentWorkflow, "user-action", false);
     }
+    @Test
+    public void testPostProcessingProcess() throws Exception {
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "false");
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
+
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP workflowapp = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
+
+        Boolean foudUserAction = false;
+        Boolean foundpostProcessing =false;
+
+        for(Object action : workflowapp.getDecisionOrForkOrJoin()){
+            if (action instanceof ACTION && ((ACTION)action).getName().equals("user-action")){
+                foudUserAction = true;
+            }
+            if (action instanceof ACTION && ((ACTION)action).getName().contains("post")){
+                foundpostProcessing = true;
+            }
+
+        }
+        assertTrue(foudUserAction);
+        assertFalse(foundpostProcessing);
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "true");
+    }
+
     @AfterMethod
     public void cleanup() throws Exception {
         cleanupStore();

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index ef07e57..a47327a 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -123,6 +123,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler
+#org.apache.falcon.service.LogMoverService
 ######### Implementation classes #########
@@ -353,3 +354,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Backlog Metric Properties
 #*.falcon.backlog.metricservice.emit.interval.millisecs=60000
 #*.falcon.backlog.metricservice.recheck.interval.millisecs=600000
+
+# Property to remove postProcessing
+*.falcon.postprocessing.enable=true
+
+### LogMoveService Thread count
+*.falcon.logMoveService.threadCount=50
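The server-side listener introduced here can be illustrated with a small standalone sketch. It deliberately avoids Falcon's classes: `LogMoverSketch`, the `Consumer<String>` stand-in for `JobLogMover.moveLog(context)`, and the workflow-ID argument are hypothetical. It reuses the two property names from startup.properties, the pool settings visible in `LogMoverService` (20 core threads, 120-second keep-alive, a 50-slot queue, and a fallback of 50 threads), and, like `onEnd`, it skips log moving when `falcon.postprocessing.enable` is true. Unlike the committed class, it clamps the core size, because a configured thread count below 20 would make the `ThreadPoolExecutor` constructor throw `IllegalArgumentException`.

```java
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical, Falcon-free sketch of the LogMoverService idea: workflow-end
 * callbacks hand log moving to a bounded thread pool, but only when the
 * Oozie post-processing actions have been switched off.
 */
final class LogMoverSketch {
    // Fallback used when falcon.logMoveService.threadCount is missing or invalid.
    static final int DEFAULT_THREAD_COUNT = 50;

    private final boolean postProcessingEnabled;
    private final Consumer<String> mover; // stand-in for JobLogMover.moveLog(context)
    private final ThreadPoolExecutor executor;

    LogMoverSketch(Properties props, Consumer<String> mover) {
        this.postProcessingEnabled =
                Boolean.parseBoolean(props.getProperty("falcon.postprocessing.enable"));
        this.mover = mover;
        int max = parseThreadCount(props.getProperty("falcon.logMoveService.threadCount"));
        // ThreadPoolExecutor rejects corePoolSize > maximumPoolSize, so clamp the core size.
        int core = Math.min(20, max);
        this.executor = new ThreadPoolExecutor(core, max, 120, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(50));
    }

    static int parseThreadCount(String value) {
        try {
            int parsed = Integer.parseInt(value);
            return parsed > 0 ? parsed : DEFAULT_THREAD_COUNT;
        } catch (NumberFormatException e) { // Integer.parseInt(null) also lands here
            return DEFAULT_THREAD_COUNT;
        }
    }

    /** Shared by onSuccess/onFailure; returns true if a log move was queued. */
    boolean onEnd(String workflowId) {
        if (postProcessingEnabled) {
            return false; // the in-workflow post-processing actions still move the logs
        }
        executor.execute(() -> mover.accept(workflowId));
        return true;
    }

    /** Stops accepting work and waits briefly for queued moves to finish. */
    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With a bounded queue, the pool only grows past its core size once the queue is full. Under the default `AbortPolicy`, a submission that arrives when all threads are busy and the queue is full is rejected with `RejectedExecutionException`.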
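The branching that `addPostProcessing(workflow, action)` centralizes for all builders can be modelled without Oozie's JAXB classes. In this hypothetical sketch, each node records only its `ok` and `error` transition targets. `succeeded-post-processing` is the only action name taken from the diff. The values `end`, `fail`, and `failed-post-processing` are assumed placeholders for `OK_ACTION_NAME`, `FAIL_ACTION_NAME`, and `FAIL_POSTPROCESS_ACTION_NAME`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of how the main action is wired with and without post processing. */
final class WorkflowWiringSketch {
    // Only SUCCESS_POSTPROCESS matches a value shown in the diff; the rest are placeholders.
    static final String OK = "end";
    static final String FAIL = "fail";
    static final String SUCCESS_POSTPROCESS = "succeeded-post-processing";
    static final String FAIL_POSTPROCESS = "failed-post-processing";

    /** An action node with its "ok" and "error" transitions, like an Oozie ACTION. */
    static final class Node {
        final String name;
        final String ok;
        final String error;

        Node(String name, String ok, String error) {
            this.name = name;
            this.ok = ok;
            this.error = error;
        }
    }

    /** Mirrors the if/else in addPostProcessing: the flag decides where the main action goes. */
    static List<Node> wire(String mainAction, boolean postProcessingEnabled) {
        List<Node> nodes = new ArrayList<>();
        if (!postProcessingEnabled) {
            // Straight to the end/kill nodes; a server-side listener is expected to move logs.
            nodes.add(new Node(mainAction, OK, FAIL));
        } else {
            // Main action first routes through the success/failure post-processing actions.
            nodes.add(new Node(mainAction, SUCCESS_POSTPROCESS, FAIL_POSTPROCESS));
            nodes.add(new Node(SUCCESS_POSTPROCESS, OK, FAIL));
            nodes.add(new Node(FAIL_POSTPROCESS, FAIL, FAIL));
        }
        return nodes;
    }
}
```

With the flag off, no node name contains "post". That is the same condition the new `testPostProcessingProcess` test asserts on the generated workflow.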