falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From samar...@apache.org
Subject [08/27] adding falcon-regression
Date Mon, 04 Aug 2014 10:04:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
new file mode 100644
index 0000000..e9a9cf4
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.APIResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+public class PrismFeedSnSTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    OozieClient cluster1OC = serverOC.get(0);
+    OozieClient cluster2OC = serverOC.get(1);
+    private boolean restartRequired;
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSnSTest/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismFeedSnSTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        restartRequired = false;
+        Bundle bundle = BundleUtil.readELBundle();
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (restartRequired) {
+            Util.restartService(cluster1.getFeedHelper());
+        }
+        removeBundles();
+    }
+
+
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testFeedSnSOnBothColos() throws Exception {
+        //schedule both bundles
+        bundles[0].submitAndScheduleFeed();
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        bundles[1].submitAndScheduleFeed();
+
+        //now check if they have been scheduled correctly or not
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+        //check if there is no criss cross
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+    }
+
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testSnSAlreadyScheduledFeedOnBothColos() throws Exception {
+        //schedule both bundles
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        //now check if they have been scheduled correctly or not
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+        //check if there is no criss cross
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+        //ensure only one bundle is there
+        Assert.assertEquals(OozieUtil.getBundles(cluster1OC,
+            Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED).size(), 1);
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+        Assert.assertEquals(OozieUtil.getBundles(cluster2OC,
+            Util.readEntityName(bundles[1].getDataSets().get(0)), EntityType.FEED).size(), 1);
+        //now check if they have been scheduled correctly or not
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+    }
+
+
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSnSSuspendedFeedOnBothColos() throws Exception {
+        //schedule both bundles
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        //now check if they have been scheduled correctly or not
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+        Assert.assertEquals(OozieUtil.getBundles(cluster1OC,
+            Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED).size(), 1);
+
+        AssertUtil.assertSucceeded(cluster1.getFeedHelper()
+            .resume(URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .suspend(URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+        Assert.assertEquals(OozieUtil.getBundles(cluster2OC,
+            Util.readEntityName(bundles[1].getDataSets().get(0)), EntityType.FEED).size(), 1);
+        AssertUtil.assertSucceeded(cluster2.getFeedHelper()
+            .resume(URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+
+    }
+
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testSnSDeletedFeedOnBothColos() throws Exception {
+        //schedule both bundles
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+    }
+
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testScheduleNonExistentFeedOnBothColos() throws Exception {
+        AssertUtil.assertFailed(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+    }
+
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testFeedSnSOn1ColoWhileOtherColoIsDown() throws Exception {
+        restartRequired = true;
+        for (String cluster : bundles[1].getClusters()) {
+            AssertUtil
+                .assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster));
+        }
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+
+        //now check if they have been scheduled correctly or not
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        //check if there is no criss cross
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+    }
+
+
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testFeedSnSOn1ColoWhileThatColoIsDown() throws Exception {
+        restartRequired = true;
+        bundles[0].submitFeed();
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        AssertUtil.assertFailed(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+    }
+
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testFeedSnSOn1ColoWhileAnotherColoHasSuspendedFeed() throws Exception {
+        bundles[0].submitAndScheduleFeed();
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+        bundles[1].submitAndScheduleFeed();
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+    }
+
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testFeedSnSOn1ColoWhileAnotherColoHasKilledFeed() throws Exception {
+        bundles[0].submitAndScheduleFeed();
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        bundles[1].submitAndScheduleFeed();
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+    }
+
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testFeedSnSOnBothColosUsingColoHelper() throws Exception {
+        //schedule both bundles
+        bundles[0].submitFeed();
+        APIResult result = Util.parseResponse((cluster1.getFeedHelper()
+            .submitEntity(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0))));
+        Assert.assertEquals(result.getStatusCode(), 404);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+        bundles[1].submitFeed();
+        result = Util.parseResponse(cluster2.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+        Assert.assertEquals(result.getStatusCode(), 404);
+
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+    }
+
+
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSnSSuspendedFeedOnBothColosUsingColoHelper() throws Exception {
+
+        //schedule both bundles
+        bundles[0].submitFeed();
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+        bundles[1].submitFeed();
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+
+        AssertUtil.assertSucceeded(cluster1.getFeedHelper()
+            .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        //now check if they have been scheduled correctly or not
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.assertSucceeded(
+            cluster1.getFeedHelper().resume(URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+
+        AssertUtil.assertSucceeded(cluster2.getFeedHelper().suspend(URLS.SUSPEND_URL,
+            bundles[1].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+    }
+
+
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testScheduleDeletedFeedOnBothColosUsingColoHelper() throws Exception {
+
+        //schedule both bundles
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+
+        Assert.assertEquals(Util.parseResponse(prism.getFeedHelper()
+                .getStatus(URLS.STATUS_URL, bundles[0].getDataSets().get(0))).getMessage(),
+            cluster1.getClusterHelper().getColoName() + "/RUNNING");
+
+        Assert.assertEquals(Util.parseResponse(prism.getFeedHelper()
+                .getStatus(URLS.STATUS_URL, bundles[1].getDataSets().get(0))).getMessage(),
+            cluster2.getClusterHelper().getColoName() + "/RUNNING");
+    }
+
+
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSNSNonExistentFeedOnBothColosUsingColoHelper() throws Exception {
+
+        Assert.assertEquals(Util.parseResponse(cluster1.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)))
+            .getStatusCode(), 404);
+        Assert.assertEquals(Util.parseResponse(cluster2.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)))
+            .getStatusCode(), 404);
+    }
+
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testFeedSnSOn1ColoWhileOtherColoIsDownUsingColoHelper() throws Exception {
+        restartRequired = true;
+        for (String cluster : bundles[1].getClusters()) {
+            AssertUtil
+                .assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster));
+        }
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+
+        //now check if they have been scheduled correctly or not
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        //check if there is no criss cross
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+    }
+
+
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testFeedSnSOn1ColoWhileThatColoIsDownUsingColoHelper() throws Exception {
+        restartRequired = true;
+
+        bundles[0].setCLusterColo(cluster1.getClusterHelper().getColoName());
+        logger.info("cluster bundles[0]: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+        ServiceResponse r =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        bundles[1].setCLusterColo(cluster2.getClusterHelper().getColoName());
+        logger.info("cluster bundles[1]: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+        r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        String startTimeUA1 = "2012-10-01T12:00Z";
+        String startTimeUA2 = "2012-10-01T12:00Z";
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2099-10-01T12:10Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+                "${cluster.colo}",
+                baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+                baseHDFSDir +
+                    "/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        ServiceResponse response = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        AssertUtil.assertPartial(response);
+        response = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        AssertUtil.assertPartial(response);
+        Util.startService(cluster1.getFeedHelper());
+        prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
+        prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[1].getClusters().get(0));
+
+    }
+
+
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testFeedSnSOn1ColoWhileAnotherColoHasSuspendedFeedUsingColoHelper()
+        throws Exception {
+        bundles[0].submitAndScheduleFeed();
+        AssertUtil.assertSucceeded(
+            cluster1.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+
+        bundles[1].submitAndScheduleFeed();
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testFeedSnSOn1ColoWhileAnotherColoHasKilledFeedUsingColoHelper() throws Exception {
+        bundles[0].submitAndScheduleFeed();
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        bundles[1].submitAndScheduleFeed();
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java
new file mode 100644
index 0000000..d6dbaa5
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.TestNGException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+public class PrismFeedSuspendTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    OozieClient cluster1OC = serverOC.get(0);
+    OozieClient cluster2OC = serverOC.get(1);
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSuspendTest/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismFeedSuspendTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readELBundle();
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Run two feed on different clusters. Delete 1st feed and try to suspend it. Should fail.
+     * Check that 2nd feed is running on 2nd cluster. Delete it and try to suspend it too.
+     * Attempt should fail and both feeds should be killed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testSuspendDeletedFeedOnBothColos() throws Exception {
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        //delete using prism
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+        //suspend using prism
+        AssertUtil.assertFailed(prism.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        //verify
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper()
+            .delete(Util.URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+        //suspend on the other one
+        AssertUtil.assertFailed(prism.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+    }
+
+    /**
+     * Run two feeds on different clusters. Suspend feed and try to suspend it once more. Check
+     * that action succeeds and feed is suspended. Make the same for 2nd feed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendSuspendedFeedOnBothColos() throws Exception {
+        //schedule using colohelpers
+        bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+        for (int i = 0; i < 2; i++) {
+            //suspend using prism
+            AssertUtil.assertSucceeded(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+            );
+            //verify
+            AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+            AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+        }
+        for (int i = 0; i < 2; i++) {
+            //suspend on the other one
+            AssertUtil.assertSucceeded(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0))
+            );
+            AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+            AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+        }
+    }
+
+    /**
+     * Attempt to suspend nonexistent feed should fail through both prism and matching server.
+     *
+     * @throws Exception
+     */
+    @Test(groups = "embedded")
+    public void testSuspendNonExistentFeedOnBothColos() throws Exception {
+        AssertUtil.assertFailed(prism.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(prism.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+
+        AssertUtil.assertFailed(cluster1.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(cluster2.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+    }
+
+    /**
+     * Attempt to suspend non-running feed should fail through both prism and matching server.
+     *
+     * @throws Exception
+     */
+    @Test(groups = "embedded")
+    public void testSuspendSubmittedFeedOnBothColos() throws Exception {
+        bundles[0].submitFeed();
+        bundles[1].submitFeed();
+
+        AssertUtil.assertFailed(prism.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(prism.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+
+        AssertUtil.assertFailed(cluster1.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertFailed(cluster2.getFeedHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+    }
+
+    /**
+     * Run two feeds on different clusters. Stop server on 1st cluster. Attempt to suspend feed on
+     * stopped server through prism should fail. Check that 2nd feed is running. Suspend it
+     * and check that it is suspended.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendScheduledFeedOnBothColosWhen1ColoIsDown() throws Exception {
+        try {
+            //schedule using colohelpers
+            bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+            bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+            Util.shutDownService(cluster1.getFeedHelper());
+
+            //suspend using prism
+            AssertUtil.assertFailed(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+            );
+            //verify
+            AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+            //suspend on the other one
+            AssertUtil.assertSucceeded(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+            AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getCause());
+        } finally {
+            Util.restartService(cluster1.getFeedHelper());
+        }
+    }
+
+    /**
+     * Run two feeds on different clusters. Delete 1st feed. Stop server on 1st cluster. Attempt
+     * to suspend deleted feed on stopped server should fail. Delete 2nd feed. Attempt
+     * to suspend deleted 2nd feed should also fail. Check that both feeds are killed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendDeletedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+        try {
+            //schedule using colohelpers
+            bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+            bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+            //delete using coloHelpers
+            AssertUtil.assertSucceeded(
+                prism.getFeedHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0))
+            );
+            Util.shutDownService(cluster1.getFeedHelper());
+
+            //suspend using prism
+            AssertUtil.assertFailed(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+            //verify
+            AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+            AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+            AssertUtil.assertSucceeded(
+                prism.getFeedHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[1].getDataSets().get(0))
+            );
+            //suspend on the other one
+            AssertUtil.assertFailed(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0))
+            );
+            AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+            AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getCause());
+        } finally {
+            Util.restartService(cluster1.getFeedHelper());
+        }
+    }
+
+    /**
+     * Run two feeds on different clusters. Suspend 1st feed and check that it suspended,
+     * and then stop server on its cluster. Attempt to suspend the same feed again should fail.
+     * Suspend 2nd feed and check that both feeds are suspended.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendSuspendedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+        try {
+            //schedule using colohelpers
+            bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+            bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+            //suspend using prism
+            AssertUtil.assertSucceeded(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+            );
+            //verify
+            AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+            AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+            Util.shutDownService(cluster1.getFeedHelper());
+
+            AssertUtil.assertFailed(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+            );
+            //suspend on the other one
+            AssertUtil.assertSucceeded(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+            AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+            AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getCause());
+        } finally {
+            Util.restartService(cluster1.getProcessHelper());
+        }
+    }
+
+    /**
+     * Stop the 1st server. Attempt to suspend nonexistent feeds on both clusters should fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendNonExistentFeedOnBothColosWhen1ColoIsDown()
+        throws Exception {
+        try {
+            Util.shutDownService(cluster1.getFeedHelper());
+            AssertUtil.assertFailed(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0))
+            );
+            AssertUtil.assertFailed(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+            AssertUtil.assertFailed(
+                cluster2.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getCause());
+        } finally {
+            Util.restartService(cluster1.getProcessHelper());
+        }
+    }
+
+    /**
+     * Submit two feeds. Stop the server on the 1st cluster. Attempts to suspend non-scheduled
+     * feeds on both clusters should fail through prism as well as through colohelper.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendSubmittedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+        try {
+            bundles[0].submitFeed();
+            bundles[1].submitFeed();
+
+            Util.shutDownService(cluster1.getFeedHelper());
+
+            AssertUtil.assertFailed(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+            );
+            AssertUtil.assertFailed(
+                prism.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+
+            AssertUtil.assertFailed(
+                cluster2.getFeedHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getCause());
+        } finally {
+            Util.restartService(cluster1.getProcessHelper());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
new file mode 100644
index 0000000..b6bf6d6
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+@Test(groups = "embedded")
+public class PrismFeedUpdateTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    FileSystem server1FS = serverFS.get(0);
+    OozieClient OC1 = serverOC.get(0);
+    String baseTestDir = baseHDFSDir + "/PrismFeedUpdateTest";
+    String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    public final String cluster1colo = cluster1.getClusterHelper().getColoName();
+    public final String cluster2colo = cluster2.getClusterHelper().getColoName();
+    private static final Logger logger = Logger.getLogger(PrismFeedUpdateTest.class);
+    String feedInputTimedOutPath =
+        baseTestDir + "/timedout/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readELBundle();
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+            bundles[i].setInputFeedDataPath(feedInputTimedOutPath);
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * TODO : complete test case
+     */
+    @Test(enabled = true, timeOut = 1200000)
+    public void updateFeedQueue_dependentMultipleProcess_oneProcessZeroInput() throws Exception {
+        //cluster1colo and cluster2colo are source. feed01 on cluster1colo target cluster2colo,
+        // feed02 on cluster2colo target cluster1colo
+
+        //get 3 unique bundles
+        //set cluster colos
+        bundles[0].setCLusterColo(cluster1colo);
+        logger.info("cluster bundles[0]: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+        bundles[1].setCLusterColo(cluster2colo);
+        logger.info("cluster bundles[1]: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+
+        //submit 3 clusters
+
+        //get 2 unique feeds
+        String feed01 = bundles[0].getInputFeedFromBundle();
+        String outputFeed = bundles[0].getOutputFeedFromBundle();
+
+        //set source and target for the 2 feeds
+
+        //set clusters to null;
+        feed01 = InstanceUtil
+            .setFeedCluster(feed01,
+                XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+                ClusterType.SOURCE, null);
+        outputFeed = InstanceUtil
+            .setFeedCluster(outputFeed,
+                XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+                ClusterType.SOURCE, null);
+
+
+        //set new feed input data
+        feed01 = Util.setFeedPathValue(feed01,
+            baseTestDir + "/feed01/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/");
+
+
+        //generate data in both the colos cluster1colo and cluster2colo
+        String prefix = InstanceUtil.getFeedPrefix(feed01);
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), server1FS);
+        HadoopUtil.lateDataReplenish(server1FS, 70, 1, prefix, null);
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(-50);
+
+        //set clusters for feed01
+        feed01 = InstanceUtil
+            .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+                null);
+        feed01 = InstanceUtil
+            .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+                XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET,
+                null);
+
+        //set clusters for output feed
+        outputFeed = InstanceUtil.setFeedCluster(outputFeed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, null);
+        outputFeed = InstanceUtil.setFeedCluster(outputFeed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null);
+
+
+        //submit and schedule feeds
+        logger.info("feed01: " + Util.prettyPrintXml(feed01));
+        logger.info("outputFeed: " + Util.prettyPrintXml(outputFeed));
+
+        //create 2 process with 2 clusters
+
+        //get first process
+        String process01 = bundles[0].getProcessData();
+
+        //add clusters to process
+
+        String processStartTime = TimeUtil.getTimeWrtSystemTime(-11);
+        String processEndTime = TimeUtil.getTimeWrtSystemTime(70);
+
+
+        process01 = InstanceUtil
+            .setProcessCluster(process01, null,
+                XmlUtil.createProcessValidity(startTime, "2099-01-01T00:00Z"));
+        process01 = InstanceUtil
+            .setProcessCluster(process01, Util.readEntityName(bundles[0].getClusters().get(0)),
+                XmlUtil.createProcessValidity(processStartTime, processEndTime));
+        process01 = InstanceUtil
+            .setProcessCluster(process01, Util.readEntityName(bundles[1].getClusters().get(0)),
+                XmlUtil.createProcessValidity(processStartTime, processEndTime));
+
+        //get 2nd process :
+        String process02 = process01;
+        process02 = InstanceUtil
+            .setProcessName(process02, "zeroInputProcess" + new Random().nextInt());
+        List<String> feed = new ArrayList<String>();
+        feed.add(outputFeed);
+        final ProcessMerlin processMerlin = new ProcessMerlin(process02);
+        processMerlin.setProcessFeeds(feed, 0, 0, 1);
+        process02 = processMerlin.toString();
+
+
+        //submit and schedule both process
+        logger.info("process: " + Util.prettyPrintXml(process01));
+        logger.info("process: " + Util.prettyPrintXml(process02));
+
+
+        logger.info("Wait till process goes into running ");
+
+        //change feed location path
+        outputFeed = Util.setFeedProperty(outputFeed, "queueName", "myQueue");
+
+        logger.info("updated feed: " + Util.prettyPrintXml(outputFeed));
+
+        //update feed first time
+        prism.getFeedHelper().update(outputFeed, outputFeed);
+    }
+
+
+    /**
+     * schedules a feed and dependent process. Process start and end are in past
+     * Test for bug https://issues.apache.org/jira/browse/FALCON-500
+     */
+    @Test
+    public void dependentProcessSucceeded()
+        throws Exception {
+        bundles[0].setProcessValidity("2014-06-01T04:00Z","2014-06-01T04:02Z");
+        bundles[0].submitAndScheduleAllFeeds();
+        bundles[0].submitAndScheduleProcess();
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(),
+            0, 0);
+        InstanceUtil.waitForBundleToReachState(cluster1, bundles[0].getProcessName(),
+            Job.Status.SUCCEEDED, 20);
+
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.addProperty("someProp","someVal");
+        AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
+        //check for new feed bundle creation
+        Assert.assertEquals(OozieUtil.getNumberOfBundle(prism, EntityType.FEED,
+            feed.getName()),2);
+        Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.PROCESS,
+            bundles[0].getProcessName()),1);
+    }
+
+    /**
+     * schedules a feed and dependent process. Update availability flag and check for process update
+     * Test for bug https://issues.apache.org/jira/browse/FALCON-278
+     */
+    @Test
+    public void updateAvailabilityFlag()
+        throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(3);
+        String endTime = TimeUtil.getTimeWrtSystemTime(30);
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].submitAndScheduleAllFeeds();
+        bundles[0].submitAndScheduleProcess();
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(),
+            0, 0);
+
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.setAvailabilityFlag("mytestflag");
+        AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
+        //check for new feed bundle creation
+        Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.FEED,
+            feed.getName()),2);
+        Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.PROCESS,
+            bundles[0].getProcessName()),2);
+    }
+
+}


Mime
View raw message