Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B19C611D09 for ; Thu, 21 Aug 2014 17:31:10 +0000 (UTC) Received: (qmail 43345 invoked by uid 500); 21 Aug 2014 17:31:10 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 43310 invoked by uid 500); 21 Aug 2014 17:31:10 -0000 Mailing-List: contact commits-help@falcon.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.incubator.apache.org Delivered-To: mailing list commits@falcon.incubator.apache.org Received: (qmail 43301 invoked by uid 99); 21 Aug 2014 17:31:10 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Aug 2014 17:31:10 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 21 Aug 2014 17:31:04 +0000 Received: (qmail 39033 invoked by uid 99); 21 Aug 2014 17:30:44 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Aug 2014 17:30:44 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DDC859C0949; Thu, 21 Aug 2014 17:30:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: arpit@apache.org To: commits@falcon.incubator.apache.org Date: Thu, 21 Aug 2014 17:30:49 -0000 Message-Id: In-Reply-To: <80dc65e0ddcc42b5bcf692abce60d0ff@git.apache.org> References: <80dc65e0ddcc42b5bcf692abce60d0ff@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/18] git commit: FALCON-583. Post processing is broken in current trunk. Contributed by Venkatesh Seetharam X-Virus-Checked: Checked by ClamAV on apache.org FALCON-583. Post processing is broken in current trunk. Contributed by Venkatesh Seetharam Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/07397774 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/07397774 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/07397774 Branch: refs/heads/FALCON-585 Commit: 0739777413ac8487a76b9983aa23ecb6cf42104d Parents: a05a288 Author: Suhas V Authored: Tue Aug 19 12:38:29 2014 +0530 Committer: Suhas V Committed: Tue Aug 19 12:38:29 2014 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../org/apache/falcon/entity/FeedHelper.java | 5 +- .../workflow/WorkflowExecutionContext.java | 8 +- .../WorkflowJobEndNotificationService.java | 18 ++++- .../workflow/WorkflowExecutionContextTest.java | 2 +- .../falcon/messaging/JMSMessageProducer.java | 12 --- .../falcon/messaging/FeedProducerTest.java | 4 +- .../messaging/JMSMessageConsumerTest.java | 2 +- .../messaging/JMSMessageProducerTest.java | 4 +- .../falcon/messaging/ProcessProducerTest.java | 4 +- .../falcon/oozie/OozieCoordinatorBuilder.java | 79 +++++++++++++------- .../OozieOrchestrationWorkflowBuilder.java | 23 ++++-- .../feed/FeedReplicationCoordinatorBuilder.java | 14 +++- .../feed/FeedReplicationWorkflowBuilder.java | 13 +--- .../feed/FeedRetentionCoordinatorBuilder.java | 12 ++- .../feed/FeedRetentionWorkflowBuilder.java | 5 +- .../ProcessExecutionCoordinatorBuilder.java | 9 ++- .../ProcessExecutionWorkflowBuilder.java | 8 +- .../src/main/resources/action/post-process.xml | 6 +- .../feed/OozieFeedWorkflowBuilderTest.java | 69 ++++++++--------- .../falcon/oozie/process/AbstractTestBase.java | 51 +++++++++++++ .../OozieProcessWorkflowBuilderTest.java | 74 ++++++++---------- .../workflow/FalconPostProcessingTest.java | 4 +- .../falcon/rerun/handler/LateRerunHandler.java | 2 + 24 files changed, 260 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b4dfdc4..0fc3608 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -58,6 +58,9 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-583 Post processing is broken in current trunk + (Venkatesh Seetharam via Suhas Vasu) + FALCON-582 Latest changes to LICENSE files results in build failure (Srikanth Sundarrajan via Venkatesh Seetharam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index d09a12f..323188d 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -20,6 +20,7 @@ package org.apache.falcon.entity; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Property; @@ -275,9 +276,9 @@ public final class FeedHelper { + storage.getTable(); } - public static Properties getUserWorkflowProperties(String policy) { + public static Properties getUserWorkflowProperties(LifeCycle lifeCycle) { Properties props = new Properties(); - props.put("userWorkflowName", policy + "-policy"); + props.put("userWorkflowName", lifeCycle.name().toLowerCase() + "-policy"); props.put("userWorkflowEngine", "falcon"); String version; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index 637cc3e..786e94f 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -50,7 +50,7 @@ public class WorkflowExecutionContext { private static final Logger LOG = LoggerFactory.getLogger(WorkflowExecutionContext.class); - public static final String PROCESS_INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time + public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time public static final String OUTPUT_FEED_SEPARATOR = ","; public static final String INPUT_FEED_SEPARATOR = "#"; @@ -145,10 +145,10 @@ public class WorkflowExecutionContext { * @return a ISO8601 formatted string */ public String getNominalTimeAsISO8601() { - return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), PROCESS_INSTANCE_FORMAT); + return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), INSTANCE_FORMAT); } - public String getTimestamp() { + String getTimestamp() { return getValue(WorkflowExecutionArgs.TIMESTAMP); } @@ -157,7 +157,7 @@ public class WorkflowExecutionContext { * @return a ISO8601 formatted string */ public String getTimeStampAsISO8601() { - return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), PROCESS_INSTANCE_FORMAT); + return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), INSTANCE_FORMAT); } public String getClusterName() { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java index 39237f9..67f6c79 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java @@ -27,6 +27,8 @@ import org.apache.falcon.service.FalconService; import org.apache.falcon.util.ReflectionUtils; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Date; import java.util.LinkedHashSet; @@ -37,6 +39,8 @@ import java.util.Set; */ public class WorkflowJobEndNotificationService implements FalconService { + private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class); + public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName(); private Set listeners = new LinkedHashSet(); @@ -75,7 +79,12 @@ public class WorkflowJobEndNotificationService implements FalconService { public void notifyFailure(WorkflowExecutionContext context) throws FalconException { for (WorkflowExecutionListener listener : listeners) { - listener.onFailure(context); + try { + listener.onFailure(context); + } catch (Throwable t) { + // do not rethrow as other listeners do not get a chance + LOG.error("Error in listener {}", listener.getClass().getName(), t); + } } instrumentAlert(context); @@ -83,7 +92,12 @@ public class WorkflowJobEndNotificationService implements FalconService { public void notifySuccess(WorkflowExecutionContext context) throws FalconException { for (WorkflowExecutionListener listener : listeners) { - listener.onSuccess(context); + try { + listener.onSuccess(context); + } catch (Throwable t) { + // do not rethrow as other listeners do not get a chance + LOG.error("Error in listener {}", listener.getClass().getName(), t); + } } instrumentAlert(context); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java index 93d8831..e97175e 100644 --- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java +++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java @@ -52,7 +52,7 @@ public class WorkflowExecutionContextTest { private static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory"; private static final String ISO8601_TIME = SchemaHelper.formatDateUTCToISO8601( - NOMINAL_TIME, WorkflowExecutionContext.PROCESS_INSTANCE_FORMAT); + NOMINAL_TIME, WorkflowExecutionContext.INSTANCE_FORMAT); private WorkflowExecutionContext context; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java ---------------------------------------------------------------------- diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java index aeadf5c..39d6fab 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java @@ -18,7 +18,6 @@ package org.apache.falcon.messaging; -import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.conf.Configuration; @@ -213,7 +212,6 @@ public class JMSMessageProducer { } change(message, WorkflowExecutionArgs.FEED_INSTANCE_PATHS, feedPaths[i]); - convertDateFormat(message); messages.add(message); } @@ -320,16 +318,6 @@ public class JMSMessageProducer { return message; } - public void convertDateFormat(Map message) { - String date = message.get(WorkflowExecutionArgs.NOMINAL_TIME.getName()); - change(message, WorkflowExecutionArgs.NOMINAL_TIME, - SchemaHelper.formatDateUTCToISO8601(date, "yyyy-MM-dd-HH-mm")); - - date = message.get(WorkflowExecutionArgs.TIMESTAMP.getName()); - change(message, WorkflowExecutionArgs.TIMESTAMP, - SchemaHelper.formatDateUTCToISO8601(date, "yyyy-MM-dd-HH-mm")); - } - @SuppressWarnings("unchecked") private Connection createAndStartConnection(String implementation, String userName, String password, String url) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java ---------------------------------------------------------------------- diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java index 9119624..1c10be5 100644 --- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java +++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java @@ -219,9 +219,9 @@ public class FeedProducerTest { "falcon"); Assert.assertEquals(m.getString(WorkflowExecutionArgs.RUN_ID.getName()), "1"); Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), - "2011-01-01T01:00Z"); + "2011-01-01-01-00"); Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), - "2012-01-01T01:00Z"); + "2012-01-01-01-00"); Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED"); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java ---------------------------------------------------------------------- diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java index 694d488..b1f8271 100644 --- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java +++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java @@ -86,7 +86,7 @@ public class JMSMessageConsumerTest { } WorkflowExecutionContext context = WorkflowExecutionContext.create( - getMockFalconMessage(15), WorkflowExecutionContext.Type.POST_PROCESSING); + getMockFalconMessage(5), WorkflowExecutionContext.Type.POST_PROCESSING); context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1")); MapMessage mapMessage = session.createMapMessage(); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java ---------------------------------------------------------------------- diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java index 90efa3e..34cff77 100644 --- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java +++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java @@ -210,9 +210,9 @@ public class JMSMessageProducerTest { Assert.assertEquals(message.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()), "agg-coord"); Assert.assertEquals(message.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), - "2011-01-01T01:00Z"); + "2011-01-01-01-00"); Assert.assertEquals(message.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), - "2012-01-01T01:00Z"); + "2012-01-01-01-00"); Assert.assertEquals(message.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED"); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java ---------------------------------------------------------------------- diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java index 95a3780..ccb47df 100644 --- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java +++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java @@ -160,9 +160,9 @@ public class ProcessProducerTest { "falcon"); Assert.assertEquals(m.getString(WorkflowExecutionArgs.RUN_ID.getName()), "1"); Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), - "2011-01-01T01:00Z"); + "2011-01-01-01-00"); Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), - "2012-01-01T01:00Z"); + "2012-01-01-01-00"); Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED"); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java index 459307c..fe2136b 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java @@ -19,6 +19,7 @@ package org.apache.falcon.oozie; import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; @@ -36,6 +37,7 @@ import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder; import org.apache.falcon.util.OozieUtils; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.Path; import org.apache.oozie.client.OozieClient; @@ -55,11 +57,19 @@ public abstract class OozieCoordinatorBuilder extends OozieEnt protected static final String MR_JOB_PRIORITY = "jobPriority"; protected static final String IGNORE = "IGNORE"; - protected final Tag lifecycle; + protected final LifeCycle lifecycle; - public OozieCoordinatorBuilder(T entity, Tag tag) { + public OozieCoordinatorBuilder(T entity, LifeCycle lifecycle) { super(entity); - this.lifecycle = tag; + this.lifecycle = lifecycle; + } + + public LifeCycle getLifecycle() { + return lifecycle; + } + + public Tag getTag() { + return lifecycle.getTag(); } public static OozieCoordinatorBuilder get(Entity entity, Tag tag) { @@ -87,11 +97,11 @@ public abstract class OozieCoordinatorBuilder extends OozieEnt } protected Path getBuildPath(Path buildPath) { - return new Path(buildPath, lifecycle.name()); + return new Path(buildPath, getTag().name()); } protected String getEntityName() { - return EntityUtil.getWorkflowName(lifecycle, entity).toString(); + return EntityUtil.getWorkflowName(getTag(), entity).toString(); } protected Path marshal(Cluster cluster, COORDINATORAPP coord, @@ -104,26 +114,32 @@ public abstract class OozieCoordinatorBuilder extends OozieEnt String coordName) throws FalconException { Properties props = new Properties(); props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName()); + props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name()); + props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName()); props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL); props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL); - props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster)); - props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster)); - String falconBrokerUrl = StartupProperties.get().getProperty( - WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true"); - props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl); - String falconBrokerImplClass = StartupProperties.get().getProperty( - WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS); - props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass); - String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins", - DEFAULT_BROKER_MSG_TTL.toString()); - props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL); - props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name()); - props.put("logDir", getLogDirectory(cluster)); + props.put("falconDataOperation", getOperation().name()); + + props.put(WorkflowExecutionArgs.LOG_DIR.getName(), getLogDirectory(cluster)); props.put(OozieClient.EXTERNAL_ID, new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity), "${coord:nominalTime()}").getId()); - props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster)); + props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster)); + + addLateDataProperties(props); + addBrokerProperties(cluster, props); + props.put(MR_QUEUE_NAME, "default"); + props.put(MR_JOB_PRIORITY, "NORMAL"); + + //props in entity override the set props. + props.putAll(getEntityProperties(entity)); + return props; + } + + protected abstract WorkflowExecutionContext.EntityOperations getOperation(); + + private void addLateDataProperties(Properties props) throws FalconException { if (EntityUtil.getLateProcess(entity) == null || EntityUtil.getLateProcess(entity).getLateInputs() == null || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) { @@ -131,16 +147,25 @@ public abstract class OozieCoordinatorBuilder extends OozieEnt } else { props.put("shouldRecord", "true"); } + } - props.put("entityName", entity.getName()); - props.put("entityType", entity.getEntityType().name().toLowerCase()); - props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName()); + private void addBrokerProperties(Cluster cluster, Properties props) { + props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(), + ClusterHelper.getMessageBrokerUrl(cluster)); + props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), + ClusterHelper.getMessageBrokerImplClass(cluster)); - props.put(MR_QUEUE_NAME, "default"); - props.put(MR_JOB_PRIORITY, "NORMAL"); - //props in entity override the set props. - props.putAll(getEntityProperties(entity)); - return props; + String falconBrokerUrl = StartupProperties.get().getProperty( + "broker.url", "tcp://localhost:61616?daemon=true"); + props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl); + + String falconBrokerImplClass = StartupProperties.get().getProperty( + "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS); + props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass); + + String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins", + DEFAULT_BROKER_MSG_TTL.toString()); + props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL); } protected CONFIGURATION getConfig(Properties props) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java index fa645a5..d232aaf 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java @@ -20,6 +20,7 @@ package org.apache.falcon.oozie; import org.apache.commons.io.IOUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; @@ -79,14 +80,22 @@ public abstract class OozieOrchestrationWorkflowBuilder extend public static final Set FALCON_ACTIONS = new HashSet(Arrays.asList( new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, })); - private final Tag lifecycle; + private final LifeCycle lifecycle; - public OozieOrchestrationWorkflowBuilder(T entity, Tag lifecycle) { + public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) { super(entity); this.lifecycle = lifecycle; } - public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle) + public LifeCycle getLifecycle() { + return lifecycle; + } + + public Tag getTag() { + return lifecycle.getTag(); + } + + public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle) throws FalconException { switch (entity.getEntityType()) { case FEED: @@ -194,11 +203,9 @@ public abstract class OozieOrchestrationWorkflowBuilder extend } protected boolean shouldPreProcess() throws FalconException { - if (EntityUtil.getLateProcess(entity) == null || EntityUtil.getLateProcess(entity).getLateInputs() == null - || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) { - return false; - } - return true; + return !(EntityUtil.getLateProcess(entity) == null + || EntityUtil.getLateProcess(entity).getLateInputs() == null + || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0); } protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java index d6115b2..f0864db 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -21,6 +21,7 @@ package org.apache.falcon.oozie.feed; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; import org.apache.falcon.entity.CatalogStorage; import org.apache.falcon.entity.ClusterHelper; @@ -45,6 +46,7 @@ import org.apache.falcon.oozie.coordinator.WORKFLOW; import org.apache.falcon.oozie.coordinator.ACTION; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -74,10 +76,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder buildCoords(Cluster cluster, Path buildPath) throws FalconException { + @Override + public List buildCoords(Cluster cluster, Path buildPath) throws FalconException { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); if (feedCluster.getType() == ClusterType.TARGET) { List props = new ArrayList(); @@ -96,6 +99,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder { public FeedRetentionCoordinatorBuilder(Feed entity) { - super(entity, Tag.RETENTION); + super(entity, LifeCycle.EVICTION); } @Override public List buildCoords(Cluster cluster, Path buildPath) throws FalconException { @@ -85,15 +86,13 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder buildCoords(Cluster cluster, Path buildPath) throws FalconException { @@ -106,6 +108,11 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder< return Arrays.asList(getProperties(marshalPath, coordName)); } + @Override + protected WorkflowExecutionContext.EntityOperations getOperation() { + return WorkflowExecutionContext.EntityOperations.GENERATE; + } + private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) { coord.setName(coordName); org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity, http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java index 2eae7ca..865beaf 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java @@ -19,6 +19,7 @@ package org.apache.falcon.oozie.process; import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; import org.apache.falcon.entity.CatalogStorage; import org.apache.falcon.entity.ClusterHelper; @@ -58,7 +59,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration Arrays.asList(new String[]{PREPROCESS_ACTION_NAME, USER_ACTION_NAME, })); protected ProcessExecutionWorkflowBuilder(Process entity) { - super(entity, Tag.DEFAULT); + super(entity, LifeCycle.EXECUTION); } @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException { @@ -100,14 +101,13 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration marshal(cluster, wfApp, buildPath); Properties props = getProperties(buildPath, wfName); - props.putAll(getWorkflowProperties(cluster)); + props.putAll(getWorkflowProperties()); return props; } - private Properties getWorkflowProperties(Cluster cluster) { + private Properties getWorkflowProperties() { Properties props = new Properties(); - props.setProperty("falconDataOperation", "GENERATE"); props.setProperty("srcClusterName", "NA"); return props; } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/resources/action/post-process.xml ---------------------------------------------------------------------- diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml index 1631d63..50cc0d4 100644 --- a/oozie/src/main/resources/action/post-process.xml +++ b/oozie/src/main/resources/action/post-process.xml @@ -49,15 +49,15 @@ -timeStamp ${timeStamp} -brokerImplClass - ${wf:conf("broker.impl.class")} + ${brokerImplClass} -brokerUrl - ${wf:conf("broker.url")} + ${brokerUrl} -userBrokerImplClass ${userBrokerImplClass} -userBrokerUrl ${userBrokerUrl} -brokerTTL - ${wf:conf("broker.ttlInMins")} + ${brokerTTL} -feedNames ${feedNames} -feedInstancePaths http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index 927aba3..e47895f 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -38,7 +38,6 @@ import org.apache.falcon.oozie.OozieEntityBuilder; import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; import org.apache.falcon.oozie.bundle.BUNDLEAPP; import org.apache.falcon.oozie.bundle.COORDINATOR; -import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property; import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.coordinator.SYNCDATASET; import org.apache.falcon.oozie.process.AbstractTestBase; @@ -49,6 +48,7 @@ import org.apache.falcon.security.CurrentUser; import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.testng.Assert; @@ -189,10 +189,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance(); Assert.assertEquals("${now(0,-40)}", outEventInstance); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } + HashMap props = getCoordProperties(coord); + + verifyEntityProperties(feed, trgCluster, + WorkflowExecutionContext.EntityOperations.REPLICATE, props); + verifyBrokerProperties(trgCluster, props); // verify the replication param that feed replicator depends on String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed); @@ -208,7 +209,6 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}"); Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions); Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name()); - Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, feed)); // verify the post processing params Assert.assertEquals(props.get("feedNames"), feed.getName()); @@ -274,7 +274,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z"); String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed); - assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions); + assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster, pathsWithPartitions); List betaCoords = builder.buildCoords(betaTrgCluster, new Path("/beta/falcon/")); final COORDINATORAPP betaCoord = getCoordinator(trgMiniDFS, @@ -283,7 +283,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z"); pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed); - assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions); + assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster, pathsWithPartitions); } private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster, @@ -302,9 +302,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { return parts; } - private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName, - String pathsWithPartitions) throws JAXBException, IOException { - org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName); + private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, Cluster aCluster, + String pathsWithPartitions) throws Exception { + org.apache.falcon.entity.v0.feed.Cluster feedCluster = + FeedHelper.getCluster(aFeed, aCluster.getName()); Date startDate = feedCluster.getValidity().getStart(); Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate)); @@ -319,10 +320,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { List args = replication.getArg(); Assert.assertEquals(args.size(), 13); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } + HashMap props = getCoordProperties(coord); Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions); Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo()); @@ -331,7 +329,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name()); Assert.assertEquals(props.get("maxMaps"), "33"); Assert.assertEquals(props.get("mapBandwidthKB"), "2048"); - Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, aFeed)); + + verifyEntityProperties(aFeed, aCluster, + WorkflowExecutionContext.EntityOperations.REPLICATE, props); + verifyBrokerProperties(trgCluster, props); } public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) { @@ -415,10 +416,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml"))); Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml"))); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } + HashMap props = getCoordProperties(coord); final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed); final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed); @@ -446,14 +444,18 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName()); Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}"); Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name()); - Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed)); // verify the post processing params Assert.assertEquals(props.get("feedNames"), tableFeed.getName()); Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}"); Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster)); - assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord), wfPath.toString()); + assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord), + wfPath.toString()); + + verifyEntityProperties(tableFeed, trgCluster, + WorkflowExecutionContext.EntityOperations.REPLICATE, props); + verifyBrokerProperties(trgCluster, props); } private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException { @@ -525,10 +527,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName()); Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}"); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } + HashMap props = getCoordProperties(coord); String feedDataPath = props.get("feedDataPath"); String storageType = props.get("falconFeedStorageType"); @@ -549,9 +548,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { // verify the post processing params Assert.assertEquals(props.get("feedNames"), feed.getName()); Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE"); - Assert.assertEquals(props.get("logDir"), getLogPath(srcCluster, feed)); assertWorkflowRetries(coord); + verifyEntityProperties(feed, srcCluster, + WorkflowExecutionContext.EntityOperations.DELETE, props); + verifyBrokerProperties(srcCluster, props); } @Test (dataProvider = "secureOptions") @@ -571,10 +572,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + tableFeed.getName()); Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}"); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } + HashMap props = getCoordProperties(coord); String feedDataPath = props.get("feedDataPath"); String storageType = props.get("falconFeedStorageType"); @@ -595,9 +593,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { // verify the post processing params Assert.assertEquals(props.get("feedNames"), tableFeed.getName()); Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE"); - Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed)); assertWorkflowRetries(coord); + verifyBrokerProperties(srcCluster, props); + verifyEntityProperties(tableFeed, trgCluster, + WorkflowExecutionContext.EntityOperations.DELETE, props); Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster)); assertHCatCredentials( @@ -632,9 +632,4 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { } } } - - private String getLogPath(Cluster aCluster, Feed aFeed) { - Path logPath = EntityUtil.getLogPath(aCluster, aFeed); - return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath; - } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java index a0962fc..b547c31 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java @@ -21,6 +21,7 @@ package org.apache.falcon.oozie.process; import org.apache.falcon.FalconException; import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; @@ -29,9 +30,13 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.oozie.bundle.BUNDLEAPP; +import org.apache.falcon.oozie.coordinator.CONFIGURATION; import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.testng.Assert; @@ -48,6 +53,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.Collection; +import java.util.HashMap; import java.util.List; /** @@ -199,4 +205,49 @@ public class AbstractTestBase { Assert.assertEquals(action.getRetryInterval(), "1"); } } + + protected HashMap getCoordProperties(COORDINATORAPP coord) { + HashMap props = new HashMap(); + for (CONFIGURATION.Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { + props.put(prop.getName(), prop.getValue()); + } + return props; + } + + protected void verifyEntityProperties(Entity entity, Cluster cluster, + WorkflowExecutionContext.EntityOperations operation, + HashMap props) throws Exception { + Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()), + entity.getName()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()), + entity.getEntityType().name()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity)); + Assert.assertEquals(props.get("falconDataOperation"), operation.name()); + } + + private String getLogPath(Cluster cluster, Entity entity) { + Path logPath = EntityUtil.getLogPath(cluster, entity); + return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath; + } + + protected void verifyBrokerProperties(Cluster cluster, HashMap props) { + Assert.assertEquals(props.get(WorkflowExecutionArgs.USER_BRKR_URL.getName()), + ClusterHelper.getMessageBrokerUrl(cluster)); + Assert.assertEquals(props.get(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName()), + ClusterHelper.getMessageBrokerImplClass(cluster)); + + String falconBrokerUrl = StartupProperties.get().getProperty( + "broker.url", "tcp://localhost:61616?daemon=true"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.BRKR_URL.getName()), falconBrokerUrl); + + String falconBrokerImplClass = StartupProperties.get().getProperty( + "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS); + Assert.assertEquals(props.get(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName()), + falconBrokerImplClass); + + String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins", + String.valueOf(3 * 24 * 60L)); + Assert.assertEquals(props.get(WorkflowExecutionArgs.BRKR_TTL.getName()), jmsMessageTTL); + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java index 45a8732..45badee 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java @@ -44,7 +44,6 @@ import org.apache.falcon.oozie.OozieEntityBuilder; import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; import org.apache.falcon.oozie.bundle.BUNDLEAPP; import org.apache.falcon.oozie.bundle.CONFIGURATION; -import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property; import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.coordinator.SYNCDATASET; import org.apache.falcon.oozie.workflow.ACTION; @@ -55,6 +54,7 @@ import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.util.OozieUtils; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -186,19 +186,14 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { assertEquals(ds.getUriTemplate(), FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA)); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } + HashMap props = getCoordProperties(coord); assertEquals(props.get("mapred.job.priority"), "LOW"); - Assert.assertEquals(props.get("logDir"), getLogPath(process)); - assertLibExtensions(fs, coord, EntityType.PROCESS, null); - } + verifyEntityProperties(process, cluster, + WorkflowExecutionContext.EntityOperations.GENERATE, props); + verifyBrokerProperties(cluster, props); - private String getLogPath(Process process) { - Path logPath = EntityUtil.getLogPath(cluster, process); - return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath; + assertLibExtensions(fs, coord, EntityType.PROCESS, null); } @Test @@ -285,10 +280,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } + HashMap props = getCoordProperties(coord); + + verifyEntityProperties(process, cluster, + WorkflowExecutionContext.EntityOperations.GENERATE, props); + verifyBrokerProperties(cluster, props); // verify table and hive props Map expected = getExpectedProperties(inFeed, outFeed, process); @@ -298,7 +294,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(entry.getValue(), expected.get(entry.getKey())); } } - Assert.assertEquals(props.get("logDir"), getLogPath(process)); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); @@ -349,11 +344,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } - Assert.assertEquals(props.get("logDir"), getLogPath(process)); + HashMap props = getCoordProperties(coord); + + verifyEntityProperties(process, cluster, + WorkflowExecutionContext.EntityOperations.GENERATE, props); + verifyBrokerProperties(cluster, props); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); @@ -404,11 +399,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } - Assert.assertEquals(props.get("logDir"), getLogPath(process)); + HashMap props = getCoordProperties(coord); + + verifyEntityProperties(process, cluster, + WorkflowExecutionContext.EntityOperations.GENERATE, props); + verifyBrokerProperties(cluster, props); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); @@ -455,11 +450,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } - Assert.assertEquals(props.get("logDir"), getLogPath(process)); + HashMap props = getCoordProperties(coord); + verifyEntityProperties(process, cluster, + WorkflowExecutionContext.EntityOperations.GENERATE, props); + verifyBrokerProperties(cluster, props); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); @@ -546,10 +540,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } + HashMap props = getCoordProperties(coord); // verify table props Map expected = getExpectedProperties(inFeed, outFeed, process); @@ -558,7 +549,9 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(entry.getValue(), expected.get(entry.getKey())); } } - Assert.assertEquals(props.get("logDir"), getLogPath(process)); + verifyEntityProperties(process, cluster, + WorkflowExecutionContext.EntityOperations.GENERATE, props); + verifyBrokerProperties(cluster, props); // verify the late data params Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed()); @@ -684,11 +677,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); - HashMap props = new HashMap(); - for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { - props.put(prop.getName(), prop.getValue()); - } - Assert.assertEquals(props.get("logDir"), getLogPath(processEntity)); + HashMap props = getCoordProperties(coord); + verifyEntityProperties(processEntity, cluster, + WorkflowExecutionContext.EntityOperations.GENERATE, props); + verifyBrokerProperties(cluster, props); String[] expected = { WorkflowExecutionArgs.FEED_NAMES.getName(), http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java index feddfdd..91559a5 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java @@ -150,8 +150,8 @@ public class FalconPostProcessingTest { if (workflowUser != null) { // in case of user message, its NULL Assert.assertEquals(workflowUser, "falcon"); } - Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), "2011-01-01T01:00Z"); - Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), "2012-01-01T01:00Z"); + Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), "2011-01-01-01-00"); + Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), "2012-01-01-01-00"); Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED"); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java index 76f0ac0..24c6ec2 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java @@ -37,6 +37,7 @@ import org.apache.falcon.rerun.event.LaterunEvent; import org.apache.falcon.rerun.policy.AbstractRerunPolicy; import org.apache.falcon.rerun.policy.RerunPolicyFactory; import org.apache.falcon.rerun.queue.DelayedQueue; +import org.apache.falcon.security.CurrentUser; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.apache.hadoop.conf.Configuration; @@ -65,6 +66,7 @@ public class LateRerunHandler> extends if (wait == -1) { LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName); + CurrentUser.authenticate(workflowUser); java.util.Properties properties = this.getWfEngine().getWorkflowProperties(cluster, wfId); String logDir = properties.getProperty("logDir");