falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From samar...@apache.org
Subject [1/2] git commit: FALCON-631 add late data regression test for feed and process contributed by Pragya M
Date Thu, 09 Oct 2014 07:25:30 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 396db7825 -> 90faa0eab


FALCON-631 add late data regression test for feed and process contributed by Pragya M


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c8ecb10c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c8ecb10c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c8ecb10c

Branch: refs/heads/master
Commit: c8ecb10c7aab66c6d77a20cf09b3cc8c18f7d5a7
Parents: 5c30543
Author: Samarth Gupta <samarth.gupta@inmobi.com>
Authored: Thu Oct 9 12:53:31 2014 +0530
Committer: Samarth Gupta <samarth.gupta@inmobi.com>
Committed: Thu Oct 9 12:53:31 2014 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../falcon/regression/core/util/BundleUtil.java |   8 +-
 .../falcon/regression/core/util/OozieUtil.java  |  59 ++-
 .../falcon/regression/FeedLateRerunTest.java    | 344 ++++++++++++++++++
 .../falcon/regression/FeedReplicationTest.java  |   2 +-
 .../falcon/regression/ProcessLateRerunTest.java | 356 +++++++++++++++++++
 .../PrismFeedReplicationPartitionExpTest.java   |   2 +-
 .../prism/UpdateAtSpecificTimeTest.java         |   2 +-
 .../FeedReplicaltionBundles/InputFeed.xml       |  47 +++
 .../FeedReplicaltionBundles/cluster-0.1.xml     |  38 ++
 .../FETL-BillingRC.xml                          |  48 ---
 .../cluster-0.1.xml                             |  38 --
 12 files changed, 844 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 9c9a147..8281765 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -12,6 +12,9 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+
