falcon-commits mailing list archives

From srik...@apache.org
Subject git commit: FALCON-47 Falcon Replication should support configurable delays in feed, parallel, timeout and bulk transfer with variable frequency. Contributed by Shaik Idris Ali
Date Sat, 20 Jul 2013 14:32:46 GMT
Updated Branches:
  refs/heads/master 8d2d34b9d -> 689d52a6e


FALCON-47 Falcon Replication should support configurable delays in feed, parallel, timeout
and bulk transfer with variable frequency. Contributed by Shaik Idris Ali


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/689d52a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/689d52a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/689d52a6

Branch: refs/heads/master
Commit: 689d52a6e98055b98e13bdda02c4ebc820e13257
Parents: 8d2d34b
Author: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Authored: Sat Jul 20 19:47:33 2013 +0530
Committer: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Committed: Sat Jul 20 19:47:33 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 ++
 .../org/apache/falcon/entity/v0/Frequency.java  |  2 +-
 client/src/main/resources/feed-0.1.xsd          |  1 +
 docs/src/site/twiki/EntitySpecification.twiki   |  6 ++-
 docs/src/site/twiki/FalconDocumentation.twiki   |  6 ++-
 .../falcon/converter/OozieFeedMapper.java       | 41 +++++++++++++++++++-
 .../falcon/converter/OozieFeedMapperTest.java   | 13 ++++++-
 feed/src/test/resources/feed.xml                |  2 +-
 pom.xml                                         |  4 ++
 9 files changed, 72 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/689d52a6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2762ddf..82cf4ae 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,10 @@ Trunk (Unreleased)
 
   IMPROVEMENTS
 
+    FALCON-47 Falcon Replication should support configurable delays in feed, 
+    parallel, timeout and bulk transfer with variable frequency (Shaik Idris
+    Ali via Srikanth Sundarrajan)
+
     FALCON-49 Dependency on oozie-client pulls in hadoop-auth-0.23.1 which 
     is not necessary (Venkatesh Seetharam via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/689d52a6/client/src/main/java/org/apache/falcon/entity/v0/Frequency.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/entity/v0/Frequency.java b/client/src/main/java/org/apache/falcon/entity/v0/Frequency.java
index 911073a..f4c1800 100644
--- a/client/src/main/java/org/apache/falcon/entity/v0/Frequency.java
+++ b/client/src/main/java/org/apache/falcon/entity/v0/Frequency.java
@@ -68,7 +68,7 @@ public class Frequency {
     }
 
     public static String toString(Frequency freq) {
-        return freq.toString();
+        return freq==null? null:freq.toString();
     }
 
     @Override
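The one-line change above null-guards the static converter. A minimal sketch of the pattern, with a simplified stand-in class (field names are assumptions, not the real Frequency internals):

```java
public class FrequencySketch {
    private final String timeUnit;
    private final int frequency;

    public FrequencySketch(int frequency, String timeUnit) {
        this.frequency = frequency;
        this.timeUnit = timeUnit;
    }

    @Override
    public String toString() {
        return timeUnit + "(" + frequency + ")";
    }

    // Before the patch, a null argument here threw NullPointerException;
    // the guard lets JAXB-style converters pass null through.
    public static String toString(FrequencySketch freq) {
        return freq == null ? null : freq.toString();
    }

    public static void main(String[] args) {
        System.out.println(toString(new FrequencySketch(40, "minutes"))); // minutes(40)
        System.out.println(toString(null));                               // null
    }
}
```

This matters for the new optional delay attribute: an absent delay surfaces as a null Frequency, which must serialize cleanly.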

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/689d52a6/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 8390565..bf6fa81 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -144,6 +144,7 @@
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="cluster-type" name="type" use="optional"/>
         <xs:attribute type="xs:string" name="partition" use="optional"/>
+        <xs:attribute type="frequency-type" name="delay" use="optional" /> 
     </xs:complexType>
     <xs:complexType name="partitions">
         <xs:annotation>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/689d52a6/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 2ddbd95..bae2612 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -151,9 +151,11 @@ paths are available as a property in a process.
         <property name="field2" value="value2" />
         <property name="queueName" value="hadoopQueue"/>
         <property name="jobPriority" value="VERY_HIGH"/>
+        <property name="timeout" value="hours(1)"/>
+        <property name="parallel" value="3"/>
     </properties>
 </verbatim>
-A key-value pair, which are propagated to the workflow engine. "queueName" and "jobPriority" are special properties available to user to specify the hadoop job queue and priority, the same value is used by Falcons launcher job.
+A key-value pair, which are propagated to the workflow engine. "queueName" and "jobPriority" are special properties available to user to specify the hadoop job queue and priority, the same value is used by Falcons launcher job. "timeout" and "parallel" are other special properties which decides replication instance's timeout value while waiting for the feed instance and parallel decides the concurrent replication instances that can run at any given time.
  
 ---++ Process Specification
 A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines  the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on.  
@@ -478,4 +480,4 @@ Example:
 ...
 </process>
 </verbatim>
-This late handling specifies that late data detection should run at feed's late cut-off which is 6 hours in this case. If there is late data, Falcon should run the workflow specified at /projects/bootcamp/workflow/lateinput1/workflow.xml
\ No newline at end of file
+This late handling specifies that late data detection should run at feed's late cut-off which is 6 hours in this case. If there is late data, Falcon should run the workflow specified at /projects/bootcamp/workflow/lateinput1/workflow.xml

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/689d52a6/docs/src/site/twiki/FalconDocumentation.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconDocumentation.twiki b/docs/src/site/twiki/FalconDocumentation.twiki
index 6312627..8861c8f 100644
--- a/docs/src/site/twiki/FalconDocumentation.twiki
+++ b/docs/src/site/twiki/FalconDocumentation.twiki
@@ -215,7 +215,7 @@ The frequency at which the data is replicated is governed by the frequency speci
 Ideally, the feeds data path should have the same granularity as that for frequency of the feed, i.e. if the frequency of the feed is hours(3), then the data path should be to level /${YEAR}/${MONTH}/${DAY}/${HOUR}. 
 <verbatim>
     <clusters>
-        <cluster name="sourceCluster1" type="source" partition="${cluster.name}">
+        <cluster name="sourceCluster1" type="source" partition="${cluster.name}" delay="minutes(40)">
             <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
         </cluster>
         <cluster name="sourceCluster2" type="source" partition="COUNTRY/${cluster.name}">
@@ -235,6 +235,10 @@ Also, for every feed instance which is replicated Falcon sends a JMS message on
 
 Replication can be scheduled with the past date, the time frame considered for replication is the minimum overlapping window of start and end time of source and target cluster, ex: if s1 and e1 is the start and end time of source cluster respectively, and s2 and e2 of target cluster, then the coordinator is scheduled in target cluster with start time max(s1,s2) and min(e1,e2).
+
+A feed can also optionally specify the delay for replication instance in the cluster tag, the delay governs the replication instance delays. If the frequency of the feed is hours(2) and delay is hours(1), then the replication instance will run every 2 hours and replicates data with an offset of 1 hour, i.e at
+09:00 UTC, feed instance which is eligible for replication is 08:00; and 11:00 UTC, feed instance of 10:00 UTC is eligible and so on.
+
 ---+++ Where is the feed path defined?
 
 It's defined in the feed xml within the location tag.
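The delay arithmetic described above can be sketched as: the feed instance eligible at a given nominal run time is simply the nominal time minus the delay. A minimal illustration (class and method names are hypothetical, not Falcon APIs):

```java
import java.time.Duration;
import java.time.LocalTime;

public class DelayDemo {
    // The instance eligible for replication at a nominal run time is the
    // nominal time shifted back by the configured delay.
    static LocalTime eligibleInstance(LocalTime nominal, Duration delay) {
        return nominal.minus(delay);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofHours(1); // delay = hours(1)
        System.out.println(eligibleInstance(LocalTime.of(9, 0), delay));  // 08:00
        System.out.println(eligibleInstance(LocalTime.of(11, 0), delay)); // 10:00
    }
}
```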

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/689d52a6/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index d954202..adef3ec 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -62,6 +63,8 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
     private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
 
     private static final String FEED_PATH_SEP = "#";
+    private static final String TIMEOUT = "timeout";
+    private static final String PARALLEL = "parallel";
 
     public OozieFeedMapper(Feed feed) {
         super(feed);
@@ -198,13 +201,49 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             if (timeoutInMillis < THIRTY_MINUTES) {
                 timeoutInMillis = THIRTY_MINUTES;
             }
+            Map<String, String> props = getEntityProperties();
+            String timeout = props.get(TIMEOUT);
+            if (timeout!=null) {
+                try{
+                    timeoutInMillis= ExpressionHelper.get().
+                            evaluate(timeout, Long.class);
+                } catch (Exception ignore) {
+                    LOG.error("Unable to evaluate timeout:", ignore);
+                }
+            }
+            String parallelProp = props.get(PARALLEL);
+            int parallel = 1;
+            if (parallelProp != null) {
+                try {
+                    parallel = Integer.parseInt(parallelProp);
+                } catch (NumberFormatException ignore) {
+                    LOG.error("Unable to parse parallel:", ignore);
+                }
+            }
+
             replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
             replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
-
+            replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
+
+            Frequency replicationDelay = FeedHelper.getCluster(feed,
+                    srcCluster.getName()).getDelay();
+            long delayInMillis=0;
+            if (replicationDelay != null) {
+                delayInMillis = ExpressionHelper.get().evaluate(
+                        replicationDelay.toString(), Long.class);
+                long delayInMins = -1 * delayInMillis / (1000 * 60);
+                String elExp = "${now(0," + delayInMins + ")}";
+                replicationCoord.getInputEvents().getDataIn().get(0)
+                .getInstance().set(0, elExp);
+                replicationCoord.getOutputEvents().getDataOut().get(0)
+                .setInstance(elExp);
+            }
             Date srcStartDate = FeedHelper.getCluster(feed, srcCluster.getName()).getValidity().getStart();
+            srcStartDate=new Date(srcStartDate.getTime()+delayInMillis);
             Date srcEndDate = FeedHelper.getCluster(feed, srcCluster.getName()).getValidity().getEnd();
             Date trgStartDate = FeedHelper.getCluster(feed, trgCluster.getName()).getValidity().getStart();
             Date trgEndDate = FeedHelper.getCluster(feed, trgCluster.getName()).getValidity().getEnd();
+            trgStartDate=new Date(trgStartDate.getTime()+delayInMillis);
             if (srcStartDate.after(trgEndDate)
                     || trgStartDate.after(srcEndDate)) {
                 LOG.warn("Not creating replication coordinator, as the source cluster:"
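The EL-expression construction in the hunk above can be isolated as a small sketch: the delay is negated, converted to minutes, and embedded in Oozie's now(hours, minutes) EL function so the coordinator resolves an instance that far in the past. The class name here is hypothetical; the arithmetic matches the patch.

```java
public class DelayElExpression {
    // Converts a replication delay (already evaluated to milliseconds)
    // into the Oozie EL instance expression used by the patch.
    static String toElExpression(long delayInMillis) {
        long delayInMins = -1 * delayInMillis / (1000 * 60);
        return "${now(0," + delayInMins + ")}";
    }

    public static void main(String[] args) {
        // minutes(40), as used in the test feed.xml in this commit
        System.out.println(toElExpression(40 * 60 * 1000L)); // ${now(0,-40)}
    }
}
```

This matches the value asserted in OozieFeedMapperTest, where a delay of minutes(40) yields the input and output event instance `${now(0,-40)}`.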

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/689d52a6/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index d3d8e91..fb45c68 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -106,6 +106,7 @@ public class OozieFeedMapperTest {
                 new Path("/projects/falcon/"));
         COORDINATORAPP coord = coords.get(0);
 
+        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
         Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
                 .getAction().getWorkflow().getAppPath());
         Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
@@ -129,7 +130,17 @@ public class OozieFeedMapperTest {
         Assert.assertEquals(
                 "${nameNode}"
                         + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
-                outputDataset.getUriTemplate());
+                        outputDataset.getUriTemplate());
+        String inEventName =coord.getInputEvents().getDataIn().get(0).getName();
+        String inEventDataset =coord.getInputEvents().getDataIn().get(0).getDataset();
+        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+        Assert.assertEquals("input", inEventName);
+        Assert.assertEquals("input-dataset", inEventDataset);
+        Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+        Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
             if (prop.getName().equals("mapred.job.priority")) {
                 assertEquals(prop.getValue(), "NORMAL");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/689d52a6/feed/src/test/resources/feed.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/feed.xml b/feed/src/test/resources/feed.xml
index d5948b0..b003016 100644
--- a/feed/src/test/resources/feed.xml
+++ b/feed/src/test/resources/feed.xml
@@ -25,7 +25,7 @@
 
     <late-arrival cut-off="minutes(3)"/>
     <clusters>
-        <cluster name="corp1" type="source">
+        <cluster name="corp1" type="source" delay="minutes(40)">
             <validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
             <retention limit="minutes(5)" action="delete"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/689d52a6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b75cf8a..9121dd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -819,6 +819,10 @@
                         <exclude>*.patch</exclude>
                         <exclude>derby.log</exclude>
                         <exclude>**/logs/**</exclude>
+                        <exclude>**/.classpath</exclude>
+                        <exclude>**/.project</exclude>
+                        <exclude>**/.settings/**</exclude>
+                        <exclude>**/test-output/**</exclude>
                     </excludes>
                 </configuration>
                 <executions>

