falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1573 Supply user-defined properties to Oozie workflows during schedule. Contributed by Daniel Del Castillo.
Date Thu, 19 Nov 2015 14:39:32 GMT
Repository: falcon
Updated Branches:
  refs/heads/master db0604da8 -> 4576c582f


FALCON-1573 Supply user-defined properties to Oozie workflows during schedule. Contributed
by Daniel Del Castillo.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4576c582
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4576c582
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4576c582

Branch: refs/heads/master
Commit: 4576c582f21ee7872df81193f3dc1d7fcee164aa
Parents: db0604d
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Wed Nov 18 21:37:46 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Thu Nov 19 19:09:29 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 docs/src/site/twiki/FalconCLI.twiki             |  5 +-
 .../src/site/twiki/restapi/EntitySchedule.twiki | 70 +++++++++++++++++++-
 .../apache/falcon/oozie/OozieEntityBuilder.java | 23 +++++++
 .../workflow/engine/OozieWorkflowEngine.java    |  3 +-
 .../feed/OozieFeedWorkflowBuilderTest.java      | 16 +++++
 .../OozieProcessWorkflowBuilderTest.java        | 18 +++++
 7 files changed, 134 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f46450..a95094d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1573 Supply user-defined properties to Oozie workflows during schedule (Daniel Del Castillo via Ajay Yadava)
+
     FALCON-1559 Config changes required for native scheduler (Pallavi Rao)
 
     FALCON-1459 Ability to import from database(Venkat Ramachandran via Sowmya Ramesh)

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index e001a7f..26e6b33 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -44,7 +44,10 @@ Once submitted, an entity can be scheduled using schedule option. Process
and fe
 Usage:
 $FALCON_HOME/bin/falcon entity  -type [process|feed] -name <<name>> -schedule
 
-Optional Arg : -skipDryRun. When this argument is specified, Falcon skips oozie dryrun.
+Optional Arg : -skipDryRun -doAs <<username>>
+-properties <<key1:val1,...,keyN:valN>>
+
+<a href="./restapi/EntitySchedule.html">The optional parameters are described here.</a>
 
 Example:
 $FALCON_HOME/bin/falcon entity  -type process -name sampleProcess -schedule

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/docs/src/site/twiki/restapi/EntitySchedule.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySchedule.twiki b/docs/src/site/twiki/restapi/EntitySchedule.twiki
index 263d152..0dede9b 100644
--- a/docs/src/site/twiki/restapi/EntitySchedule.twiki
+++ b/docs/src/site/twiki/restapi/EntitySchedule.twiki
@@ -12,15 +12,81 @@ Schedule an entity.
    * :entity-name is name of the entity.
    * skipDryRun : Optional query param, Falcon skips oozie dryrun when value is set to true.
    * doAs <optional query param> allows the current user to impersonate the user passed
in doAs when interacting with the Falcon system.
+   * properties <key1:val1,...,keyN:valN> : Optional query param, supplies a set of key-value pairs that will be available to the entity in the coordinator configuration. These values do not override properties with the same name that are already defined in the entity specification or generated by Falcon; such user-supplied values are ignored. For example, to change the scheduler used for scheduling the entity, set the property _falcon.scheduler_ in the properties parameter to _native_ to use the Falcon Scheduler or to _oozie_ to use the Oozie Scheduler.
 
 
 ---++ Results
 Result of the schedule command.
 
 ---++ Examples
+---+++ Oozie Workflow
+<verbatim>
+<workflow-app xmlns="uri:oozie:workflow:0.4" name="aggregator-wf">
+  <start to="aggregator" />
+  <action name="aggregator">
+    <java>
+      <job-tracker>${jobTracker}</job-tracker>
+      <name-node>${nameNode}</name-node>
+      <configuration>
+        <property>
+          <name>mapred.job.queue.name</name>
+          <value>${queueName}</value>
+        </property>
+      </configuration>
+      <main-class>com.company.hadoop.AggregatorJob</main-class>
+      <java-opts>-Dframework.instrumentation.host=${instrumentationServer}</java-opts>
+      <arg>--input.path=${inputBasePath}</arg>
+      <arg>--output.path=${outputBasePath}</arg>
+    </java>
+    <ok to="end" />
+    <error to="fail" />
+  </action>
+  <kill name="fail">
+    <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+  </kill>
+</workflow-app>
+</verbatim>
+---+++ Submitted Process
+<verbatim>
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data.
Generates output for yesterday -->
+<process xmlns="uri:falcon:process:0.1" name="SampleProcess" >
+    <clusters>
+      <cluster name="primary-cluster">
+        <validity start="2012-04-03T06:00Z" end="2022-12-30T00:00Z" />
+      </cluster>
+    </clusters>
+
+    <parallel>1</parallel>
+    <order>FIFO</order>
+    <frequency>days(1)</frequency>
+
+    <inputs>
+        <input name="input" feed="SampleInput" start="yesterday(0,0)" end="today(-1,0)"
/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="SampleOutput" instance="yesterday(0,0)" />
+    </outputs>
+
+    <properties>
+        <property name="queueName" value="default" />
+        <property name="ssh.host" value="localhost" />
+        <property name="fileTimestamp" value="${coord:formatTime(coord:nominalTime(),
'yyyy-MM-dd')}" />
+        <property name="instrumentationServer" value="${coord:conf('instrumentation.host')}"
/>
+    </properties>
+
+    <workflow engine="oozie" path="/examples/apps/aggregator" />
+    <retry policy="exp-backoff" delay="minutes(5)" attempts="3" />
+    
+    <late-process policy="exp-backoff" delay="hours(1)">
+        <late-input input="input" workflow-path="/projects/bootcamp/workflow/lateinput"
/>
+    </late-process>
+</process>
+</verbatim>
 ---+++ Rest Call
 <verbatim>
-POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false&doAs=joe
+POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false&doAs=joe&properties=instrumentation.host:instrumentation.localdomain
 </verbatim>
 ---+++ Result
 <verbatim>
@@ -30,3 +96,5 @@ POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryR
     "status": "SUCCEEDED"
 }
 </verbatim>
+---+++ Notes
+In this example, the _properties_ parameter sets _instrumentation.host_ to _instrumentation.localdomain_. The process property _instrumentationServer_ reads that value through ${coord:conf('instrumentation.host')}, and the Oozie workflow passes _instrumentationServer_ to the Java action as -Dframework.instrumentation.host. As a result, _framework.instrumentation.host_ will be _instrumentation.localdomain_ when the workflow runs.

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index e0af30d..a36ee79 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -60,6 +60,7 @@ import java.io.OutputStream;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 
@@ -107,6 +108,43 @@ public abstract class OozieEntityBuilder<T extends Entity> {

     public abstract Properties build(Cluster cluster, Path buildPath) throws FalconException;

+    /**
+     * Builds the entity via {@link #build(Cluster, Path)} and merges user-supplied properties into the result.
+     *
+     * <p>Properties produced by the builder always take precedence: a user-supplied property whose name
+     * the builder also sets is ignored, and a warning is logged.
+     *
+     * @param cluster cluster to build the entity for
+     * @param buildPath path passed through to {@link #build(Cluster, Path)}
+     * @param properties user-supplied properties; may be {@code null} or empty
+     * @return the builder's properties merged with the user-supplied ones, or the builder's result unchanged
+     *         (possibly {@code null}) when it is {@code null} or no user properties are supplied
+     * @throws FalconException if {@link #build(Cluster, Path)} fails
+     */
+    public Properties build(Cluster cluster, Path buildPath, Map<String, String> properties)
+        throws FalconException {
+        Properties builderProperties = build(cluster, buildPath);
+        // Propagate a null result unchanged: OozieWorkflowEngine treats it as "not scheduled on this cluster".
+        if (builderProperties == null || properties == null || properties.isEmpty()) {
+            return builderProperties;
+        }
+
+        Properties propertiesCopy = new Properties();
+        propertiesCopy.putAll(properties);
+
+        // Builder properties shadow any user-defined property with the same name.
+        // Properties.contains() tests values, not keys, so containsKey() is required here.
+        for (String propertyName : builderProperties.stringPropertyNames()) {
+            if (propertiesCopy.containsKey(propertyName)) {
+                LOG.warn("User provided property {} is already declared in the entity and will be ignored.",
+                    propertyName);
+            }
+            propertiesCopy.setProperty(propertyName, builderProperties.getProperty(propertyName));
+        }
+
+        return propertiesCopy;
+    }
+
     protected String getStoragePath(Path path) {
         if (path != null) {
             return getStoragePath(path.toString());

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 7262964..38b6f32 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -169,11 +169,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
                 prepareEntityBuildPath(entity, cluster);
                 Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
-                Properties properties = builder.build(cluster, buildPath);
+                Properties properties = builder.build(cluster, buildPath, suppliedProps);
                 if (properties == null) {
                     LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(),
cluster);
                     continue;
                 }
+
                 //Do dryRun of coords before schedule as schedule is asynchronous
                 dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)),
skipDryRun);
                 scheduleEntity(clusterName, properties, entity);

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 176a15e..d034b1a 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -869,4 +869,20 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         property.setValue(umask);
         cluster.getProperties().getProperties().add(property);
     }
+
+    @Test
+    public void testUserDefinedProperties() throws Exception {
+        // User-supplied properties are merged in, but never override builder-generated ones.
+        Map<String, String> userProps = new HashMap<>();
+        userProps.put("ENTITY_NAME", "MyEntity");
+        userProps.put("custom.property", "custom value");
+
+        OozieEntityBuilder feedBuilder = OozieEntityBuilder.get(lifecycleRetentionFeed);
+        Properties built = feedBuilder.build(trgCluster, new Path("/projects/falcon/"), userProps);
+
+        Assert.assertNotNull(built);
+        Assert.assertEquals(built.getProperty("custom.property"), "custom value");
+        Assert.assertEquals(built.getProperty("ENTITY_NAME"), lifecycleRetentionFeed.getName());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 9f492d7..8d824ba 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -75,6 +75,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -767,4 +768,21 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase
{
         Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
"impressions");
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()),
"NONE");
     }
+
+    @Test
+    public void testUserDefinedProperties() throws Exception {
+        // User-supplied properties are merged in, but never override builder-generated ones.
+        Map<String, String> userProps = new HashMap<>();
+        userProps.put("ENTITY_NAME", "MyEntity");
+        userProps.put("custom.property", "custom value");
+
+        Process clickSummary = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+        OozieEntityBuilder processBuilder = OozieEntityBuilder.get(clickSummary);
+        Properties built = processBuilder.build(cluster, new Path("/projects/falcon/"), userProps);
+
+        Assert.assertNotNull(built);
+        Assert.assertEquals(built.getProperty("custom.property"), "custom value");
+        Assert.assertEquals(built.getProperty("ENTITY_NAME"), clickSummary.getName());
+    }
+
 }


Mime
View raw message