falcon-commits mailing list archives

From samar...@apache.org
Subject [18/27] adding falcon-regression
Date Mon, 04 Aug 2014 10:04:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
new file mode 100644
index 0000000..b06d8dd
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
@@ -0,0 +1,857 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+/**
+ * Feed cluster update tests.
+ */
+@Test(groups = "distributed")
+public class FeedClusterUpdateTest extends BaseTestClass {
+
+    String baseTestDir = baseHDFSDir + "/FeedClusterUpdateTest";
+    String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    ColoHelper cluster3 = servers.get(2);
+    FileSystem cluster2FS = serverFS.get(1);
+    FileSystem cluster3FS = serverFS.get(2);
+    private String feed;
+    String startTime;
+    String feedOriginalSubmit;
+    String feedUpdated;
+    private static final Logger logger = Logger.getLogger(FeedClusterUpdateTest.class);
+
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        Bundle bundle = BundleUtil.readELBundle();
+        for (int i = 0; i < 3; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+        try {
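+            // seed late-arriving input data on both source clusters (cluster2 and cluster3)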
+            String postFix = "/US/" + servers.get(1).getClusterHelper().getColoName();
+            HadoopUtil.deleteDirIfExists(baseTestDir, cluster2FS);
+            HadoopUtil.lateDataReplenish(cluster2FS, 80, 1, baseTestDir, postFix);
+            postFix = "/UK/" + servers.get(2).getClusterHelper().getColoName();
+            HadoopUtil.deleteDirIfExists(baseTestDir, cluster3FS);
+            HadoopUtil.lateDataReplenish(cluster3FS, 80, 1, baseTestDir, postFix);
+        } finally {
+            removeBundles();
+        }
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+
+        Bundle bundle = BundleUtil.readELBundle();
+        for (int i = 0; i < 3; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+        BundleUtil.submitAllClusters(prism, bundles[0], bundles[1], bundles[2]);
+        feed = bundles[0].getDataSets().get(0);
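+        // the first setFeedCluster call (null cluster name) is assumed to reset the feed's
+        // cluster list before each test adds its own clusters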
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+        startTime = TimeUtil.getTimeWrtSystemTime(-50);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    @Test(enabled = false, groups = {"multiCluster"})
+    public void addSourceCluster() throws Exception {
+        //add one source and one target, schedule only on source
+        feedOriginalSubmit = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                TimeUtil.addMinsToTime(startTime, 65)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            null);
+        feedOriginalSubmit = InstanceUtil.setFeedCluster(feedOriginalSubmit,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null);
+
+        logger.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
+
+        ServiceResponse response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(response);
+
+        //schedule on source
+        response = cluster2.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
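+        // before update: only cluster2 (the scheduled source) should have a retention coord;
+        // no replication coords anywhere yet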
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit), "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "RETENTION"), 0);
+
+        //prepare updated Feed
+        feedUpdated = InstanceUtil.setFeedCluster(
+            feed, XmlUtil.createValidity(startTime,
+                TimeUtil.addMinsToTime(startTime, 65)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "US/${cluster.colo}");
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null);
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedUpdated);
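+        // expected after update: cluster2 -> 2 RETENTION; cluster3 (new source) -> 1 RETENTION;
+        // cluster1 (target) -> 1 REPLICATION, 2 RETENTION; no replication coords on the sources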
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 2);
+    }
+
+    @Test(enabled = false, groups = {"multiCluster"})
+    public void addTargetCluster() throws Exception {
+        //add two source clusters, schedule only on one source
+        feedOriginalSubmit = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+                null);
+        feedOriginalSubmit = InstanceUtil.setFeedCluster(feedOriginalSubmit,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+        logger.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
+
+        ServiceResponse response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(response);
+
+        //schedule on source
+        response = cluster2.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "RETENTION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            0);
+
+        //prepare updated Feed
+        feedUpdated = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+                "US/${cluster.colo}");
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null);
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+        logger.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
+
+        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedUpdated);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+    }
+
+    @Test(enabled = false, groups = {"multiCluster"})
+    public void add2SourceCluster() throws Exception {
+        //submit feed with a single source cluster, schedule only on source
+        feedOriginalSubmit = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+                null);
+
+        logger.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
+
+        ServiceResponse response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(response);
+
+        //schedule on source
+        response = cluster2.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            0);
+
+        //prepare updated Feed
+        feedUpdated = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+                "US/${cluster.colo}");
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, null);
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+        logger.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
+
+        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedUpdated);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+    }
+
+    @Test(enabled = false, groups = {"multiCluster"})
+    public void add2TargetCluster() throws Exception {
+        //submit feed with a single source cluster, schedule only on source
+        feedOriginalSubmit = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+                null);
+
+        logger.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
+
+        ServiceResponse response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(response);
+
+        //schedule on source
+        response = cluster2.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            0);
+
+        //prepare updated Feed
+        feedUpdated = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                TimeUtil.addMinsToTime(startTime, 65)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            null);
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null);
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.TARGET, null);
+
+        logger.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
+
+        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedUpdated);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+    }
+
+    @Test(enabled = false, groups = {"multiCluster"})
+    public void add1Source1TargetCluster() throws Exception {
+        //submit feed with a single source cluster, schedule only on source
+        feedOriginalSubmit = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+                null);
+
+        logger.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
+
+        ServiceResponse response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(response);
+
+        //schedule on source
+        response = cluster2.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "RETENTION"), 0);
+
+        //prepare updated Feed
+        feedUpdated = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+                "US/${cluster.colo}");
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null);
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+        logger.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
+
+        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedUpdated);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+    }
+
+    @Test(enabled = false, groups = {"multiCluster"})
+    public void deleteSourceCluster() throws Exception {
+        //add two source clusters and one target cluster
+        feedOriginalSubmit = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+                "US/${cluster.colo}");
+        feedOriginalSubmit = InstanceUtil.setFeedCluster(feedOriginalSubmit,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null);
+        feedOriginalSubmit = InstanceUtil.setFeedCluster(feedOriginalSubmit,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+        logger.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
+
+        ServiceResponse response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(response);
+
+        //schedule through prism
+        response = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit), "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "RETENTION"), 1);
+
+        //prepare updated Feed
+        feedUpdated = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+                null);
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null);
+
+        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        response =
+            cluster3.getFeedHelper().getEntityDefinition(URLS.GET_ENTITY_DEFINITION, feedUpdated);
+        AssertUtil.assertFailed(response);
+
+        prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedUpdated);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 0);
+    }
+
+    @Test(enabled = true, groups = {"multiCluster"})
+    public void deleteTargetCluster() throws Exception {
+
+        /*
+        This test creates a multi-cluster feed. Cluster1 is the target
+        cluster; Cluster2 and Cluster3 are the source clusters.
+
+        The feed is submitted through prism, so it is submitted to both the
+        target and the source clusters. The feed is also scheduled through
+        prism, so only a retention coord should exist on Cluster2 and
+        Cluster3. On Cluster1, which is the target, both retention and
+        replication coords should exist; there will be two replication
+        coords, one for each source cluster.
+
+        We then update the feed by deleting cluster1 and cluster2 from the
+        feed xml and send an update request.
+
+        Once the update is over, the definition should go missing from
+        cluster1 and cluster2, while prism and cluster3 should have the new
+        definition.
+
+        There should be a new retention coord on cluster3 and the old number
+        of coords on cluster1 and cluster2.
+         */
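+        /*
+         * Expected coordinator counts, inferred from the assertions below:
+         *   before update: cluster1 (target) -> 2 REPLICATION, 1 RETENTION
+         *                  cluster2 (source) -> 0 REPLICATION, 1 RETENTION
+         *                  cluster3 (source) -> 0 REPLICATION, 1 RETENTION
+         *   after update:  cluster3 gains one RETENTION coord (total 2);
+         *                  cluster1 and cluster2 keep their old counts
+         */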
+
+        //add two source and one target
+
+        feedOriginalSubmit = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feedOriginalSubmit = InstanceUtil
+            .setFeedCluster(feedOriginalSubmit, XmlUtil.createValidity(startTime,
+                    TimeUtil.addMinsToTime(startTime, 65)),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)),
+                ClusterType.SOURCE,
+                "US/${cluster.colo}");
+        feedOriginalSubmit = InstanceUtil.setFeedCluster(feedOriginalSubmit,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)),
+            ClusterType.TARGET, null);
+        feedOriginalSubmit = InstanceUtil.setFeedCluster(feedOriginalSubmit,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)),
+            ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+        logger.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
+
+        ServiceResponse response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOriginalSubmit);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(response);
+
+        //schedule through prism
+        response = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL,
+            feedOriginalSubmit);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster3.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                Util.readEntityName(feedOriginalSubmit),
+                "REPLICATION"), 2);
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster1.getFeedHelper(),
+                    Util.readEntityName(feedOriginalSubmit), "RETENTION"),
+            1);
+
+        //prepare updated Feed
+
+        feedUpdated = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feedUpdated = InstanceUtil.setFeedCluster(feedUpdated,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)),
+            ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+        logger.info("Feed: " + Util.prettyPrintXml(feedUpdated));
+
+        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
+        TimeUtil.sleepSeconds(20);
+        AssertUtil.assertSucceeded(response);
+
+
+        //verify xml definitions
+        response =
+            cluster1.getFeedHelper().getEntityDefinition(URLS.GET_ENTITY_DEFINITION, feedUpdated);
+        AssertUtil.assertFailed(response);
+        response = cluster2.getFeedHelper().getEntityDefinition(URLS
+            .GET_ENTITY_DEFINITION, feedUpdated);
+        AssertUtil.assertFailed(response);
+        response = cluster3.getFeedHelper().getEntityDefinition(URLS
+            .GET_ENTITY_DEFINITION, feedUpdated);
+        Assert.assertTrue(XmlUtil.isIdentical(feedUpdated,
+            response.getMessage()));
+        response = prism.getFeedHelper().getEntityDefinition(URLS
+            .GET_ENTITY_DEFINITION, feedUpdated);
+        Assert.assertTrue(XmlUtil.isIdentical(feedUpdated,
+            response.getMessage()));
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "REPLICATION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feedUpdated),
+                "RETENTION"), 1);
+    }
+
+    /*
+    @Test(enabled = false)
+    public void delete2SourceCluster() {
+
+    }
+
+    @Test(enabled = false)
+    public void delete2TargetCluster() {
+
+    }
+
+    @Test(enabled = false)
+    public void delete1Source1TargetCluster() {
+
+    }
+    */
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
new file mode 100644
index 0000000..ff227d6
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+
+/**
+ * Feed instance status tests.
+ */
+@Test(groups = "embedded")
+public class FeedInstanceStatusTest extends BaseTestClass {
+
+    private String baseTestDir = baseHDFSDir + "/FeedInstanceStatusTest";
+    private String feedInputPath = baseTestDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/";
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+
+    ColoHelper cluster2 = servers.get(1);
+    ColoHelper cluster3 = servers.get(2);
+    FileSystem cluster2FS = serverFS.get(1);
+    FileSystem cluster3FS = serverFS.get(2);
+    private static final Logger logger = Logger.getLogger(FeedInstanceStatusTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void testName(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readELBundle();
+        for (int i = 0; i < 3; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    @Test(groups = {"multiCluster"})
+    public void feedInstanceStatus_running() throws Exception {
+        bundles[0].setInputFeedDataPath(feedInputPath);
+
+        logger.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+        ServiceResponse r = prism.getClusterHelper()
+            .submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        logger.info("cluster bundle2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+        r = prism.getClusterHelper()
+            .submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        logger.info("cluster bundle3: " + Util.prettyPrintXml(bundles[2].getClusters().get(0)));
+        r = prism.getClusterHelper()
+            .submitEntity(URLS.SUBMIT_URL, bundles[2].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+        String startTime = TimeUtil.getTimeWrtSystemTime(-50);
+
+        feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime,
+                TimeUtil.addMinsToTime(startTime, 65)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "US/${cluster.colo}");
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 20),
+                TimeUtil.addMinsToTime(startTime, 85)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(TimeUtil.addMinsToTime(startTime, 40),
+                TimeUtil.addMinsToTime(startTime, 110)),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
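+        // instance-status queries take an ISO8601 window of the form ?start=<time>&end=<time>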
+        //status before submit
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 100) + "&end=" +
+                TimeUtil.addMinsToTime(startTime, 120));
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed));
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed),
+                "?start=" + startTime + "&end=" + TimeUtil
+                    .addMinsToTime(startTime, 100));
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
+
+        // both replication instances
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed),
+                "?start=" + startTime + "&end=" + TimeUtil
+                    .addMinsToTime(startTime, 100));
+
+        // single instance at -30
+        prism.getFeedHelper().getProcessInstanceStatus(Util.readEntityName(feed),
+            "?start=" + TimeUtil
+                .addMinsToTime(startTime, 20));
+
+        //single at -10
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 40));
+
+        //single at 10
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 40));
+
+        //single at 30
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 40));
+
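+        // replenish input data on both source clusters so replication instances have data to consume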
+        String postFix = "/US/" + cluster2.getClusterHelper().getColo();
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+        HadoopUtil.lateDataReplenish(cluster2FS, 80, 20, prefix, postFix);
+
+        postFix = "/UK/" + cluster3.getClusterHelper().getColo();
+        prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+        HadoopUtil.lateDataReplenish(cluster3FS, 80, 20, prefix, postFix);
+
+        // both replication instances
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed),
+                "?start=" + startTime + "&end=" + TimeUtil
+                    .addMinsToTime(startTime, 100));
+
+        // single instance at -30
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 20));
+
+        //single at -10
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 40));
+
+        //single at 10
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 40));
+
+        //single at 30
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 40));
+
+        logger.info("Wait till feed goes into running ");
+
+        //suspend instances -10
+        prism.getFeedHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 40));
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 20) + "&end=" +
+                TimeUtil.addMinsToTime(startTime, 40));
+
+        //re-suspend -10 and suspend -30 (source-specific)
+        prism.getFeedHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(feed),
+                "?start=" + TimeUtil
+                    .addMinsToTime(startTime, 20) + "&end=" +
+                    TimeUtil.addMinsToTime(startTime, 40));
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 20) + "&end=" +
+                TimeUtil.addMinsToTime(startTime, 40));
+
+        //resume -10 and -30
+        prism.getFeedHelper()
+            .getProcessInstanceResume(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 20) + "&end=" +
+                TimeUtil.addMinsToTime(startTime, 40));
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 20) + "&end=" +
+                TimeUtil.addMinsToTime(startTime, 40));
+
+        //get running instances
+        prism.getFeedHelper().getRunningInstance(URLS.INSTANCE_RUNNING, Util.readEntityName(feed));
+
+        //rerun succeeded instance
+        prism.getFeedHelper()
+            .getProcessInstanceRerun(Util.readEntityName(feed), "?start=" + startTime);
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed),
+                "?start=" + startTime + "&end=" + TimeUtil
+                    .addMinsToTime(startTime, 20));
+
+        //kill instance
+        prism.getFeedHelper()
+            .getProcessInstanceKill(Util.readEntityName(feed), "?start=" + TimeUtil
+                .addMinsToTime(startTime, 44));
+        prism.getFeedHelper()
+            .getProcessInstanceKill(Util.readEntityName(feed), "?start=" + startTime);
+
+        //end time should be less than end of validity, i.e. startTime + 110
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed),
+                "?start=" + startTime + "&end=" + TimeUtil
+                    .addMinsToTime(startTime, 110));
+
+
+        //rerun killed instance
+        prism.getFeedHelper()
+            .getProcessInstanceRerun(Util.readEntityName(feed), "?start=" + startTime);
+        prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed),
+                "?start=" + startTime + "&end=" + TimeUtil
+                    .addMinsToTime(startTime, 110));
+
+        //kill feed
+        prism.getFeedHelper().delete(URLS.DELETE_URL, feed);
+        InstancesResult responseInstance = prism.getFeedHelper()
+            .getProcessInstanceStatus(Util.readEntityName(feed),
+                "?start=" + startTime + "&end=" + TimeUtil
+                    .addMinsToTime(startTime, 110));
+
+        logger.info(responseInstance.getMessage());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
new file mode 100644
index 0000000..c524324
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.List;
+
+/**
+ * Feed replication tests.
+ */
+@Test(groups = "embedded")
+public class FeedReplicationTest extends BaseTestClass {
+
+    private ColoHelper cluster1 = servers.get(0);
+    private ColoHelper cluster2 = servers.get(1);
+    private ColoHelper cluster3 = servers.get(2);
+    private FileSystem cluster1FS = serverFS.get(0);
+    private FileSystem cluster2FS = serverFS.get(1);
+    private FileSystem cluster3FS = serverFS.get(2);
+    private OozieClient cluster2OC = serverOC.get(1);
+    private OozieClient cluster3OC = serverOC.get(2);
+    private String dateTemplate = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String baseTestDir = baseHDFSDir + "/FeedReplicationTest";
+    private String sourcePath = baseTestDir + "/source";
+    private String feedDataLocation = baseTestDir + "/source" + dateTemplate;
+    private String targetPath = baseTestDir + "/target";
+    private String targetDataLocation = targetPath + dateTemplate;
+    private static final Logger LOGGER = Logger.getLogger(FeedReplicationTest.class);
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws JAXBException, IOException {
+        LOGGER.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readLocalDCBundle();
+
+        bundles[0] = new Bundle(bundle, cluster1);
+        bundles[1] = new Bundle(bundle, cluster2);
+        bundles[2] = new Bundle(bundle, cluster3);
+
+        bundles[0].generateUniqueBundle();
+        bundles[1].generateUniqueBundle();
+        bundles[2].generateUniqueBundle();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Test demonstrates replication of stored data from one source cluster to one target cluster.
+     * It checks the lifecycle of the replication workflow instance, including its creation. When
+     * replication ends, the test checks whether the data was replicated correctly.
+     */
+    @Test
+    public void replicate1Source1Target()
+        throws AuthenticationException, IOException, URISyntaxException, JAXBException,
+        OozieClientException {
+        Bundle.submitCluster(bundles[0], bundles[1]);
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+
+        //configure feed
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+        //erase all clusters from feed definition
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+        //set cluster1 as source
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, endTime),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)),
+            ClusterType.SOURCE, null);
+        //set cluster2 as target
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, endTime),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)),
+            ClusterType.TARGET, null, targetDataLocation);
+
+        //submit and schedule feed
+        LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+                feed));
+
+        //upload necessary data
+        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
+        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm");
+        String timePattern = fmt.print(date);
+        String sourceLocation = sourcePath + "/" + timePattern + "/";
+        String targetLocation = targetPath + "/" + timePattern + "/";
+        HadoopUtil.recreateDir(cluster1FS, sourceLocation);
+
+        Path toSource = new Path(sourceLocation);
+        Path toTarget = new Path(targetLocation);
+        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
+            OSUtil.RESOURCES + "feed-s4Replication.xml");
+        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+
+        //check if coordinator exists
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                "REPLICATION"), 1);
+
+        //replication should start, wait while it ends
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+        //check if data has been replicated correctly
+        List<Path> cluster1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
+        List<Path> cluster2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
+
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+    }
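
For reference, the time-to-path resolution used in the test above can be reproduced stand-alone. The Joda pattern mirrors the ${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE} template from feedDataLocation; the base directories below are placeholders, not the test's actual HDFS paths:

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class FeedPathResolver {
        public static void main(String[] args) {
            // Nominal instance time in the feed's UTC format (value is illustrative).
            DateTime date = new DateTime("2014-08-04T10:04Z", DateTimeZone.UTC);
            // Slashes need no quoting in Joda patterns, so the balanced form is simply:
            DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm");
            String timePattern = fmt.print(date);          // e.g. 2014/08/04/10/04
            System.out.println("source: /data/source/" + timePattern + "/");
            System.out.println("target: /data/target/" + timePattern + "/");
        }
    }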
+
+    /**
+     * Test demonstrates replication of stored data from one source cluster to two target clusters.
+     * It checks the lifecycle of the replication workflow instances, including their creation on
+     * both targets. When replication ends, the test checks whether the data was replicated correctly.
+     */
+    @Test
+    public void replicate1Source2Targets() throws Exception {
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+
+        //configure feed
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+        //erase all clusters from feed definition
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+        //set cluster1 as source
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, endTime),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)),
+            ClusterType.SOURCE, null);
+        //set cluster2 as target
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, endTime),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)),
+            ClusterType.TARGET, null, targetDataLocation);
+        //set cluster3 as target
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, endTime),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)),
+            ClusterType.TARGET, null, targetDataLocation);
+
+        //submit and schedule feed
+        LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+                feed));
+
+        //upload necessary data
+        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
+        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm");
+        String timePattern = fmt.print(date);
+        String sourceLocation = sourcePath + "/" + timePattern + "/";
+        String targetLocation = targetPath + "/" + timePattern + "/";
+        HadoopUtil.recreateDir(cluster1FS, sourceLocation);
+
+        Path toSource = new Path(sourceLocation);
+        Path toTarget = new Path(targetLocation);
+        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
+            OSUtil.RESOURCES + "feed-s4Replication.xml");
+        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+
+        //check if all coordinators exist
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, feed, 0);
+
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                "REPLICATION"), 1);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+                "REPLICATION"), 1);
+        //replication on cluster 2 should start, wait till it ends
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+        //replication on cluster 3 should start, wait till it ends
+        InstanceUtil.waitTillInstanceReachState(cluster3OC, Util.readEntityName(feed), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+        //check if data has been replicated correctly
+        List<Path> cluster1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
+        List<Path> cluster2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
+        List<Path> cluster3ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, toTarget);
+
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster3ReplicatedData);
+    }
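
The final assertions above reduce to comparing recursive file listings on the source and on each target. A sketch of that check using only the stock Hadoop FileSystem API (namenode URIs and paths are placeholders; the real test goes through HadoopUtil):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    import java.util.ArrayList;
    import java.util.List;

    public class ReplicationCheck {
        // Recursively lists all files under dir, like HadoopUtil.getAllFilesRecursivelyHDFS.
        static List<Path> listRecursively(FileSystem fs, Path dir) throws Exception {
            List<Path> files = new ArrayList<Path>();
            RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, true);
            while (it.hasNext()) {
                files.add(it.next().getPath());
            }
            return files;
        }

        public static void main(String[] args) throws Exception {
            Configuration srcConf = new Configuration();
            srcConf.set("fs.defaultFS", "hdfs://source-nn:8020");   // placeholder
            Configuration dstConf = new Configuration();
            dstConf.set("fs.defaultFS", "hdfs://target-nn:8020");   // placeholder

            List<Path> source = listRecursively(FileSystem.get(srcConf), new Path("/data/source"));
            List<Path> target = listRecursively(FileSystem.get(dstConf), new Path("/data/target"));

            // Same size comparison that AssertUtil.checkForListSizes performs.
            if (source.size() != target.size()) {
                throw new AssertionError("replicated " + target.size() + " of " + source.size() + " files");
            }
        }
    }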
+
+    /**
+     * Test demonstrates how replication depends on the availability flag. The scenario includes
+     * one source and one target cluster. Even when the feed is submitted and scheduled and data
+     * is available, the feed still waits for the availability flag (a file whose name is defined
+     * as the availability flag in the feed definition). As soon as that file is uploaded to the
+     * data directory, replication starts; when it ends, the test checks the replicated data.
+     */
+    @Test
+    public void availabilityFlagTest() throws Exception {
+        //replicate1Source1Target scenario + availability flag set, but the flag file isn't uploaded yet
+        Bundle.submitCluster(bundles[0], bundles[1]);
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+
+        //configure feed
+        String availabilityFlagName = "README.md";
+        String feedName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        Feed feedElement = bundles[0].getFeedElement(feedName);
+        feedElement.setAvailabilityFlag(availabilityFlagName);
+        bundles[0].writeFeedElement(feedElement, feedName);
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+        //erase all clusters from feed definition
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+        //set cluster1 as source
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, endTime),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)),
+            ClusterType.SOURCE, null);
+        //set cluster2 as target
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, endTime),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)),
+            ClusterType.TARGET, null, targetDataLocation);
+
+        //submit and schedule feed
+        LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+                feed));
+
+        //upload necessary data
+        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
+        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm");
+        String timePattern = fmt.print(date);
+        String sourceLocation = sourcePath + "/" + timePattern + "/";
+        String targetLocation = targetPath + "/" + timePattern + "/";
+        HadoopUtil.recreateDir(cluster1FS, sourceLocation);
+
+        Path toSource = new Path(sourceLocation);
+        Path toTarget = new Path(targetLocation);
+        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
+            OSUtil.RESOURCES + "feed-s4Replication.xml");
+        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+
+        //wait till the instance gets created
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+
+        //check if coordinator exists
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), feedName, "REPLICATION"), 1);
+
+        //replication should not start even after some time has passed
+        TimeUtil.sleepSeconds(60);
+        InstancesResult r = prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + startTime + "&end=" + endTime);
+        InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
+        LOGGER.info("Replication didn't start.");
+
+        //create availability flag on source
+        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, availabilityFlagName);
+
+        //check if the instance becomes running
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
+            CoordinatorAction.Status.RUNNING, EntityType.FEED);
+
+        //wait till the instance succeeds
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+        //check if data was replicated correctly
+        List<Path> cluster1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
+        LOGGER.info("Data on source cluster: " + cluster1ReplicatedData);
+        List<Path> cluster2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
+        LOGGER.info("Data on target cluster: " + cluster2ReplicatedData);
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+    }
+
+}
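
The availability flag itself is just a marker file: the held instance starts once a file with the configured name exists in the instance directory. A sketch of creating the flag directly (host and directory are placeholders; the test above copies a local README.md instead):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AvailabilityFlag {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://source-nn:8020");       // placeholder
            FileSystem fs = FileSystem.get(conf);

            // Instance directory for one nominal time (placeholder path).
            Path instanceDir = new Path("/data/source/2014/08/04/10/04");

            // An empty file with the flag's name is enough; only its existence is checked.
            fs.createNewFile(new Path(instanceDir, "README.md"));
        }
    }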

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
new file mode 100644
index 0000000..0af3a4e
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+/**
+ * Feed resume tests.
+ */
+@Test(groups = "embedded")
+public class FeedResumeTest extends BaseTestClass {
+
+    private final IEntityManagerHelper feedHelper = prism.getFeedHelper();
+    private String feed;
+    private ColoHelper cluster = servers.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
+    private String aggregateWorkflowDir = baseHDFSDir + "/FeedResumeTest/aggregator";
+    private static final Logger LOGGER = Logger.getLogger(FeedResumeTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0].generateUniqueBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[0].submitClusters(prism);
+        feed = bundles[0].getInputFeedFromBundle();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Launches a feed, suspends it, then resumes it and checks whether it gets to running state.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void resumeSuspendedFeed() throws Exception {
+        AssertUtil
+            .assertSucceeded(feedHelper.submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+        AssertUtil.assertSucceeded(feedHelper.suspend(URLS.SUSPEND_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+        AssertUtil.assertSucceeded(feedHelper.resume(URLS.RESUME_URL, feed));
+        ServiceResponse response = feedHelper.getStatus(URLS.STATUS_URL, feed);
+        String colo = feedHelper.getColo();
+        Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING"));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+    }
+
+
+    /**
+     * Tries to resume a feed that wasn't submitted and scheduled. The attempt should fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void resumeNonExistentFeed() throws Exception {
+        AssertUtil.assertFailed(feedHelper.resume(URLS.RESUME_URL, feed));
+    }
+
+    /**
+     * Tries to resume a deleted feed. The attempt should fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void resumeDeletedFeed() throws Exception {
+        AssertUtil
+            .assertSucceeded(feedHelper.submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+        AssertUtil.assertSucceeded(feedHelper.delete(URLS.DELETE_URL, feed));
+        AssertUtil.assertFailed(feedHelper.resume(URLS.RESUME_URL, feed));
+    }
+
+    /**
+     * Tries to resume a scheduled feed that wasn't suspended. The feed status shouldn't change.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void resumeScheduledFeed() throws Exception {
+        AssertUtil
+            .assertSucceeded(feedHelper.submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+        AssertUtil.assertSucceeded(feedHelper.resume(URLS.RESUME_URL, feed));
+        ServiceResponse response = feedHelper.getStatus(URLS.STATUS_URL, feed);
+        String colo = feedHelper.getColo();
+        Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING"));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+    }
+}
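
The suspend/resume helpers exercised above wrap Falcon's entity REST API. A bare-bones sketch of the same calls over HTTP, assuming a prism endpoint and simple user.name authentication (host, port, feed name, and the auth scheme are assumptions; secured deployments differ):

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class FeedLifecycle {
        // POSTs to /api/entities/<op>/feed/<name>; op is "suspend" or "resume".
        static int post(String base, String op, String feedName) throws IOException {
            URL url = new URL(base + "/api/entities/" + op + "/feed/" + feedName
                + "?user.name=falcon");                      // simple-auth assumption
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            int code = conn.getResponseCode();
            conn.disconnect();
            return code;
        }

        public static void main(String[] args) throws IOException {
            String prism = "http://prism-host:16000";        // placeholder
            System.out.println("suspend -> HTTP " + post(prism, "suspend", "my-feed"));
            System.out.println("resume  -> HTTP " + post(prism, "resume", "my-feed"));
        }
    }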

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
new file mode 100644
index 0000000..148e3bf
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+/**
+ * Feed schedule tests.
+ */
+@Test(groups = "embedded")
+public class FeedScheduleTest extends BaseTestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
+    private String feed;
+    private String aggregateWorkflowDir = baseHDFSDir + "/FeedScheduleTest/aggregator";
+    private static final Logger LOGGER = Logger.getLogger(FeedScheduleTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        Bundle.submitCluster(bundles[0]);
+        feed = bundles[0].getInputFeedFromBundle();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Tries to schedule an already scheduled feed. The request should be accepted as valid.
+     * The feed status shouldn't change.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void scheduleAlreadyScheduledFeed() throws Exception {
+        ServiceResponse response = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        AssertUtil.assertSucceeded(response);
+
+        response = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        AssertUtil.assertSucceeded(response);
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+
+        //now try re-scheduling again
+        response = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        AssertUtil.assertSucceeded(response);
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+    }
+
+    /**
+     * Schedules a valid feed. The feed should get to running state.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void scheduleValidFeed() throws Exception {
+        //submit feed
+        ServiceResponse response = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        AssertUtil.assertSucceeded(response);
+
+        //now schedule the thing
+        response = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        AssertUtil.assertSucceeded(response);
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+    }
+
+    /**
+     * Tries to schedule an already scheduled and suspended feed. The suspended status shouldn't change.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void scheduleSuspendedFeed() throws Exception {
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+
+        //now suspend
+        AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+        //now schedule this!
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+    }
+
+    /**
+     * Schedules and deletes a feed, then tries to schedule it again. The request should fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void scheduleKilledFeed() throws Exception {
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+
+        //now delete
+        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED);
+        //now schedule this!
+        AssertUtil.assertFailed(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
+    }
+
+    /**
+     * Tries to schedule a feed that wasn't submitted. The request should fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void scheduleNonExistentFeed() throws Exception {
+        AssertUtil.assertFailed(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
+    }
+}
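
The checkStatus assertions in these tests ultimately look at the Oozie job backing the feed. A sketch of that check with the plain OozieClient; the bundle-name filter value is an assumption about Falcon's naming convention, and the host is a placeholder:

    import java.util.List;

    import org.apache.oozie.client.BundleJob;
    import org.apache.oozie.client.OozieClient;
    import org.apache.oozie.client.OozieClientException;

    public class FeedStatusCheck {
        public static void main(String[] args) throws OozieClientException {
            OozieClient oc = new OozieClient("http://oozie-host:11000/oozie"); // placeholder
            // Assumed naming convention for the feed's bundle; adjust for your deployment.
            List<BundleJob> jobs = oc.getBundleJobsInfo("name=FALCON_FEED_my-feed", 0, 10);
            for (BundleJob job : jobs) {
                // RUNNING here corresponds to a scheduled, unsuspended feed.
                System.out.println(job.getAppName() + " -> " + job.getStatus());
            }
        }
    }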

