falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [1/2] git commit: FALCON-342 Add ability to set maxMaps on replication. Contributed by Michael Miklavcic
Date Fri, 14 Mar 2014 04:06:30 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 39800a097 -> bb55a2c9f


FALCON-342 Add ability to set maxMaps on replication. Contributed by Michael Miklavcic


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/1e7aa8e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/1e7aa8e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/1e7aa8e6

Branch: refs/heads/master
Commit: 1e7aa8e6a4e1d7a7c67897aedbd1bb4b4e1850a4
Parents: 39800a0
Author: Venkatesh Seetharam <venkatesh@hortonworks.com>
Authored: Thu Mar 13 21:03:46 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@hortonworks.com>
Committed: Thu Mar 13 21:03:46 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                             |  3 +++
 common/src/main/resources/runtime.properties            |  4 +++-
 .../org/apache/falcon/converter/OozieFeedMapper.java    | 10 ++++++++++
 .../resources/config/workflow/replication-workflow.xml  |  2 +-
 .../apache/falcon/converter/OozieFeedMapperTest.java    | 12 +++++++++---
 feed/src/test/resources/fs-replication-feed.xml         |  3 +++
 .../apache/falcon/replication/FeedReplicatorTest.java   |  2 +-
 7 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e7aa8e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 23b4354..f58750e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -71,6 +71,9 @@ Trunk (Unreleased)
 
     FALCON-334 Add indexing to the graph property keys. (Venkatesh Seetharam)
 
+    FALCON-342 Add ability to set maxMaps on replication (Michael Miklavcic
+    via Venkatesh Seetharam)
+
   OPTIMIZATIONS
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e7aa8e6/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index 87b9d1e..22b1050 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -24,4 +24,6 @@
 *.log.cleanup.frequency.months.retention =months(3)
 
 *.falcon.parentworkflow.retry.max=3
-*.falcon.parentworkflow.retry.interval.secs=1
\ No newline at end of file
+*.falcon.parentworkflow.retry.interval.secs=1
+
+*.falcon.replication.workflow.maxmaps=5
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e7aa8e6/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index e589c02..2b3315f 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -47,6 +47,7 @@ import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.RuntimeProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -215,6 +216,8 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
     }
 
     private class ReplicationOozieWorkflowMapper {
+        private static final String MR_MAX_MAPS = "maxMaps";
+
         private static final int THIRTY_MINUTES = 30 * 60 * 1000;
 
         private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
@@ -415,6 +418,9 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                 Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
                 props.put("srcClusterName", srcCluster.getName());
                 props.put("srcClusterColo", srcCluster.getColo());
+                if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
+                    props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+                }
 
                 // the storage type is uniform across source and target feeds for replication
                 props.put("falconFeedStorageType", sourceStorage.getType().name());
@@ -450,6 +456,10 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             return replicationAction;
         }
 
+        private String getDefaultMaxMaps() {
+            return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+        }
+
         private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
                                               Feed feed) throws FalconException {
             String srcPart = FeedHelper.normalizePartitionExpression(

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e7aa8e6/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index db1c0e5..34ef68b 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -118,7 +118,7 @@
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
             <arg>-maxMaps</arg>
-            <arg>5</arg>
+            <arg>${maxMaps}</arg>
             <arg>-sourcePaths</arg>
             <arg>${distcpSourcePaths}</arg>
             <arg>-targetPath</arg>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e7aa8e6/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 671c53c..e610df2 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -205,9 +205,14 @@ public class OozieFeedMapperTest {
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
 
         // verify workflow params
-        Assert.assertEquals("replication-policy", props.get("userWorkflowName"));
-        Assert.assertEquals("0.5", props.get("userWorkflowVersion"));
-        Assert.assertEquals("falcon", props.get("userWorkflowEngine"));
+        Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
+        Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
+        Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
+
+        // verify default params
+        Assert.assertEquals(props.get("queueName"), "default");
+        Assert.assertEquals(props.get("jobPriority"), "NORMAL");
+        Assert.assertEquals(props.get("maxMaps"), "5");
 
         assertLibExtensions(coord, "replication");
         assertWorkflowRetries(coord);
@@ -331,6 +336,7 @@ public class OozieFeedMapperTest {
         Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
         Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
         Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+        Assert.assertEquals(props.get("maxMaps"), "33");
     }
 
     public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e7aa8e6/feed/src/test/resources/fs-replication-feed.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/fs-replication-feed.xml b/feed/src/test/resources/fs-replication-feed.xml
index c4ee460..e0a448f 100644
--- a/feed/src/test/resources/fs-replication-feed.xml
+++ b/feed/src/test/resources/fs-replication-feed.xml
@@ -61,4 +61,7 @@
 
     <ACL permission="0x755" group="group" owner="fetl"/>
     <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
+    <properties>
+        <property name="maxMaps" value="33" />
+    </properties>
 </feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e7aa8e6/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
index b8b39ad..2e3bf57 100644
--- a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
+++ b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
@@ -42,7 +42,7 @@ public class FeedReplicatorTest {
          */
         final String[] args = {
             "true",
-            "-maxMaps", "5",
+            "-maxMaps", "3",
             "-sourcePaths", "hdfs://localhost:8020/tmp/",
             "-targetPath", "hdfs://localhost1:8020/tmp/",
             "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),


Mime
View raw message