falcon-commits mailing list archives

From samar...@apache.org
Subject [09/27] adding falcon-regression
Date Mon, 04 Aug 2014 10:04:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
new file mode 100755
index 0000000..83759b1
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
@@ -0,0 +1,898 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+@Test(groups = "distributed")
+public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    ColoHelper cluster3 = servers.get(2);
+    FileSystem cluster1FS = serverFS.get(0);
+    FileSystem cluster2FS = serverFS.get(1);
+    FileSystem cluster3FS = serverFS.get(2);
+    OozieClient cluster1OC = serverOC.get(0);
+    OozieClient cluster2OC = serverOC.get(1);
+    private String testDate = "/2012/10/01/12/";
+    private String baseTestDir = baseHDFSDir + "/PrismFeedReplicationPartitionExpTest";
+    private String testBaseDir1 = baseTestDir + "/localDC/rc/billing";
+    private String testBaseDir2 = baseTestDir + "/clusterPath/localDC/rc/billing";
+    private String testBaseDir3 = baseTestDir + "/dataBillingRC/fetlrc/billing";
+    private String testBaseDir4 = baseTestDir + "/sourcetarget";
+    private String testBaseDir_server1source = baseTestDir + "/source1";
+    private String testDirWithDate = testBaseDir1 + testDate;
+    private String testDirWithDate_sourcetarget = testBaseDir4 + testDate;
+    private String testDirWithDate_source1 = testBaseDir_server1source + testDate;
+    private String dateTemplate = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String testFile1 = OSUtil.RESOURCES
+        + OSUtil.getPath("ReplicationResources", "feed-s4Replication.xml");
+    private String testFile2 = OSUtil.RESOURCES + OSUtil.getPath("ReplicationResources", "id.pig");
+    private String testFile3 = OSUtil.RESOURCES
+        + OSUtil.getPath("ReplicationResources", "cluster-0.1.xml");
+    private String testFile4 = OSUtil.RESOURCES
+        + OSUtil.getPath("ReplicationResources", "log4testng.properties");
+    private static final Logger logger =
+        Logger.getLogger(PrismFeedReplicationPartitionExpTest.class);
+
+
+    // pt: partition in target
+    // ps: partition in source
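+    // Test names below follow a shorthand (assumed from the test bodies): the
+    // "<n>s<n>t" part counts source and target clusters (e.g. 1s2t = one source,
+    // two targets), and the ps/pt/pst suffix says where the partition expression
+    // is set: on the source, on the target, or on both.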
+
+
+    private void uploadDataToServer3(String location, String fileName) throws IOException {
+        HadoopUtil.recreateDir(cluster3FS, location);
+        HadoopUtil.copyDataToFolder(cluster3FS, location, fileName);
+    }
+
+    private void uploadDataToServer1(String location, String fileName) throws IOException {
+        HadoopUtil.recreateDir(cluster1FS, location);
+        HadoopUtil.copyDataToFolder(cluster1FS, location, fileName);
+    }
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+
+        logger.info("creating test data");
+
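+        // Instances at minutes 00 to 20, each with ua1/ua2/ua3 partition
+        // sub-directories, first under testBaseDir1 and then under testBaseDir3.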
+        uploadDataToServer3(testDirWithDate + "00/ua2/", testFile1);
+        uploadDataToServer3(testDirWithDate + "05/ua2/", testFile2);
+        uploadDataToServer3(testDirWithDate + "10/ua2/", testFile3);
+        uploadDataToServer3(testDirWithDate + "15/ua2/", testFile4);
+        uploadDataToServer3(testDirWithDate + "20/ua2/", testFile4);
+
+        uploadDataToServer3(testDirWithDate + "00/ua1/", testFile1);
+        uploadDataToServer3(testDirWithDate + "05/ua1/", testFile2);
+        uploadDataToServer3(testDirWithDate + "10/ua1/", testFile3);
+        uploadDataToServer3(testDirWithDate + "15/ua1/", testFile4);
+        uploadDataToServer3(testDirWithDate + "20/ua1/", testFile4);
+
+        uploadDataToServer3(testDirWithDate + "00/ua3/", testFile1);
+        uploadDataToServer3(testDirWithDate + "05/ua3/", testFile2);
+        uploadDataToServer3(testDirWithDate + "10/ua3/", testFile3);
+        uploadDataToServer3(testDirWithDate + "15/ua3/", testFile4);
+        uploadDataToServer3(testDirWithDate + "20/ua3/", testFile4);
+
+        uploadDataToServer3(testBaseDir3 + testDate + "00/ua2/", testFile1);
+        uploadDataToServer3(testBaseDir3 + testDate + "05/ua2/", testFile2);
+        uploadDataToServer3(testBaseDir3 + testDate + "10/ua2/", testFile3);
+        uploadDataToServer3(testBaseDir3 + testDate + "15/ua2/", testFile4);
+        uploadDataToServer3(testBaseDir3 + testDate + "20/ua2/", testFile4);
+
+
+        uploadDataToServer3(testBaseDir3 + testDate + "00/ua1/", testFile1);
+        uploadDataToServer3(testBaseDir3 + testDate + "05/ua1/", testFile2);
+        uploadDataToServer3(testBaseDir3 + testDate + "10/ua1/", testFile3);
+        uploadDataToServer3(testBaseDir3 + testDate + "15/ua1/", testFile4);
+        uploadDataToServer3(testBaseDir3 + testDate + "20/ua1/", testFile4);
+
+
+        uploadDataToServer3(testBaseDir3 + testDate + "00/ua3/", testFile1);
+        uploadDataToServer3(testBaseDir3 + testDate + "05/ua3/", testFile2);
+        uploadDataToServer3(testBaseDir3 + testDate + "10/ua3/", testFile3);
+        uploadDataToServer3(testBaseDir3 + testDate + "15/ua3/", testFile4);
+        uploadDataToServer3(testBaseDir3 + testDate + "20/ua3/", testFile4);
+
+
+        // Data for test normalTest_1s2t_pst, where partitions are required on both source and target.
+
+        uploadDataToServer3(testDirWithDate_sourcetarget + "00/ua3/ua2/", testFile1);
+        uploadDataToServer3(testDirWithDate_sourcetarget + "05/ua3/ua2/", testFile2);
+        uploadDataToServer3(testDirWithDate_sourcetarget + "10/ua3/ua2/", testFile3);
+        uploadDataToServer3(testDirWithDate_sourcetarget + "15/ua3/ua2/", testFile4);
+        uploadDataToServer3(testDirWithDate_sourcetarget + "20/ua3/ua2/", testFile4);
+
+        uploadDataToServer3(testDirWithDate_sourcetarget + "00/ua3/ua1/", testFile1);
+        uploadDataToServer3(testDirWithDate_sourcetarget + "05/ua3/ua1/", testFile2);
+        uploadDataToServer3(testDirWithDate_sourcetarget + "10/ua3/ua1/", testFile3);
+        uploadDataToServer3(testDirWithDate_sourcetarget + "15/ua3/ua1/", testFile4);
+        uploadDataToServer3(testDirWithDate_sourcetarget + "20/ua3/ua1/", testFile4);
+
+        // data when server 1 acts as source
+        uploadDataToServer1(testDirWithDate_source1 + "00/ua2/", testFile1);
+        uploadDataToServer1(testDirWithDate_source1 + "05/ua2/", testFile2);
+
+
+        uploadDataToServer1(testDirWithDate_source1 + "00/ua1/", testFile1);
+        uploadDataToServer1(testDirWithDate_source1 + "05/ua1/", testFile2);
+
+
+        uploadDataToServer1(testDirWithDate_source1 + "00/ua3/", testFile1);
+        uploadDataToServer1(testDirWithDate_source1 + "05/ua3/", testFile2);
+
+        logger.info("completed creating test data");
+
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void testName(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readLocalDCBundle();
+
+        for (int i = 0; i < 3; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        for (String dir : new String[]{testBaseDir1, testBaseDir2, testBaseDir3, testBaseDir4}) {
+            HadoopUtil.deleteDirIfExists(dir, cluster1FS);
+            HadoopUtil.deleteDirIfExists(dir, cluster2FS);
+        }
+        removeBundles();
+    }
+
+
+    @Test(enabled = true, groups = "embedded")
+    public void blankPartition() throws Exception {
+        // Data is present in all the required places, but the partition expression
+        // is left blank, so the feed submission should fail.
+
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String startTimeUA1 = "2012-10-01T12:05Z";
+        String startTimeUA2 = "2012-10-01T12:10Z";
+
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2012-10-01T12:10Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, "",
+                testBaseDir1 + dateTemplate);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2012-10-01T12:25Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, "",
+                testBaseDir2 + dateTemplate);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, "");
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        ServiceResponse r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source " +
+            "is blank");
+    }
+
+
+    @Test(enabled = true)
+    public void normalTest_1s1t1n_ps() throws Exception {
+        // Ideal case: data is present in all the required places and replication
+        // proceeds normally.
+
+        // cluster3 is the only source cluster and cluster2 is the target;
+        // data should be replicated from cluster3 to cluster2.
+
+        // The data path on the target cluster is also customized.
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+        String startTimeUA1 = "2012-10-01T12:00Z";
+        String startTimeUA2 = "2012-10-01T12:00Z";
+
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(100000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2099-10-01T12:10Z"),
+                XmlUtil.createRtention("days(100000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), null, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"),
+                XmlUtil.createRtention("days(100000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+                testBaseDir2 + dateTemplate);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z"),
+                XmlUtil.createRtention("days(100000)", ActionType.DELETE),
+                Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+                "${cluster.colo}", testBaseDir1 + dateTemplate);
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        ServiceResponse r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(r);
+
+        r = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        AssertUtil.assertSucceeded(r);
+        TimeUtil.sleepSeconds(15);
+
+        HadoopUtil.recreateDir(cluster3FS, testDirWithDate + "00/ua3/");
+        HadoopUtil.recreateDir(cluster3FS, testDirWithDate + "05/ua3/");
+
+        HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "00/ua3/",
+            testFile1);
+        HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/",
+            testFile2);
+
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
+        Assert.assertEquals(
+            InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                "REPLICATION"), 1);
+        Assert.assertEquals(
+            InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 1);
+        Assert.assertEquals(
+            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 1);
+        Assert.assertEquals(
+            InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 1);
+
+
+        // Check that data has been replicated correctly: on ua1 only ua1's data
+        // should be present, on ua2 only ua2's, and the number of files should
+        // match the source.
+
+
+        List<Path> ua2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2));
+        AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua2");
+
+
+        List<Path> ua3ReplicatedData00 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "00/ua3/"));
+        List<Path> ua3ReplicatedData05 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "05/ua3/"));
+
+        List<Path> ua2ReplicatedData00 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2 + testDate + "00"));
+        List<Path> ua2ReplicatedData05 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2 + testDate + "05"));
+
+        AssertUtil.checkForListSizes(ua3ReplicatedData00, ua2ReplicatedData00);
+        AssertUtil.checkForListSizes(ua3ReplicatedData05, ua2ReplicatedData05);
+    }
+
+
+    @Test(enabled = true)
+    public void normalTest_1s1t1n_pt() throws Exception {
+        // Ideal case: data is present in all the required places and replication
+        // proceeds normally.
+        // The data path on the target cluster is also customized.
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String startTimeUA1 = "2012-10-01T12:00Z";
+        String startTimeUA2 = "2012-10-01T12:00Z";
+
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2099-10-01T12:10Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), null, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET,
+                "${cluster.colo}", testBaseDir2 + dateTemplate);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, null,
+            testBaseDir1 + dateTemplate);
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        ServiceResponse r =
+            prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(r);
+
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                "REPLICATION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 1);
+
+
+        // Check that data has been replicated correctly: on ua1 only ua1's data
+        // should be present, on ua2 only ua2's, and the number of files should
+        // match the source.
+
+
+        List<Path> ua2ReplicatedData =
+            HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2));
+        AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua3");
+
+
+        List<Path> ua3ReplicatedData00 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "00/ua2/"));
+        List<Path> ua3ReplicatedData05 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "05/ua2/"));
+
+        List<Path> ua2ReplicatedData00 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2 + testDate + "00"));
+        List<Path> ua2ReplicatedData05 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2 + testDate + "05"));
+
+        AssertUtil.checkForListSizes(ua3ReplicatedData00, ua2ReplicatedData00);
+        AssertUtil.checkForListSizes(ua3ReplicatedData05, ua2ReplicatedData05);
+    }
+
+
+    @Test(enabled = true)
+    public void normalTest_1s2t_pt() throws Exception {
+        // Ideal case: data is present in all the required places and replication
+        // proceeds normally.
+
+        // cluster3 is the global cluster where test data is present under
+        // /data/fetlrc/billing/2012/10/01/12/ (00 to 30).
+        // Data should be replicated to folders on cluster1 and cluster2 as targets;
+        // ua3 is the source and ua1 and ua2 are the targets.
+
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+        String startTimeUA1 = "2012-10-01T12:05Z";
+        String startTimeUA2 = "2012-10-01T12:10Z";
+
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedFilePath(feed, testBaseDir3 + dateTemplate);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2012-10-01T12:10Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+                "${cluster.colo}");
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2012-10-01T12:25Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET,
+                "${cluster.colo}");
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, null);
+
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        ServiceResponse r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(r);
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
+        TimeUtil.sleepSeconds(15);
+
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
+
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 3,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
+
+        // Check that data has been replicated correctly: on ua1 only ua1's data
+        // should be present, on ua2 only ua2's, and the number of files should
+        // match the source.
+
+
+        List<Path> ua1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir3 + testDate));
+        //check for no ua2 or ua3 in ua1
+        AssertUtil.failIfStringFoundInPath(ua1ReplicatedData, "ua2", "ua3");
+
+        List<Path> ua2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS,
+                new Path(testBaseDir3 + testDate));
+        AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua3");
+
+
+        List<Path> ua1ReplicatedData00 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir3 + testDate + "00/"));
+        List<Path> ua1ReplicatedData10 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir3 + testDate + "10/"));
+
+        List<Path> ua2ReplicatedData10 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir3 + testDate + "10"));
+        List<Path> ua2ReplicatedData15 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir3 + testDate + "15"));
+
+        List<Path> ua3OriginalData10ua1 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir3 + testDate + "10/ua1"));
+        List<Path> ua3OriginalData10ua2 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir3 + testDate + "10/ua2"));
+        List<Path> ua3OriginalData15ua2 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir3 + testDate + "15/ua2"));
+
+        AssertUtil.checkForListSizes(ua1ReplicatedData00, new ArrayList<Path>());
+        AssertUtil.checkForListSizes(ua1ReplicatedData10, ua3OriginalData10ua1);
+        AssertUtil.checkForListSizes(ua2ReplicatedData10, ua3OriginalData10ua2);
+        AssertUtil.checkForListSizes(ua2ReplicatedData15, ua3OriginalData15ua2);
+    }
+
+    @Test(enabled = true, groups = "embedded")
+    public void normalTest_2s1t_pt() throws Exception {
+        // There are two source clusters (cluster3 and cluster1) and cluster2 is the
+        // target. Since there is no partition expression on the source clusters,
+        // the feed submission should fail (FALCON-305).
+
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String startTimeUA1 = "2012-10-01T12:05Z";
+        String startTimeUA2 = "2012-10-01T12:10Z";
+
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2012-10-01T12:10Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, null,
+                testBaseDir1 + dateTemplate);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2012-10-01T12:25Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET,
+                "${cluster.colo}",
+                testBaseDir2 + dateTemplate);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, null);
+
+        //clean target if old data exists
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        ServiceResponse r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        AssertUtil.assertFailed(r, "Submission of feed should have failed.");
+        Assert.assertTrue(r.getMessage().contains(
+                "Partition expression has to be specified for cluster " +
+                    Util.readEntityName(bundles[0].getClusters().get(0)) +
+                    " as there are more than one source clusters"),
+            "Failed response has unexpected error message.");
+    }
+
+
+    @Test(enabled = true)
+    public void normalTest_1s2t_ps() throws Exception {
+
+        // Ideal case: data is present in all the required places and replication
+        // proceeds normally.
+
+        // cluster3 is the global cluster where test data is present under
+        // /data/fetlrc/billing/2012/10/01/12/ (00 to 30).
+        // Data should be replicated to folders on cluster1 and cluster2 as targets;
+        // ua3 is the source and ua1 and ua2 are the targets.
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String startTimeUA1 = "2012-10-01T12:05Z";
+        String startTimeUA2 = "2012-10-01T12:10Z";
+
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedFilePath(feed,
+            testBaseDir1 + dateTemplate);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(10000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2012-10-01T12:11Z"),
+                XmlUtil.createRtention("days(10000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null,
+                testBaseDir1 + "/ua1" + dateTemplate);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2012-10-01T12:26Z"),
+                XmlUtil.createRtention("days(10000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+                testBaseDir1 + "/ua2" + dateTemplate);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(10000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "${cluster.colo}");
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        ServiceResponse r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(r);
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
+        TimeUtil.sleepSeconds(15);
+
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
+
+        // Check that data has been replicated correctly: on ua1 only ua1's data
+        // should be present, on ua2 only ua2's, and the number of files should
+        // match the source.
+
+
+        List<Path> ua1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate));
+        // check for no ua2 in ua1
+        AssertUtil.failIfStringFoundInPath(ua1ReplicatedData, "ua2");
+
+        List<Path> ua2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate));
+        AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1");
+
+
+        List<Path> ua1ReplicatedData05 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS,
+                new Path(testBaseDir1 + "/ua1" + testDate + "05/"));
+        List<Path> ua1ReplicatedData10 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS,
+                new Path(testBaseDir1 + "/ua1" + testDate + "10/"));
+
+        List<Path> ua2ReplicatedData10 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate + "10"));
+        List<Path> ua2ReplicatedData15 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate + "15"));
+
+        List<Path> ua3OriginalData10ua1 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "10/ua1"));
+        List<Path> ua3OriginalData05ua1 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "05/ua1"));
+        List<Path> ua3OriginalData10ua2 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "10/ua2"));
+        List<Path> ua3OriginalData15ua2 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "15/ua2"));
+
+        AssertUtil.checkForListSizes(ua1ReplicatedData10, ua3OriginalData10ua1);
+        AssertUtil.checkForListSizes(ua1ReplicatedData05, ua3OriginalData05ua1);
+        AssertUtil.checkForListSizes(ua2ReplicatedData10, ua3OriginalData10ua2);
+        AssertUtil.checkForListSizes(ua2ReplicatedData15, ua3OriginalData15ua2);
+
+    }
+
+
+    @Test(enabled = true)
+    public void normalTest_2s1t_ps() throws Exception {
+        // Ideal case: data is present in all the required places and replication
+        // proceeds normally.
+
+        // There are two source clusters, cluster3 and cluster1, and cluster2 is the
+        // target. Data should be replicated to cluster2 from the ua2 sub-directory of
+        // cluster3 and cluster1. The source path for cluster1 is specified in its
+        // cluster definition, and the data path on the target cluster is also
+        // customized.
+
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String startTimeUA1 = "2012-10-01T12:00Z";
+        String startTimeUA2 = "2012-10-01T12:00Z";
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2099-10-01T12:10Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+                "${cluster.colo}",
+                testBaseDir_server1source + dateTemplate);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+                testBaseDir2 + "/replicated" + dateTemplate);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "${cluster.colo}", testBaseDir1 + dateTemplate);
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        ServiceResponse r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(r);
+
+        r = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        AssertUtil.assertSucceeded(r);
+        TimeUtil.sleepSeconds(15);
+
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
+
+        // Check that data has been replicated correctly: on ua1 only ua1's data
+        // should be present, on ua2 only ua2's, and the number of files should
+        // match the source.
+
+
+        List<Path> ua2ReplicatedData = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS,
+            new Path(testBaseDir2 + "/replicated" + testDate));
+        AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua2");
+
+        List<Path> ua2ReplicatedData00ua1 = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS,
+            new Path(testBaseDir2 + "/replicated" + testDate + "00/ua1"));
+        List<Path> ua2ReplicatedData05ua3 = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS,
+            new Path(testBaseDir2 + "/replicated" + testDate + "05/ua3/"));
+
+
+        List<Path> ua1OriginalData00 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir_server1source + testDate + "00/ua1"));
+        List<Path> ua3OriginalData05 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "05/ua1"));
+
+        AssertUtil.checkForListSizes(ua2ReplicatedData00ua1, ua1OriginalData00);
+        AssertUtil.checkForListSizes(ua2ReplicatedData05ua3, ua3OriginalData05);
+    }
+
+
+    @Test(enabled = true)
+    public void normalTest_1s2t_pst() throws Exception {
+
+
+        // Ideal case: data is present in all the required places and replication
+        // proceeds normally.
+
+        // cluster3 is the global cluster where test data is present under
+        // /data/fetlrc/billing/2012/10/01/12/ (00 to 30).
+        // Data should be replicated to folders on cluster1 and cluster2 as targets;
+        // ua3 is the source and ua1 and ua2 are the targets.
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String startTimeUA1 = "2012-10-01T12:05Z";
+        String startTimeUA2 = "2012-10-01T12:10Z";
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedFilePath(feed, testBaseDir1 + dateTemplate);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2099-10-01T12:10Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+                "${cluster.colo}", testBaseDir1 + "/ua1" + dateTemplate + "/");
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET,
+                "${cluster.colo}", testBaseDir1 + "/ua2" + dateTemplate + "/");
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
+            , XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "${cluster.colo}", testBaseDir4 + dateTemplate + "/");
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        ServiceResponse r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(r);
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
+        TimeUtil.sleepSeconds(15);
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
+
+        // Check that data has been replicated correctly: on ua1 only ua1's data
+        // should be present, on ua2 only ua2's, and the number of files should
+        // match the source.
+
+
+        List<Path> ua1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate));
+        // check for no ua2 in ua1
+        AssertUtil.failIfStringFoundInPath(ua1ReplicatedData, "ua2");
+
+        List<Path> ua2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate));
+        AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1");
+
+
+        List<Path> ua1ReplicatedData05 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS,
+                new Path(testBaseDir1 + "/ua1" + testDate + "05/"));
+        List<Path> ua1ReplicatedData10 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS,
+                new Path(testBaseDir1 + "/ua1" + testDate + "10/"));
+
+        List<Path> ua2ReplicatedData10 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate + "10"));
+        List<Path> ua2ReplicatedData15 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate + "15"));
+
+
+        List<Path> ua3OriginalData05ua1 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate_sourcetarget + "05/ua3/ua1"));
+        List<Path> ua3OriginalData10ua1 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate_sourcetarget + "10/ua3/ua1"));
+        List<Path> ua3OriginalData10ua2 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate_sourcetarget + "10/ua3/ua2"));
+        List<Path> ua3OriginalData15ua2 = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate_sourcetarget + "15/ua3/ua2"));
+
+        AssertUtil.checkForListSizes(ua1ReplicatedData05, ua3OriginalData05ua1);
+        AssertUtil.checkForListSizes(ua1ReplicatedData10, ua3OriginalData10ua1);
+        AssertUtil.checkForListSizes(ua2ReplicatedData10, ua3OriginalData10ua2);
+        AssertUtil.checkForListSizes(ua2ReplicatedData15, ua3OriginalData15ua2);
+    }
+
+
+    @Test(enabled = true, groups = "embedded")
+    public void moreThanOneClusterWithSameNameDiffValidity() throws Exception {
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String startTimeUA1 = "2012-10-01T12:05Z";
+        String startTimeUA2 = "2012-10-01T12:10Z";
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2012-10-01T12:10Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, "",
+                testBaseDir1 + dateTemplate);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2012-10-01T12:25Z"),
+                XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+                Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.TARGET, "",
+                testBaseDir2 + dateTemplate);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, "");
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        ServiceResponse r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertFailed(r, "is defined more than once for feed");
+        Assert.assertTrue(r.getMessage().contains("is defined more than once for feed"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
new file mode 100644
index 0000000..af4243b
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction.Status;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+@Test(groups = "embedded")
+public class PrismFeedReplicationUpdateTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    ColoHelper cluster3 = servers.get(2);
+    FileSystem cluster1FS = serverFS.get(0);
+    FileSystem cluster2FS = serverFS.get(1);
+    FileSystem cluster3FS = serverFS.get(2);
+    String cluster1Colo = cluster1.getClusterHelper().getColoName();
+    String cluster2Colo = cluster2.getClusterHelper().getColoName();
+    String cluster3Colo = cluster3.getClusterHelper().getColoName();
+    private final String baseTestDir = baseHDFSDir + "/PrismFeedReplicationUpdateTest";
+    private final String inputPath =
+        baseTestDir + "/input-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String alternativeInputPath =
+        baseTestDir + "/newFeedPath/input-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismFeedReplicationUpdateTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void prepareCluster() throws IOException {
+        // upload workflow to hdfs
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readELBundle();
+
+        for (int i = 0; i < 3; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Set cluster1 as the feed's target and clusters 2 and 3 as sources. Run the feed. Update
+     * the feed and check that the action succeeds. Check that the appropriate number of
+     * replication and retention coordinators exist on the matching clusters.
+     *
+     * @throws Exception
+     */
+    @Test(enabled = true, timeOut = 1200000)
+    public void multipleSourceOneTarget() throws Exception {
+
+        bundles[0].setInputFeedDataPath(inputPath);
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        // use the colo string here so that the test works in embedded and distributed mode.
+        String postFix = "/US/" + cluster2Colo;
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+        HadoopUtil.lateDataReplenish(cluster2FS, 5, 80, prefix, postFix);
+
+        // use the colo string here so that the test works in embedded and distributed mode.
+        postFix = "/UK/" + cluster3Colo;
+        prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+        HadoopUtil.lateDataReplenish(cluster3FS, 5, 80, prefix, postFix);
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(-30);
+
+        feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime,
+            TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "US/${cluster.colo}");
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 105)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 130)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
+
+        //change feed location path
+        feed = InstanceUtil.setFeedFilePath(feed, alternativeInputPath);
+
+        logger.info("updated feed: " + Util.prettyPrintXml(feed));
+
+        //update feed
+        AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed, feed));
+
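+        // Expected coordinator counts after the update (asserted below): no replication
+        // coordinators on the source colos (cluster2, cluster3), replication coordinators
+        // only on the target colo (cluster1), and retention coordinators on every colo.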
+        Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(),
+            Util.readEntityName(feed),
+            "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(),
+            Util.readEntityName(feed),
+            "RETENTION"), 2);
+        Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(),
+            Util.readEntityName(feed),
+            "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(),
+            Util.readEntityName(feed),
+            "RETENTION"), 2);
+        Assert.assertEquals(
+            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+                "REPLICATION"), 4);
+        Assert.assertEquals(
+            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 2);
+    }
+
+    /**
+     * Set feed1 to have cluster1 as source and cluster3 as target; set feed2's clusters vice versa.
+     * Add both clusters to the process and feed2 as an input feed. Run the process. Update feed1.
+     * TODO test case is incomplete
+     *
+     * @throws Exception
+     */
+    @Test(enabled = true, timeOut = 3600000)
+    public void updateFeed_dependentProcessTest() throws Exception {
+        //set cluster colos
+        bundles[0].setCLusterColo(cluster1Colo);
+        bundles[1].setCLusterColo(cluster2Colo);
+        bundles[2].setCLusterColo(cluster3Colo);
+
+        //submit 3 clusters
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        //get 2 unique feeds
+        String feed01 = bundles[0].getInputFeedFromBundle();
+        String feed02 = bundles[1].getInputFeedFromBundle();
+        String outputFeed = bundles[0].getOutputFeedFromBundle();
+
+        //set clusters to null;
+        feed01 = InstanceUtil.setFeedCluster(feed01,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed02 = InstanceUtil.setFeedCluster(feed02,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        outputFeed = InstanceUtil.setFeedCluster(outputFeed,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        //set new feed input data
+        feed01 = Util.setFeedPathValue(feed01,
+            baseHDFSDir + "/feed01/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/");
+        feed02 = Util.setFeedPathValue(feed02,
+            baseHDFSDir + "/feed02/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/");
+
+        //generate data in both the colos ua1 and ua3
+        String prefix = InstanceUtil.getFeedPrefix(feed01);
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
+        HadoopUtil.lateDataReplenish(cluster1FS, 25, 1, prefix, null);
+
+        prefix = InstanceUtil.getFeedPrefix(feed02);
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+        HadoopUtil.lateDataReplenish(cluster3FS, 25, 1, prefix, null);
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(-50);
+
+        //set clusters for feed01
+        feed01 = InstanceUtil
+            .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+                null);
+
+        feed01 = InstanceUtil
+            .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.TARGET,
+                null);
+
+        //set clusters for feed02
+        feed02 = InstanceUtil
+            .setFeedCluster(feed02, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+                null);
+
+        feed02 = InstanceUtil
+            .setFeedCluster(feed02, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+                null);
+
+        //set clusters for output feed
+        outputFeed = InstanceUtil.setFeedCluster(outputFeed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, null);
+
+        outputFeed = InstanceUtil.setFeedCluster(outputFeed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.TARGET, null);
+
+        //submit and schedule feeds
+        prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed01);
+        prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed02);
+        prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_URL, outputFeed);
+
+        //create a process with 2 clusters
+
+        //get a process
+        String process = bundles[0].getProcessData();
+
+        //add clusters to process
+        String processStartTime = TimeUtil.getTimeWrtSystemTime(-6);
+        String processEndTime = TimeUtil.getTimeWrtSystemTime(70);
+
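+        //passing a null cluster name clears the default process clusters before the test clusters are added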
+        process = InstanceUtil.setProcessCluster(process, null,
+            XmlUtil.createProcessValidity(startTime, "2099-01-01T00:00Z"));
+
+        process = InstanceUtil
+            .setProcessCluster(process, Util.readEntityName(bundles[0].getClusters().get(0)),
+                XmlUtil.createProcessValidity(processStartTime, processEndTime));
+
+        process = InstanceUtil
+            .setProcessCluster(process, Util.readEntityName(bundles[2].getClusters().get(0)),
+                XmlUtil.createProcessValidity(processStartTime, processEndTime));
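+        //add feed02 as an additional input feed to the process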
+        process = InstanceUtil.addProcessInputFeed(process, Util.readEntityName(feed02),
+            Util.readEntityName(feed02));
+
+        //submit and schedule process
+        AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(URLS
+            .SUBMIT_AND_SCHEDULE_URL, process));
+
+        logger.info("Waiting for process instances to reach RUNNING state");
+
+        int timeout = OSUtil.IS_WINDOWS ? 50 : 25;
+        InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(process), 1,
+            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, timeout);
+        InstanceUtil.waitTillInstanceReachState(serverOC.get(2), Util.getProcessName(process), 1,
+            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, timeout);
+
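+        //update feed01 with an alternative input path while the dependent process is running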
+        feed01 = InstanceUtil.setFeedFilePath(feed01, alternativeInputPath);
+        logger.info("updated feed: " + Util.prettyPrintXml(feed01));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed01, feed01));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java
new file mode 100644
index 0000000..4c82baf
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+@Test(groups = "distributed")
+public class PrismFeedResumeTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    OozieClient cluster1OC = serverOC.get(0);
+    OozieClient cluster2OC = serverOC.get(1);
+    private boolean restartRequired;
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedResumeTest/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismFeedResumeTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void testName(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readLateDataBundle();
+
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (restartRequired) {
+            Util.restartService(cluster1.getFeedHelper());
+        }
+        removeBundles();
+    }
+
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeSuspendedFeedOnBothColos() throws Exception {
+        //schedule using colohelpers
+        bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+        //suspend using prismHelper
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        //verify
+        checkAndCompareStatus(cluster1, bundles[0], Job.Status.SUSPENDED);
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+        //resume using prismHelper
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        //verify
+        checkAndCompareStatus(cluster1, bundles[0], Job.Status.RUNNING);
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+        //suspend using the coloHelper
+        AssertUtil.assertSucceeded(cluster1.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        //verify
+        checkAndCompareStatus(cluster1, bundles[0], Job.Status.SUSPENDED);
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+        //resume using the coloHelper
+        AssertUtil.assertSucceeded(
+            cluster1.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        //verify
+        checkAndCompareStatus(cluster1, bundles[0], Job.Status.RUNNING);
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+
+        //suspend and resume the same feed once more via the coloHelper
+        AssertUtil.assertSucceeded(cluster1.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        checkAndCompareStatus(cluster1, bundles[0], Job.Status.SUSPENDED);
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+
+        AssertUtil.assertSucceeded(
+            cluster1.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        checkAndCompareStatus(cluster1, bundles[0], Job.Status.RUNNING);
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+    }
+
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeDeletedFeedOnBothColos() throws Exception {
+        //schedule using colohelpers
+        bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+        //delete using prismHelper
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //attempt to resume the deleted feed using prismHelper
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        //verify
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+        //attempt to resume the other deleted feed
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+        AssertUtil.assertFailed(
+            cluster1.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.assertFailed(
+            cluster2.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+    }
+
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeResumedFeedOnBothColos() throws Exception {
+        //schedule using colohelpers
+        bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        checkAndCompareStatus(cluster1, bundles[0], Job.Status.SUSPENDED);
+        for (int i = 0; i < 2; i++) {
+            //resume using prismHelper
+            AssertUtil.assertSucceeded(prism.getFeedHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+            //verify
+            checkAndCompareStatus(cluster1, bundles[0], Job.Status.RUNNING);
+            checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+        }
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.SUSPENDED);
+
+        for (int i = 0; i < 2; i++) {
+            AssertUtil.assertSucceeded(cluster1.getFeedHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+            //verify
+            checkAndCompareStatus(cluster1, bundles[0], Job.Status.RUNNING);
+            checkAndCompareStatus(cluster2, bundles[1], Job.Status.SUSPENDED);
+        }
+
+
+        for (int i = 0; i < 2; i++) {
+            //resume the other feed using prismHelper
+            AssertUtil.assertSucceeded(prism.getFeedHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+            AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+            AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        }
+
+        for (int i = 0; i < 2; i++) {
+            //resume the other feed using the coloHelper
+            AssertUtil.assertSucceeded(cluster2.getFeedHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+            checkAndCompareStatus(cluster1, bundles[0], Job.Status.RUNNING);
+            checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+        }
+    }
+
+    @Test
+    public void testResumeNonExistentFeedOnBothColos() throws Exception {
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+
+        AssertUtil.assertFailed(
+            cluster1.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(
+            cluster2.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+    }
+
+    @Test
+    public void testResumeSubmittedFeedOnBothColos() throws Exception {
+        bundles[0].submitFeed();
+        bundles[1].submitFeed();
+
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+
+        AssertUtil.assertFailed(
+            cluster1.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(
+            cluster2.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+    }
+
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeScheduledFeedOnBothColosWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+
+        //schedule using colohelpers
+        bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+        AssertUtil.assertSucceeded(cluster1.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(cluster2.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
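+        //with cluster1 down, resuming its suspended feed through prism should fail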
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        //verify
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.SUSPENDED);
+        //resume on the other one
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+
+        Util.startService(cluster1.getFeedHelper());
+        checkAndCompareStatus(cluster1, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.assertSucceeded(
+            cluster2.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+        checkAndCompareStatus(cluster1, bundles[0], Job.Status.RUNNING);
+    }
+
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeDeletedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+
+        //schedule using colohelpers
+        bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+        //delete using prismHelper
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        //attempt to resume the deleted feed using prismHelper
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        //verify
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+
+        //retry the resume using prismHelper
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        //verify
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        checkAndCompareStatus(cluster2, bundles[1], Job.Status.RUNNING);
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+        //attempt to resume the other deleted feed
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+
+        AssertUtil.assertFailed(
+            cluster2.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+    }
+
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeNonExistentFeedOnBothColosWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(
+            cluster2.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+    }
+
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeSubmittedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+
+        bundles[0].submitFeed();
+        bundles[1].submitFeed();
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
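+        //resume should fail for feeds that were only submitted, whether or not the colo is up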
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.assertFailed(
+            cluster2.getFeedHelper().resume(Util.URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+    }
+
+
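+    /**
+     * Assert that the feed's Oozie job on the given colo has the expected status, that the prism
+     * status API reports it as "coloName/expectedStatus", and that the prism-reported status is
+     * consistent with the status returned by the colo itself.
+     */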
+    private void checkAndCompareStatus(ColoHelper coloHelper, Bundle bundle,
+                                       Job.Status expectedStatus) throws Exception {
+        AssertUtil
+            .checkStatus(coloHelper.getFeedHelper().getOozieClient(), EntityType.FEED, bundle,
+                expectedStatus);
+        String entity = bundle.getDataSets().get(0);
+        Assert.assertEquals(
+            Util.parseResponse(prism.getFeedHelper().getStatus(Util.URLS.STATUS_URL, entity))
+                .getMessage(),
+            coloHelper.getFeedHelper().getColoName() + "/" + expectedStatus);
+        Assert.assertEquals(
+            Util.parseResponse(prism.getFeedHelper().getStatus(Util.URLS.STATUS_URL, entity))
+                .getMessage(),
+            coloHelper.getFeedHelper().getColoName() + "/"
+                + Util.parseResponse(
+                coloHelper.getFeedHelper().getStatus(Util.URLS.STATUS_URL, entity)).getMessage());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java
new file mode 100644
index 0000000..dc82c39
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+@Test(groups = "embedded")
+public class PrismFeedScheduleTest extends BaseTestClass {
+
+    OozieClient cluster1OC = serverOC.get(0);
+    OozieClient cluster2OC = serverOC.get(1);
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedScheduleTest/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismFeedScheduleTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws IOException {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readLateDataBundle();
+
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Run a feed and suspend it. Run another feed on another cluster. Check that the 1st feed is
+     * suspended on the 1st cluster, the 2nd feed is running on the 2nd cluster, and the statuses
+     * are not swapped between clusters.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testFeedScheduleOn1ColoWhileAnotherColoHasSuspendedFeed()
+        throws Exception {
+        logger.info("cluster: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+        logger.info("feed: " + Util.prettyPrintXml(bundles[0].getDataSets().get(0)));
+
+        bundles[0].submitAndScheduleFeed();
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+        bundles[1].submitAndScheduleFeed();
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+}