+   FALCON-631 add late data regression test for feed and process (Pragya
+   via Samarth Gupta)
    FALCON-701 HadoopUtil and Util classes documented (Paul Isaychuk via
    Raghav Kumar Gautam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
index 922c030..3b2f8dd 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
@@ -44,6 +44,10 @@ public final class BundleUtil {
     }
     private static final Logger LOGGER = Logger.getLogger(BundleUtil.class);
 
    /**
     * Reads the feed-replication test bundle from the "FeedReplicaltionBundles"
     * resource folder.
     * NOTE: "Replicaltion" is a misspelling of "Replication", but it matches the
     * on-disk resource folder name introduced by this commit, so it must stay as-is.
     *
     * @return the bundle read from that folder
     * @throws IOException if the bundle resources cannot be read
     */
    public static Bundle readFeedReplicaltionBundle() throws IOException {
        return readBundleFromFolder("FeedReplicaltionBundles");
    }
+
     public static Bundle readLateDataBundle() throws IOException {
         return readBundleFromFolder("LateDataBundles");
     }
@@ -68,10 +72,6 @@ public final class BundleUtil {
         return readBundleFromFolder("hcat_2");
     }
 
-    public static Bundle readLocalDCBundle() throws IOException {
-        return readBundleFromFolder("LocalDC_feedReplicaltion_BillingRC");
-    }
-
     public static Bundle readUpdateBundle() throws IOException {
         return readBundleFromFolder("updateBundle");
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index c42531e..c6217c1 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -244,20 +244,51 @@ public final class OozieUtil {
         return false;
     }
 
+
     public static List<String> getMissingDependencies(ColoHelper helper, String bundleID)
-        throws OozieClientException {
-        BundleJob bundleJob = helper.getClusterHelper().getOozieClient().getBundleJobInfo(bundleID);
-        CoordinatorJob jobInfo =
-            helper.getClusterHelper().getOozieClient().getCoordJobInfo(
-                bundleJob.getCoordinators().get(0).getId());
-        List<CoordinatorAction> actions = jobInfo.getActions();
+            throws OozieClientException {
+        CoordinatorJob jobInfo;
+        jobInfo = null;
+        OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
+        BundleJob bundleJob = oozieClient.getBundleJobInfo(bundleID);
+        List<CoordinatorJob> coordinatorJobList = bundleJob.getCoordinators();
+        if (coordinatorJobList.size() > 1) {
+
+            for (CoordinatorJob coord : bundleJob.getCoordinators()) {
+                LOGGER.info("Appname is : " + coord.getAppName());
+                if ((coord.getAppName().contains("DEFAULT") && coord.getAppName().contains("PROCESS"))
+                        ||
+                        (coord.getAppName().contains("REPLICATION") && coord.getAppName().contains("FEED")))
+                    jobInfo = oozieClient.getCoordJobInfo(coord.getId());
+                else {
+                    LOGGER.info("Desired coord does not exists on " + oozieClient.getOozieUrl());
+                }
+            }
 
-        if (actions.size() < 1) {
-            return null;
         }
-        LOGGER.info("conf from event: " + actions.get(0).getMissingDependencies());
+        else {
+            jobInfo = oozieClient.getCoordJobInfo(bundleJob.getCoordinators().get(0).getId());
+        }
 
-        String[] missingDependencies = actions.get(0).getMissingDependencies().split("#");
+        LOGGER.info("Coordinator id : " + jobInfo);
+        List<CoordinatorAction> actions = null;
+        if (jobInfo != null) {
+            actions = jobInfo.getActions();
+        }
+
+        if (actions != null) {
+            if (actions.size() < 1) {
+                return null;
+            }
+        }
+        if (actions != null) {
+            LOGGER.info("conf from event: " + actions.get(0).getMissingDependencies());
+        }
+
+        String[] missingDependencies = new String[0];
+        if (actions != null) {
+            missingDependencies = actions.get(0).getMissingDependencies().split("#");
+        }
         return new ArrayList<String>(Arrays.asList(missingDependencies));
     }
 
@@ -474,4 +505,12 @@ public final class OozieUtil {
             }
         }
     }
+
+    public static void validateRetryAttempts(ColoHelper helper, String bundleId,EntityType type, int attempts) throws OozieClientException {
+        OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
+        CoordinatorJob coord = getDefaultOozieCoord(helper, bundleId,type);
+        int actualRun = oozieClient.getJobInfo(coord.getActions().get(0).getExternalId()).getRun();
+        LOGGER.info("Actual run count: " + actualRun); // wrt 0
+        Assert.assertEquals(actualRun, attempts, "Rerun attempts did not match");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
new file mode 100644
index 0000000..925ae4d
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.*;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.List;
+
+/**
+ * feed late data test
+ */
+
+/*
+ * This test submits and schedules feed and then check for replication.
+ * On adding further late data it checks whether the data has been replicated correctly in the given late cut-off time.
+ * Assuming that late frequency set in server is 3 minutes. Although value can be changed according to requirement.
+ */
+
+public class FeedLateRerunTest extends BaseTestClass {
+
+    private ColoHelper cluster1 = servers.get(0);
+    private ColoHelper cluster2 = servers.get(1);
+    private FileSystem cluster1FS = serverFS.get(0);
+    private FileSystem cluster2FS = serverFS.get(1);
+    private OozieClient cluster2OC = serverOC.get(1);
+    private String dateTemplate = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String baseTestDir = baseHDFSDir + "/FeedLateRerunTest";
+    private String feedDataLocation = baseTestDir + "/source" + dateTemplate;
+    private String targetPath = baseTestDir + "/target";
+    private String targetDataLocation = targetPath + dateTemplate;
+    private static final Logger LOGGER = Logger.getLogger(FeedLateRerunTest.class);
+    private String source = null;
+    private String target = null;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws JAXBException, IOException {
+        LOGGER.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readFeedReplicaltionBundle();
+
+        bundles[0] = new Bundle(bundle, cluster1);
+        bundles[1] = new Bundle(bundle, cluster2);
+
+        bundles[0].generateUniqueBundle();
+        bundles[1].generateUniqueBundle();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    @Test(enabled = true)
+    public void FeedLateRerunTestWithEmptyFolders ()
+            throws AuthenticationException, IOException, URISyntaxException, JAXBException,
+            OozieClientException {
+        Bundle.submitCluster(bundles[0], bundles[1]);
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 30);
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+
+        //configure feed
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+        //erase all clusters from feed definition
+        feed = InstanceUtil.setFeedCluster(feed,
+                XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+                XmlUtil.createRetention("days(1000000)", ActionType.DELETE), null,
+                ClusterType.SOURCE, null);
+        //set cluster1 as source
+        feed = InstanceUtil.setFeedCluster(feed,
+                XmlUtil.createValidity(startTime, endTime),
+                XmlUtil.createRetention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)),
+                ClusterType.SOURCE, null);
+        //set cluster2 as target
+        feed = InstanceUtil.setFeedCluster(feed,
+                XmlUtil.createValidity(startTime, endTime),
+                XmlUtil.createRetention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)),
+                ClusterType.TARGET, null, targetDataLocation);
+
+        String entityName = Util.readEntityName(feed);
+
+        //submit and schedule feed
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+
+        //check if coordinator exists
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName,
+                        "REPLICATION"), 1);
+
+
+        //Finding bundleId of replicated instance on target
+        String bundleId = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
+
+
+        //Finding and creating missing dependencies
+        List<String> missingDependencies = getAndCreateDependencies(cluster1,cluster1FS,cluster2,cluster2OC, bundleId, false, entityName);
+
+        int count = 1;
+        for (String location : missingDependencies) {
+            if(count==1) {
+                source = location;
+                count++;
+            }
+        }
+
+        source=splitPathFromIp(source,"8020");
+        LOGGER.info("source : " + source);
+        target = source.replace("source", "target");
+        LOGGER.info("target : " + target);
+
+
+        /*
+        Sleep for some time ( as is defined in runtime property of server ).
+        Let the instance rerun and then it should succeed.
+         */
+
+        int sleepMins = 8;
+        for(int i=0; i < sleepMins ; i++) {
+            LOGGER.info("Waiting...");
+            TimeUtil.sleepSeconds(60);
+        }
+
+
+        String bundleID = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
+        OozieUtil.validateRetryAttempts(cluster2, bundleID,EntityType.FEED, 1);
+
+        //check if data has been replicated correctly
+        List<Path> cluster1ReplicatedData = HadoopUtil
+                .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source));
+        List<Path> cluster2ReplicatedData = HadoopUtil
+                .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target));
+
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+
+
+    }
+
+
+    @Test(enabled = true)
+    public void FeedLateRerunTestWithData ()
+            throws AuthenticationException, IOException, URISyntaxException, JAXBException,
+            OozieClientException {
+        Bundle.submitCluster(bundles[0], bundles[1]);
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 30);
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+
+        //configure feed
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+        //erase all clusters from feed definition
+        feed = InstanceUtil.setFeedCluster(feed,
+                XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+                XmlUtil.createRetention("days(1000000)", ActionType.DELETE), null,
+                ClusterType.SOURCE, null);
+        //set cluster1 as source
+        feed = InstanceUtil.setFeedCluster(feed,
+                XmlUtil.createValidity(startTime, endTime),
+                XmlUtil.createRetention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)),
+                ClusterType.SOURCE, null);
+        //set cluster2 as target
+        feed = InstanceUtil.setFeedCluster(feed,
+                XmlUtil.createValidity(startTime, endTime),
+                XmlUtil.createRetention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)),
+                ClusterType.TARGET, null, targetDataLocation);
+
+        String entityName = Util.readEntityName(feed);
+
+        //submit and schedule feed
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+
+        //check if coordinator exists
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName,
+                        "REPLICATION"), 1);
+
+
+        //Finding bundleId of replicated instance on target
+        String bundleId = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
+
+        //Finding and creating missing dependencies
+        List<String> missingDependencies = getAndCreateDependencies(cluster1,cluster1FS,cluster2,cluster2OC, bundleId, true, entityName);
+
+        int count = 1;
+        for (String location : missingDependencies) {
+            if(count==1) {
+                source = location;
+                count++;
+            }
+        }
+
+        LOGGER.info("source : " + source);
+        source=splitPathFromIp(source,"8020");
+        LOGGER.info("source : " + source);
+        target = source.replace("source", "target");
+        LOGGER.info("target : " + target);
+
+
+        /*
+        Sleep for some time ( as is defined in runtime property of server ).
+        Let the instance rerun and then it should succeed.
+         */
+
+        int sleepMins = 8;
+        for(int i=0; i < sleepMins ; i++) {
+            LOGGER.info("Waiting...");
+            TimeUtil.sleepSeconds(60);
+        }
+
+
+        String bundleID = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
+        OozieUtil.validateRetryAttempts(cluster2, bundleID,EntityType.FEED, 1);
+
+        //check if data has been replicated correctly
+        List<Path> cluster1ReplicatedData = HadoopUtil
+                .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source));
+        List<Path> cluster2ReplicatedData = HadoopUtil
+                .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target));
+
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+
+    }
+
+    private String splitPathFromIp (String src,String port) {
+        String req_src,tempSrc="";
+        if(src.contains(":")) {
+            String tempPath[] = src.split(":");
+            for (String aTempPath : tempPath) {
+                if (aTempPath.startsWith(port)) {
+                    tempSrc = aTempPath;
+                }
+            }
+        }
+
+        if(tempSrc.isEmpty()) {
+            req_src=src;
+        }
+        else {
+            req_src=tempSrc.replace(port,"");
+        }
+        return req_src;
+    }
+
+    /*
+    prismHelper1 - source colo
+    prismHelper2 - target colo
+     */
+    private List<String> getAndCreateDependencies (ColoHelper prismHelper1, FileSystem clusterFS1, ColoHelper prismHelper2,OozieClient oozieClient2, String bundleId,  boolean dataFlag, String entityName)
+            throws OozieClientException, IOException {
+
+        List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
+        for (int i = 0; i < 10 && missingDependencies == null; ++i) {
+            TimeUtil.sleepSeconds(30);
+            LOGGER.info("sleeping...");
+            missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
+        }
+        Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
+
+        //print missing dependencies
+        for (String dependency : missingDependencies) {
+            LOGGER.info("dependency from job: " + dependency);
+        }
+
+        // Creating missing dependencies
+        InstanceUtil.createHDFSFolders(prismHelper1, missingDependencies);
+
+        //Adding data to empty folders depending on dataFlag
+        if(dataFlag) {
+            int tempCount = 1;
+            for (String location : missingDependencies) {
+                if(tempCount==1) {
+                    LOGGER.info("Transferring data to : " + location);
+                    HadoopUtil.copyDataToFolder(clusterFS1, location, OSUtil.RESOURCES + "feed-s4Replication.xml");
+                    tempCount++;
+                }
+            }
+        }
+
+        //replication should start, wait while it ends
+        InstanceUtil.waitTillInstanceReachState(oozieClient2, entityName, 1,
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+        // Adding data for late rerun
+
+        int tempCounter = 1;
+        for (String dependency : missingDependencies) {
+            if(tempCounter==1) {
+                LOGGER.info("Transferring late data to : " + dependency);
+                HadoopUtil.copyDataToFolder(clusterFS1, dependency, OSUtil.RESOURCES + "log4j.properties");
+            }
+            tempCounter++;
+        }
+
+        return missingDependencies;
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
index 8399814..3526e46 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
@@ -81,7 +81,7 @@ public class FeedReplicationTest extends BaseTestClass {
     @BeforeMethod(alwaysRun = true)
     public void setUp(Method method) throws JAXBException, IOException {
         LOGGER.info("test name: " + method.getName());
-        Bundle bundle = BundleUtil.readLocalDCBundle();
+        Bundle bundle = BundleUtil.readFeedReplicaltionBundle();
 
         bundles[0] = new Bundle(bundle, cluster1);
         bundles[1] = new Bundle(bundle, cluster2);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
new file mode 100644
index 0000000..3f7258e
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.*;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.*;
+import org.testng.Assert;
+import org.testng.TestNGException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.*;
+
/**
 * Process late-data regression test: verifies that a scheduled process is
 * rerun when late input data arrives within the configured late policy.
 */

public class ProcessLateRerunTest extends BaseTestClass {


    // single colo used by all tests in this class (servers from BaseTestClass)
    ColoHelper cluster1 = servers.get(0);
    OozieClient cluster1OC = serverOC.get(0);
    FileSystem cluster1FS = serverFS.get(0);
    // HDFS directory where the aggregator oozie workflow is uploaded
    String aggregateWorkflowDir = baseHDFSDir + "/ProcessLateRerunTest/aggregator";
    private static final Logger logger = Logger.getLogger(ProcessLateRerunTest.class);
+
    /** Uploads the aggregator oozie workflow to all clusters once before any test runs. */
    @BeforeClass(alwaysRun = true)
    public void uploadWorkflow() throws Exception {
        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readLateDataBundle();
+        for (int i = 0; i < 1; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
    /** Removes the bundles created in setUp after each test, pass or fail. */
    @AfterMethod(alwaysRun = true)
    public void tearDown() {
        removeBundles();
    }
+
+    /**
+     * Test demonstrates rerunning process for late arrival of data.
+     * Initially there is no input data and empty folders are processed.
+     * It checks the number of rerun attempts once late data has been added
+     * ensuring that late rerun happened.
+     */
+
+    @Test(enabled = true)
+    public void testProcessLateRerunOnEmptyFolder() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 30);
+        logger.info("Time range between : " + startTime + " and " + endTime);
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(2);
+
+        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+        bundles[0].setProcessLatePolicy(getLateData(2, "minutes", "periodic", inputName, aggregateWorkflowDir));
+
+        bundles[0].submitAndScheduleProcess();
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        TimeUtil.sleepSeconds(10);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+
+        getAndCreateDependencies(cluster1,bundles[0],cluster1OC,cluster1FS,false,1);
+
+        int sleepMins = 6;
+        for(int i=0; i < sleepMins ; i++) {
+            logger.info("Waiting...");
+            TimeUtil.sleepSeconds(60);
+        }
+
+        InstanceUtil.waitTillInstanceReachState(cluster1OC,
+                Util.getProcessName(bundles[0].getProcessData()), 1,
+                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        List<String> bundleList =  OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
+                Util.getProcessName(bundles[0].getProcessData()), EntityType.PROCESS);
+        String bundleID = bundleList.get(0);
+
+        OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
+
+    }
+
+    /**
+     * Test demonstrates rerunning process for late arrival of data.
+     * Initially there is some data which is processed. It checks the number of rerun attempts
+     * once further more data has been added ensuring that late rerun happened.
+     */
+    @Test(enabled = true)
+    public void testProcessLateRerunWithData() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 30);
+        logger.info("Time range between : " + startTime + " and " + endTime);
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(2);
+
+        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+
+        bundles[0].setProcessLatePolicy(getLateData(4,"minutes","periodic",inputName,aggregateWorkflowDir));
+        bundles[0].submitAndScheduleProcess();
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        TimeUtil.sleepSeconds(10);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+
+        getAndCreateDependencies(cluster1,bundles[0],cluster1OC,cluster1FS,true,1);
+
+        int sleepMins = 6;
+        for(int i=0; i < sleepMins ; i++) {
+            logger.info("Waiting...");
+            TimeUtil.sleepSeconds(60);
+        }
+
+        InstanceUtil.waitTillInstanceReachState(cluster1OC,
+                Util.getProcessName(bundles[0].getProcessData()), 1,
+                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        List<String> bundleList =  OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
+                Util.getProcessName(bundles[0].getProcessData()), EntityType.PROCESS);
+        String bundleID = bundleList.get(0);
+
+        OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
+
+    }
+
+    /**
+     * Test demonstrates rerunning process for late arrival of data for multiple input folders.
+     * After the first instance succeeds, additional data is dropped into the third dependency
+     * folder and the rerun attempt count is verified to confirm that a late rerun happened.
+     */
+    @Test(enabled = true)
+    public void testProcessLateRerunWithMultipleFolders() throws Exception {
+        String processStart = TimeUtil.getTimeWrtSystemTime(0);
+        String processEnd = TimeUtil.addMinsToTime(processStart, 30);
+        String instanceStart = "now(0,-5)";
+        String instanceEnd = "now(0,0)";
+        logger.info("Time range between : " + processStart + " and " + processEnd);
+        bundles[0].setProcessValidity(processStart, processEnd);
+        bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
+        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+        String firstInputName = process.getInputs().getInputs().get(0).getName();
+
+        bundles[0].setProcessLatePolicy(
+                getLateData(4, "minutes", "periodic", firstInputName, aggregateWorkflowDir));
+        bundles[0].setProcessConcurrency(2);
+
+        // Increase the window of input for process
+        bundles[0].setDatasetInstances(instanceStart, instanceEnd);
+        bundles[0].submitAndScheduleProcess();
+
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        TimeUtil.sleepSeconds(10);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+
+        getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 3);
+
+        // Give the late-rerun service time past the 4-minute cut-off to pick up the new data.
+        int minutesToWait = 6;
+        while (minutesToWait-- > 0) {
+            logger.info("Waiting...");
+            TimeUtil.sleepSeconds(60);
+        }
+
+        InstanceUtil.waitTillInstanceReachState(cluster1OC,
+                Util.getProcessName(bundles[0].getProcessData()), 1,
+                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        List<String> processBundles = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
+                Util.getProcessName(bundles[0].getProcessData()), EntityType.PROCESS);
+
+        OozieUtil.validateRetryAttempts(cluster1, processBundles.get(0), EntityType.PROCESS, 1);
+    }
+
+    /**
+     * Test demonstrates rerunning process for late arrival of data for gate folders.
+     * Late rerun will not work on gate folder, so no retry attempt on the appended data.
+     * A "gate" input is an extra input (here start == end == now(0,1)) used only to hold back
+     * the instance until the folder exists; the late policy is attached to the real input only.
+     */
+    @Test(enabled = true)
+    public void testProcessLateRerunWithGate() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 30);
+        String startInstance = "now(0,-5)";
+        String endInstance = "now(0,0)";
+        logger.info("Time range between : " + startTime + " and " + endTime);
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(2);
+
+        // Increase the window of input for process
+        bundles[0].setDatasetInstances(startInstance, endInstance);
+
+        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+        Input tempFeed = processMerlin.getInputs().getInputs().get(0);
+
+        // Build the gate input against the same feed as the first input, pointing at a single
+        // future instance, and append it to the process definition before (re)setting the data.
+        Input gateInput = new Input();
+        gateInput.setName("Gate");
+        gateInput.setFeed(tempFeed.getFeed());
+        gateInput.setEnd("now(0,1)");
+        gateInput.setStart("now(0,1)");
+        processMerlin.getInputs().getInputs().add(gateInput);
+        bundles[0].setProcessData(processMerlin.toString());
+
+        // Late policy is registered only for the original (non-gate) input.
+        bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
+
+        bundles[0].submitAndScheduleProcess();
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+
+        TimeUtil.sleepSeconds(10);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+
+        // dataFolder 7 targets the last dependency, which is the gate folder (6 input instances
+        // from now(0,-5)..now(0,0) plus the gate instance) -- TODO confirm ordering on cluster.
+        getAndCreateDependencies(cluster1,bundles[0],cluster1OC,cluster1FS,false,7);
+
+        // Wait past the late cut-off so the late-rerun service would have fired if it was going to.
+        int sleepMins = 6;
+        for(int i=0; i < sleepMins ; i++) {
+            logger.info("Waiting...");
+            TimeUtil.sleepSeconds(60);
+        }
+
+        InstanceUtil.waitTillInstanceReachState(cluster1OC,
+                Util.getProcessName(bundles[0].getProcessData()), 1,
+                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        List<String> bundleList =  OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
+                Util.getProcessName(bundles[0].getProcessData()), EntityType.PROCESS);
+        String bundleID = bundleList.get(0);
+
+        // Zero retries expected: data appended to the gate folder must not trigger late rerun.
+        OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 0);
+
+    }
+
+    /**
+     * Waits for the process bundle and its missing dependencies, creates the dependency folders,
+     * lets the first instance succeed, and finally drops extra data into one dependency folder so
+     * the caller can verify whether the late-rerun policy fires.
+     *
+     * @param prismHelper cluster on which the process is scheduled
+     * @param bundle      bundle holding the scheduled process entity
+     * @param oozieClient oozie client used to track instance status
+     * @param clusterFS   filesystem of the cluster hosting the dependency folders
+     * @param dataFlag    whether process should run initially on folders containing data (true)
+     *                    or on empty folders (false)
+     * @param dataFolder  1-based index of the dependency folder that receives the late data
+     */
+    private void getAndCreateDependencies(ColoHelper prismHelper, Bundle bundle, OozieClient oozieClient, FileSystem clusterFS, boolean dataFlag, int dataFolder) {
+
+        try {
+            // Wait (up to ~5 min) for the bundle job to appear in oozie.
+            List<String> bundles = null;
+            for (int i = 0; i < 10; ++i) {
+                bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
+                        Util.getProcessName(bundle.getProcessData()), EntityType.PROCESS);
+                if (!bundles.isEmpty()) {
+                    break;
+                }
+                TimeUtil.sleepSeconds(30);
+            }
+            Assert.assertTrue(bundles != null && !bundles.isEmpty(), "Bundle job not created.");
+            String bundleID = bundles.get(0);
+            logger.info("bundle id: " + bundleID);
+
+            // Wait (up to ~5 min) for the coordinator to report its missing dependencies.
+            List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper, bundleID);
+            for (int i = 0; i < 10 && missingDependencies == null; ++i) {
+                TimeUtil.sleepSeconds(30);
+                missingDependencies = OozieUtil.getMissingDependencies(prismHelper, bundleID);
+            }
+            Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
+
+            //print missing dependencies
+            for (String dependency : missingDependencies) {
+                logger.info("dependency from job: " + dependency);
+            }
+
+            //create missing dependencies
+            logger.info("Creating missing dependencies...");
+            OozieUtil.createMissingDependencies(prismHelper, EntityType.PROCESS,
+                    Util.getProcessName(bundle.getProcessData()), 0, 0);
+
+            // Optionally seed the first dependency folder so the initial run sees data.
+            if (dataFlag && !missingDependencies.isEmpty()) {
+                String location = missingDependencies.get(0);
+                logger.info("Transferring data to : " + location);
+                HadoopUtil.copyDataToFolder(clusterFS, location, OSUtil.RESOURCES + "feed-s4Replication.xml");
+            }
+
+            //Process succeeding on empty folders
+            logger.info("Waiting for process to succeed...");
+            InstanceUtil.waitTillInstanceReachState(oozieClient,
+                    Util.getProcessName(bundle.getProcessData()), 1,
+                    CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+            TimeUtil.sleepSeconds(30);
+
+            // Add late data to the requested dependency folder; deliberately a no-op when
+            // dataFolder is out of range, matching the callers' expectations.
+            if (dataFolder >= 1 && dataFolder <= missingDependencies.size()) {
+                String dependency = missingDependencies.get(dataFolder - 1);
+                logger.info("Transferring late data to : " + dependency);
+                HadoopUtil.copyDataToFolder(clusterFS, dependency, OSUtil.RESOURCES + "log4j.properties");
+            }
+
+        } catch (Exception e) {
+            // Wrap as unchecked TestNGException; the cause is preserved, so no printStackTrace.
+            throw new TestNGException(e);
+        }
+    }
+
+    /**
+     * Builds a late-data policy covering a single process input.
+     *
+     * @param delay       magnitude of the late cut-off
+     * @param delayUnits  unit of the cut-off, e.g. "minutes"
+     * @param retryType   late rerun policy type, e.g. "periodic"
+     * @param inputData   name of the process input the policy applies to
+     * @param workflowDir workflow path used when reprocessing late data
+     * @return the assembled late-process policy
+     */
+    private static LateProcess getLateData(int delay, String delayUnits, String retryType, String inputData, String workflowDir) {
+        LateProcess policy = new LateProcess();
+        policy.setPolicy(PolicyType.fromValue(retryType));
+        policy.setDelay(new Frequency(delayUnits + "(" + delay + ")"));
+
+        LateInput lateInput = new LateInput();
+        lateInput.setInput(inputData);
+        lateInput.setWorkflowPath(workflowDir);
+        policy.getLateInputs().add(lateInput);
+        return policy;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
index e940b19..ed2d9d7 100755
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
@@ -173,7 +173,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
     @BeforeMethod(alwaysRun = true)
     public void testName(Method method) throws Exception {
         logger.info("test name: " + method.getName());
-        Bundle bundle = BundleUtil.readLocalDCBundle();
+        Bundle bundle = BundleUtil.readFeedReplicaltionBundle();
 
         for (int i = 0; i < 3; i++) {
             bundles[i] = new Bundle(bundle, servers.get(i));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
index e17d9f0..505df1c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@ -79,7 +79,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
     @BeforeMethod(alwaysRun = true)
     public void setup(Method method) throws IOException {
         logger.info("test name: " + method.getName());
-        Bundle bundle = BundleUtil.readLocalDCBundle();
+        Bundle bundle = BundleUtil.readFeedReplicaltionBundle();
         bundles[0] = new Bundle(bundle, cluster1);
         bundles[1] = new Bundle(bundle, cluster2);
         bundles[2] = new Bundle(bundle, cluster3);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin/src/test/resources/FeedReplicaltionBundles/InputFeed.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/FeedReplicaltionBundles/InputFeed.xml b/falcon-regression/merlin/src/test/resources/FeedReplicaltionBundles/InputFeed.xml
new file mode 100755
index 0000000..4ddda84
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/FeedReplicaltionBundles/InputFeed.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<feed name="InputFeed" description="Input File" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="colo"/>
+        <partition name="eventTime"/>
+        <partition name="impressionHour"/>
+        <partition name="pricingModel"/>
+    </partitions>
+    <frequency>minutes(5)</frequency>
+    <late-arrival cut-off="days(100000)"/>
+    <clusters>
+        <cluster name="lhr1-emerald" type="target" partition="${cluster.colo}">
+            <validity start="2012-07-20T00:00Z" end="2099-07-16T00:00Z"/>
+            <retention limit="days(10000)" action="delete"/>
+        </cluster>
+    </clusters>
+    <locations>
+        <location type="data" path="/data/regression/fetlrc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
+        <location type="stats" path="/data/regression/fetlrc/billing/stats"/>
+        <location type="meta" path="/data/regression/fetlrc/billing/metadata"/>
+    </locations>
+    <ACL owner="fetl" group="group" permission="0x755"/>
+    <schema location="/databus/streams_local/click_rr/schema/" provider="protobuf"/>
+
+    <properties>
+    <property name="field1" value="value1" />
+    <property name="field2" value="value2" />
+  </properties>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin/src/test/resources/FeedReplicaltionBundles/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/FeedReplicaltionBundles/cluster-0.1.xml b/falcon-regression/merlin/src/test/resources/FeedReplicaltionBundles/cluster-0.1.xml
new file mode 100755
index 0000000..7d1b089
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/FeedReplicaltionBundles/cluster-0.1.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<cluster name="ivory2-stg4" description="" colo="stg" xmlns="uri:falcon:cluster:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://stg-nn.blue.ua2.inmobi.com:50070" version="0.20.2-cdh3u0"/>
+        <interface type="execute" endpoint="stg-jt.blue.ua2.inmobi.com:54311" version="0.20.2-cdh3u0"/>
+        <interface type="write" endpoint="hdfs://stg-nn.blue.ua2.inmobi.com:54310" version="0.20.2-cdh3u0"/>
+        <interface type="messaging" endpoint="tcp://gs1134.blue.ua2.inmobi.com:61618?daemon=true" version="5.1.6"/>
+        <interface type="workflow" endpoint="http://gs1134.blue.ua2.inmobi.com:11002/oozie/" version="3.1.4"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/ivory2/stg/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/ivory2/stg/working"/>
+    </locations>
+    <properties>
+        <property name="colo.name" value="ua1"/>
+        <property name="hbase.zookeeper.quorum" value="192.168.138.115"/>
+        <property name="hbase.zookeeper.property.clientPort" value="2181"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin/src/test/resources/LocalDC_feedReplicaltion_BillingRC/FETL-BillingRC.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/LocalDC_feedReplicaltion_BillingRC/FETL-BillingRC.xml b/falcon-regression/merlin/src/test/resources/LocalDC_feedReplicaltion_BillingRC/FETL-BillingRC.xml
deleted file mode 100755
index eb31839..0000000
--- a/falcon-regression/merlin/src/test/resources/LocalDC_feedReplicaltion_BillingRC/FETL-BillingRC.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<feed name="FETL-BillingRC" description="billing RC File" xmlns="uri:falcon:feed:0.1">
-    <partitions>
-        <partition name="colo"/>
-        <partition name="eventTime"/>
-        <partition name="impressionHour"/>
-        <partition name="pricingModel"/>
-    </partitions>
-    <frequency>minutes(5)</frequency>
-    <late-arrival cut-off="days(100000)"/>
-    <clusters>
-         <cluster name="clusterName" type="source">
-            <validity start="2012-07-20T03:00Z" end="2099-07-16T00:00Z"/>
-            <retention limit="days(10000)" action="delete"/>
-        </cluster>
-    </clusters>
-    <locations>
-        <location type="data" path="/data/regression/fetlrc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
-        <location type="stats" path="/data/regression/fetlrc/billing/stats"/>
-        <location type="meta" path="/data/regression/fetlrc/billing/metadata"/>
-    </locations>
-    <ACL owner="default" group="group" permission="0x755"/>
-    <schema location="/schemaLocaltion/" provider="protobuf"/>
-
-    <properties>
-    <property name="field1" value="value1" />
-    <property name="field2" value="value2" />
-  </properties>
-</feed>
-

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c8ecb10c/falcon-regression/merlin/src/test/resources/LocalDC_feedReplicaltion_BillingRC/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/LocalDC_feedReplicaltion_BillingRC/cluster-0.1.xml b/falcon-regression/merlin/src/test/resources/LocalDC_feedReplicaltion_BillingRC/cluster-0.1.xml
deleted file mode 100755
index 1b16836..0000000
--- a/falcon-regression/merlin/src/test/resources/LocalDC_feedReplicaltion_BillingRC/cluster-0.1.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<cluster name="ivory2-stg4" description="" colo="stg" xmlns="uri:falcon:cluster:0.1">
-    <interfaces>
-        <interface type="readonly" endpoint="hftp://stg-nn.blue.ua2.inmobi.com:50070" version="2.5.0"/>
-        <interface type="execute" endpoint="stg-jt.blue.ua2.inmobi.com:54311" version="2.5.0"/>
-        <interface type="write" endpoint="hdfs://stg-nn.blue.ua2.inmobi.com:54310" version="2.5.0"/>
-        <interface type="messaging" endpoint="tcp://gs1134.blue.ua2.inmobi.com:61618?daemon=true" version="5.1.6"/>
-        <interface type="workflow" endpoint="http://gs1134.blue.ua2.inmobi.com:11002/oozie/" version="3.1.4"/>
-    </interfaces>
-    <locations>
-        <location name="staging" path="/projects/ivory2/stg/staging"/>
-        <location name="temp" path="/tmp"/>
-        <location name="working" path="/projects/ivory2/stg/working"/>
-    </locations>
-    <properties>
-        <property name="colo.name" value="ua2"/>
-        <property name="hbase.zookeeper.quorum" value="gs1001.grid.corp.inmobi.com"/>
-        <property name="hbase.zookeeper.property.clientPort" value="2181"/>
-    </properties>
-</cluster>


Mime
View raw message