falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From samar...@apache.org
Subject [12/27] adding falcon-regression
Date Mon, 04 Aug 2014 10:04:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
new file mode 100644
index 0000000..f9f37cb
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -0,0 +1,1713 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.process.ExecutionType;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.APIResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.supportClasses.HadoopFileEditor;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Minutes;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Tests process entity updates submitted through prism while the process is scheduled on one colo.
+ */
+@Test(groups = "distributed")
+public class NewPrismProcessUpdateTest extends BaseTestClass {
+
+    private String baseTestDir = baseHDFSDir + "/NewPrismProcessUpdateTest"; // root dir of this class under baseHDFSDir
+    private String inputFeedPath = baseTestDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"; // time-partitioned path template
+    private String workflowPath = baseTestDir + "/falcon-oozie-wf"; // workflow path prism reports before any workflow update
+    private String workflowPath2 = baseTestDir + "/falcon-oozie-wf2"; // alternative workflow path that tests switch the process to
+    private String aggregatorPath = baseTestDir + "/aggregator"; // passed to uploadDirToClusters in setup()
+    private String aggregator1Path = baseTestDir + "/aggregator1"; // passed to uploadDirToClusters in setup()
+    private ColoHelper cluster1 = servers.get(0);
+    private ColoHelper cluster2 = servers.get(1); // tests assert the process is NOT running on this colo
+    private ColoHelper cluster3 = servers.get(2); // colo on which the tests schedule the process
+    private FileSystem cluster1FS = serverFS.get(0);
+    private OozieClient cluster2OC = serverOC.get(1); // same index as cluster2; presumably its Oozie client - confirm
+    private OozieClient cluster3OC = serverOC.get(2); // same index as cluster3; presumably its Oozie client - confirm
+    private static final Logger LOGGER = Logger.getLogger(NewPrismProcessUpdateTest.class);
+
+    /**
+     * Prepares a fresh set of bundles before each test method.
+     *
+     * <p>One bundle per colo is built from the update template, the first cluster of
+     * bundles[2] is added to bundles[1] as a TARGET cluster, {@code usualGrind} is run
+     * for cluster3 and cluster3 is restarted afterwards.
+     *
+     * @param method the test method about to run; only its name is logged
+     * @throws Exception if any preparation step fails
+     */
+    @BeforeMethod(alwaysRun = true)
+    public void testSetup(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        final Bundle template = BundleUtil.readUpdateBundle();
+        final ColoHelper[] colos = {cluster1, cluster2, cluster3};
+        for (int idx = 0; idx < colos.length; idx++) {
+            bundles[idx] = new Bundle(template, colos[idx]);
+        }
+        setBundleWFPath(bundles[0], bundles[1], bundles[2]);
+        bundles[1].addClusterToBundle(
+                bundles[2].getClusters().get(0), ClusterType.TARGET, null, null);
+        usualGrind(cluster3, bundles[1]);
+        Util.restartService(cluster3.getClusterHelper());
+    }
+
+    /**
+     * One-time class setup: calls {@code uploadDirToClusters} with
+     * {@code OSUtil.RESOURCES_OOZIE} for each workflow/aggregator directory and then
+     * restarts the three colos.
+     *
+     * @throws Exception if an upload or a restart fails
+     */
+    @BeforeClass(alwaysRun = true)
+    public void setup() throws Exception {
+        uploadDirToClusters(workflowPath, OSUtil.RESOURCES_OOZIE);
+        uploadDirToClusters(workflowPath2, OSUtil.RESOURCES_OOZIE);
+        uploadDirToClusters(aggregatorPath, OSUtil.RESOURCES_OOZIE);
+        uploadDirToClusters(aggregator1Path, OSUtil.RESOURCES_OOZIE);
+        // restart order: cluster3, cluster1, cluster2
+        for (ColoHelper colo : new ColoHelper[] {cluster3, cluster1, cluster2}) {
+            Util.restartService(colo.getClusterHelper());
+        }
+    }
+
+    @AfterMethod(alwaysRun = true) // alwaysRun: executed even if the preceding test method failed or was skipped
+    public void tearDown() {
+        removeBundles(); // defined outside this view; presumably removes the entities created from bundles[] - confirm
+    }
+
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessFrequencyInEachColoWithOneProcessRunningMonthly()
+        throws Exception {
+        final String startTIme = TimeUtil.getTimeWrtSystemTime(-20);
+        String endTime = TimeUtil.getTimeWrtSystemTime(4000 * 60);
+        bundles[1].setProcessPeriodicity(1, TimeUnit.months);
+        bundles[1].setOutputFeedPeriodicity(1, TimeUnit.months);
+        bundles[1].setProcessValidity(startTIme, endTime);
+
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+                EntityType.PROCESS);
+
+        String updatedProcess = InstanceUtil
+                .setProcessFrequency(bundles[1].getProcessData(),
+                        new Frequency("" + 5, TimeUnit.minutes));
+
+        LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess));
+
+        //now to update
+        while (Util
+                .parseResponse(prism.getProcessHelper()
+                        .update((bundles[1].getProcessData()), updatedProcess))
+                .getStatus() != APIResult.Status.SUCCEEDED) {
+            LOGGER.info("update didn't SUCCEED in last attempt");
+            TimeUtil.sleepSeconds(10);
+        }
+
+        String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+        Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
+                Util.getProcessObject(updatedProcess).getFrequency());
+        TimeUtil.sleepSeconds(60);
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        //ensure that the running process has new coordinators created; while the submitted
+        // one is updated correctly.
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), true, false);
+        waitingForBundleFinish(cluster3, oldBundleId, 5);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), true, true);
+
+    }
+
+    /**
+     * Rolls the validity window of a process scheduled on cluster3 forward (new start =
+     * original start + 20 minutes, new end = original start + 25 minutes), sets the
+     * concurrency to 10 and re-submits the update through prism until it succeeds.
+     *
+     * <p>Afterwards the instance counts on cluster3 are compared with the counts
+     * expected for the updated validity window, and the process must not be RUNNING
+     * according to cluster2's Oozie client.
+     *
+     * @throws Exception if any step of the scenario fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    //failing due to falcon bug : https://issues.apache.org/jira/browse/FALCON-458
+    public void updateProcessRollStartTimeForwardInEachColoWithOneProcessRunning()
+        throws Exception {
+        final Bundle bundle = bundles[1];
+        bundle.setProcessValidity(
+                TimeUtil.getTimeWrtSystemTime(3), TimeUtil.getTimeWrtSystemTime(7));
+        bundle.submitBundle(prism);
+
+        // schedule on cluster3 only; the process stays merely submitted elsewhere
+        final String scheduledDefinition = bundle.getProcessData();
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .schedule(URLS.SCHEDULE_URL, scheduledDefinition));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, scheduledDefinition, 0, 10);
+
+        final String oldBundleId = InstanceUtil.getLatestBundleID(
+                cluster3, Util.readEntityName(scheduledDefinition), EntityType.PROCESS);
+        final List<String> nominalTimesBeforeUpdate =
+                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+
+        // both ends of the new window are offsets from the ORIGINAL start time
+        final String originalStart = TimeUtil.dateToOozieDate(
+                bundle.getProcessObject().getClusters().getClusters().get(0)
+                        .getValidity().getStart());
+        final String newStartTime = TimeUtil.addMinsToTime(originalStart, 20);
+        final String newEndTime = TimeUtil.addMinsToTime(originalStart, 25);
+
+        bundle.setProcessValidity(newStartTime, newEndTime);
+        bundle.setProcessConcurrency(10);
+
+        waitForProcessToReachACertainState(cluster3, bundle, Job.Status.RUNNING);
+
+        // from here on the bundle is not modified any more
+        final String updatedDefinition = bundle.getProcessData();
+        final String prettyDefinition = Util.prettyPrintXml(updatedDefinition);
+        LOGGER.info("updated process: " + prettyDefinition);
+
+        // keep re-submitting the update until prism accepts it
+        boolean updated = false;
+        while (!updated) {
+            updated = Util.parseResponse(
+                    prism.getProcessHelper().update(updatedDefinition, prettyDefinition))
+                    .getStatus() == APIResult.Status.SUCCEEDED;
+            if (!updated) {
+                LOGGER.info("update didn't SUCCEED in last attempt");
+                TimeUtil.sleepSeconds(10);
+            }
+        }
+
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, nominalTimesBeforeUpdate,
+                updatedDefinition, true, false);
+
+        dualComparison(prism, cluster3, updatedDefinition);
+
+        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
+                bundle.getProcessName(), 0);
+        waitingForBundleFinish(cluster3, oldBundleId, 15);
+
+        // instance count over all bundles must match the updated validity window
+        final int finalNumberOfInstances =
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                        Util.getProcessName(updatedDefinition), EntityType.PROCESS).size();
+        final String updatedStart = TimeUtil.dateToOozieDate(
+                bundle.getProcessObject().getClusters().getClusters().get(0)
+                        .getValidity().getStart());
+        final String updatedEnd = TimeUtil.dateToOozieDate(
+                bundle.getProcessObject().getClusters().getClusters().get(0)
+                        .getValidity().getEnd());
+        Assert.assertEquals(finalNumberOfInstances,
+                getExpectedNumberOfWorkflowInstances(updatedStart, updatedEnd));
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundle, Job.Status.RUNNING);
+
+        final int expectedNumberOfWorkflows =
+                getExpectedNumberOfWorkflowInstances(newStartTime, updatedEnd);
+        Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+                expectedNumberOfWorkflows);
+    }
+
+    /**
+     * Updates concurrency, workflow path and execution order of a process while the colo
+     * it is scheduled on (cluster3) is down, then repeats the update once the colo is
+     * back up.
+     *
+     * <p>While cluster3 is stopped the update is expected to be PARTIAL: prism must still
+     * report the initial concurrency and the original workflow path, whereas cluster2
+     * must already report the new workflow path. After cluster3 has been started again
+     * the update is re-submitted every 20 seconds until it succeeds, and prism must then
+     * report the new concurrency and workflow path.
+     *
+     * <p>cluster3 is restarted in a {@code finally} block so that a failed assertion
+     * during the outage does not leave the colo down for the tests that run afterwards.
+     *
+     * @throws Exception if any step of the scenario fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1800000)
+    public void updateProcessConcurrencyWorkflowExecutionInEachColoWithOneColoDown()
+        throws Exception {
+        bundles[1].submitBundle(prism);
+        // schedule on cluster3 only; the process stays merely submitted elsewhere
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+        TimeUtil.sleepSeconds(25);
+
+        int initialConcurrency = bundles[1].getProcessObject().getParallel();
+
+        bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3);
+        bundles[1].setProcessWorkflow(workflowPath2);
+        // NOTE(review): if getProcessObject() builds a new object on every call, this
+        // setOrder() changes a throw-away copy and the order assertions below compare
+        // unchanged values - confirm against Bundle.getProcessObject().
+        bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1]));
+
+        //stop cluster 3 where process is scheduled
+        Util.shutDownService(cluster3.getProcessHelper());
+        try {
+            //now to update: only a partial success is possible while cluster3 is down
+            AssertUtil.assertPartial(
+                    prism.getProcessHelper()
+                            .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
+            String prismWhileDown = getResponse(prism, bundles[1].getProcessData(), true);
+            Assert.assertEquals(Util.getProcessObject(prismWhileDown).getParallel(),
+                    initialConcurrency);
+            Assert.assertEquals(Util.getProcessObject(prismWhileDown).getWorkflow().getPath(),
+                    workflowPath);
+            Assert.assertEquals(Util.getProcessObject(prismWhileDown).getOrder(),
+                    bundles[1].getProcessObject().getOrder());
+
+            String coloString = getResponse(cluster2, bundles[1].getProcessData(), true);
+            Assert.assertEquals(Util.getProcessObject(coloString).getWorkflow().getPath(),
+                    workflowPath2);
+        } finally {
+            // always bring cluster3 back, even when one of the checks above failed
+            Util.startService(cluster3.getProcessHelper());
+        }
+
+        dualComparisonFailure(prism, cluster2, bundles[1].getProcessData());
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        AssertUtil
+                .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        // with cluster3 up again, re-submit until the update goes through completely
+        while (Util.parseResponse(
+                prism.getProcessHelper()
+                        .update(bundles[1].getProcessData(), bundles[1].getProcessData()))
+                .getStatus() != APIResult.Status.SUCCEEDED) {
+            LOGGER.info("WARNING: update did not succeed, retrying ");
+            TimeUtil.sleepSeconds(20);
+        }
+        String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+        Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+                initialConcurrency + 3);
+        Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(),
+                workflowPath2);
+        Assert.assertEquals(Util.getProcessObject(prismString).getOrder(),
+                bundles[1].getProcessObject().getOrder());
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        AssertUtil
+                .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        waitingForBundleFinish(cluster3, oldBundleId);
+        int finalNumberOfInstances =
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                        Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+
+        // expected count is derived from the validity window of the first cluster
+        int expectedInstances =
+                getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+                                bundles[1].getProcessObject().getClusters().getClusters().get(0)
+                                        .getValidity().getStart()),
+                        TimeUtil
+                                .dateToOozieDate(
+                                        bundles[1].getProcessObject().getClusters().getClusters()
+                                                .get(0).getValidity()
+                                                .getEnd()));
+        Assert.assertEquals(finalNumberOfInstances, expectedInstances,
+                "number of instances doesnt match :(");
+
+    }
+
+    /**
+     * Changes the frequency of a process scheduled on cluster3 to 7 minutes with a
+     * single prism update that must succeed at the first attempt.
+     *
+     * <p>After the update the test checks bundle creation on cluster3, compares the
+     * frequency stored in prism with the updated definition, waits for the old bundle
+     * to finish and asserts that the process is not RUNNING according to cluster2's
+     * Oozie client.
+     *
+     * @throws Exception if any step of the scenario fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessFrequencyInEachColoWithOneProcessRunning() throws Exception {
+        final Bundle bundle = bundles[1];
+        bundle.setProcessValidity(
+                TimeUtil.getTimeWrtSystemTime(-2), TimeUtil.getTimeWrtSystemTime(20));
+        bundle.submitBundle(prism);
+
+        // the bundle is not modified after submission, so its definition is read once
+        final String processData = bundle.getProcessData();
+
+        // schedule on cluster3 only; the process stays merely submitted elsewhere
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper().schedule(URLS.SCHEDULE_URL, processData));
+        final String oldBundleId = InstanceUtil.getLatestBundleID(
+                cluster3, Util.readEntityName(processData), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, processData, 0);
+
+        waitForProcessToReachACertainState(cluster3, bundle, Job.Status.RUNNING);
+        final List<String> nominalTimesBeforeUpdate =
+                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+        LOGGER.info("original process: " + Util.prettyPrintXml(processData));
+
+        final String updatedProcess = InstanceUtil.setProcessFrequency(
+                processData, new Frequency("7", TimeUnit.minutes));
+        LOGGER.info("updated process: " + updatedProcess);
+
+        // single update attempt; the updated definition is passed as both arguments
+        AssertUtil.assertSucceeded(
+                prism.getProcessHelper().update(updatedProcess, updatedProcess));
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, nominalTimesBeforeUpdate,
+                processData, true, false);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, processData, 1, 10);
+
+        final String prismString = getResponse(prism, processData, true);
+        Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
+                Util.getProcessObject(updatedProcess).getFrequency());
+        dualComparison(prism, cluster3, processData);
+        waitingForBundleFinish(cluster3, oldBundleId);
+
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundle, Job.Status.RUNNING);
+    }
+
+    /**
+     * Verifies that renaming a running process through an update is rejected.
+     *
+     * <p>The process is scheduled on cluster3, renamed to "myNewProcessName" in the
+     * bundle and submitted as an update through prism. The update must fail, and
+     * {@code OozieUtil.verifyNewBundleCreation} is called with the original definition
+     * and both flags set to false.
+     *
+     * @throws Exception if any step of the scenario fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessNameInEachColoWithOneProcessRunning() throws Exception {
+        final Bundle bundle = bundles[1];
+        bundle.submitBundle(prism);
+
+        // definition as submitted, i.e. before the rename below
+        final String originalProcessData = bundle.getProcessData();
+
+        // schedule on cluster3 only; the process stays merely submitted elsewhere
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .schedule(URLS.SCHEDULE_URL, originalProcessData));
+        final String oldBundleId = InstanceUtil.getLatestBundleID(
+                cluster3, Util.readEntityName(originalProcessData), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcessData, 0);
+
+        waitForProcessToReachACertainState(cluster3, bundle, Job.Status.RUNNING);
+        TimeUtil.sleepSeconds(20);
+        final List<String> nominalTimesBeforeUpdate =
+                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+
+        bundle.setProcessName("myNewProcessName");
+        final String renamedProcessData = bundle.getProcessData();
+
+        // the renamed definition is passed as both arguments of the update call
+        AssertUtil.assertFailed(
+                prism.getProcessHelper().update(renamedProcessData, renamedProcessData));
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, nominalTimesBeforeUpdate,
+                originalProcessData, false, false);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundle, Job.Status.RUNNING);
+    }
+
+    /**
+     * Raises the concurrency of a process running on cluster3 by 3 through prism and
+     * verifies that the new concurrency level is actually reached.
+     *
+     * <p>The update is re-submitted every 20 seconds until it succeeds. The test then
+     * polls cluster3 every 30 seconds until the number of RUNNING coordinator actions
+     * equals the new concurrency, or until the Oozie job reaches SUCCEEDED, FAILED or
+     * DONEWITHERROR, in which case the test fails. Finally the number of workflow
+     * instances of the old bundle is compared with the count expected for the validity
+     * window.
+     *
+     * @throws Exception if any step of the scenario fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessConcurrencyInEachColoWithOneProcessRunning()
+        throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(-2);
+        String endTime = TimeUtil.getTimeWrtSystemTime(10);
+        bundles[1].setProcessValidity(startTime, endTime);
+
+        bundles[1].submitBundle(prism);
+        // schedule on cluster3 only; the process stays merely submitted elsewhere
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+        //now to update
+        TimeUtil.sleepSeconds(60);
+        List<String> oldNominalTimes =
+                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+        // timestamp is taken right before the update so that the log line is accurate
+        LOGGER.info("updating at " + new DateTime(DateTimeZone.UTC));
+        while (Util
+                .parseResponse(updateProcessConcurrency(bundles[1],
+                        bundles[1].getProcessObject().getParallel() + 3))
+                .getStatus() != APIResult.Status.SUCCEEDED) {
+            LOGGER.info("WARNING: update did not succeed, retrying ");
+            TimeUtil.sleepSeconds(20);
+        }
+
+        // bundles[1] still holds the old concurrency here, hence the "+ 3"
+        String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+        Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+                bundles[1].getProcessObject().getParallel() + 3);
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(),
+                false, true);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        // future : should be verified using coord xml
+        Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        // poll until the new concurrency is observed or the job reaches a terminal state
+        boolean doesExist = false;
+        while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
+                &&
+                status != Job.Status.DONEWITHERROR) {
+            int statusCount = InstanceUtil
+                    .getInstanceCountWithStatus(cluster3,
+                            Util.readEntityName(bundles[1].getProcessData()),
+                            CoordinatorAction.Status.RUNNING,
+                            EntityType.PROCESS);
+            if (statusCount == bundles[1].getProcessObject().getParallel() + 3) {
+                doesExist = true;
+                break;
+            }
+            status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+                    Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+            Assert.assertNotNull(status,
+                    "status must not be null!");
+            TimeUtil.sleepSeconds(30);
+        }
+
+        Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
+        int expectedNumberOfInstances =
+                getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+                                bundles[1].getProcessObject().getClusters().getClusters().get(0)
+                                        .getValidity().getStart()),
+                        TimeUtil
+                                .dateToOozieDate(
+                                        bundles[1].getProcessObject().getClusters().getClusters()
+                                                .get(0).getValidity()
+                                                .getEnd()));
+        Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+                expectedNumberOfInstances);
+    }
+
+    /**
+     * Extends the validity end of a process scheduled on cluster3 by 4 minutes through
+     * prism and verifies the resulting instance counts.
+     *
+     * <p>The update is attempted once and then retried up to 10 more times, waiting
+     * 6 seconds before each retry; the test fails if the last response is not
+     * SUCCEEDED. Afterwards it polls for up to 10 seconds for the old bundle to show
+     * the expected number of workflow instances (the outcome of this poll is not
+     * asserted), waits for the old bundle to finish and compares the final instance
+     * count with the count expected for the extended validity window.
+     *
+     * @throws Exception if any step of the scenario fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessIncreaseValidityInEachColoWithOneProcessRunning() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(3);
+        String endTime = TimeUtil.getTimeWrtSystemTime(8);
+        bundles[1].setProcessValidity(startTime, endTime);
+        bundles[1].submitBundle(prism);
+        // schedule on cluster3 only; the process stays merely submitted elsewhere
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+                EntityType.PROCESS);
+
+        // keep the start, push the end out by 4 minutes
+        String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getEnd()
+        ), 4);
+        bundles[1].setProcessValidity(TimeUtil.dateToOozieDate(
+                        bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                                .getStart()),
+                newEndTime);
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+        ServiceResponse response = prism.getProcessHelper()
+                .update(bundles[1].getProcessData(), bundles[1].getProcessData());
+        // wait BEFORE each retry: no immediate re-submit after a failure and no
+        // pointless pause once a retry has succeeded
+        for (int attempt = 0; attempt < 10
+                &&
+                Util.parseResponse(response).getStatus() != APIResult.Status.SUCCEEDED;
+                ++attempt) {
+            TimeUtil.sleepSeconds(6);
+            response = prism.getProcessHelper()
+                    .update(bundles[1].getProcessData(), bundles[1].getProcessData());
+        }
+        Assert.assertEquals(Util.parseResponse(response).getStatus(),
+                APIResult.Status.SUCCEEDED, "Process update did not succeed.");
+
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), false, true);
+
+        // the bundle is not modified any more, so the validity window is read once
+        final String validityStart = TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getStart());
+        final String validityEnd = TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getEnd());
+
+        // best-effort wait (at most 10 x 1s) for the old bundle to show the expected count
+        for (int poll = 0;
+                OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId)
+                        != getExpectedNumberOfWorkflowInstances(validityStart, validityEnd)
+                        && poll < 10;
+                poll++) {
+            TimeUtil.sleepSeconds(1);
+        }
+
+        bundles[1].verifyDependencyListing(cluster2);
+
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
+                bundles[1].getProcessName(), 0);
+        waitingForBundleFinish(cluster3, oldBundleId);
+
+        // final instance count must match the extended validity window
+        int finalNumberOfInstances = InstanceUtil
+                .getProcessInstanceList(cluster3,
+                        Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
+                .size();
+        Assert.assertEquals(finalNumberOfInstances,
+                getExpectedNumberOfWorkflowInstances(validityStart, validityEnd));
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+    /**
+     * Updates process concurrency while the process is suspended on cluster3.
+     *
+     * <p>Steps: submit the bundle through prism, schedule the process on cluster3,
+     * suspend it there, request a concurrency of (current + 3) until the update succeeds,
+     * resume the process, poll until the number of RUNNING instances equals the bundle's
+     * parallel value, and finally compare the total number of instances on cluster3 with
+     * the number expected for the validity window.
+     *
+     * @throws Exception if any helper call fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessConcurrencyInEachColoWithOneProcessSuspended()
+        throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(3);
+        String endTime = TimeUtil.getTimeWrtSystemTime(7);
+        bundles[1].setProcessValidity(startTime, endTime);
+
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+        // NOTE(review): schedule is issued a second time on the same colo - kept as in the
+        // original test; confirm whether re-scheduling is part of the scenario.
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+                EntityType.PROCESS);
+
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+        //now to update; keep retrying until the update is reported as SUCCEEDED
+        while (Util
+                .parseResponse(updateProcessConcurrency(bundles[1],
+                        bundles[1].getProcessObject().getParallel() + 3))
+                .getStatus() != APIResult.Status.SUCCEEDED) {
+            LOGGER.info("WARNING: update did not scceed, retyring ");
+            TimeUtil.sleepSeconds(20);
+        }
+
+        String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+        Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+                bundles[1].getProcessObject().getParallel() + 3);
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        //ensure that the running process has new coordinators created; while the submitted
+        // one is updated correctly.
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), false, true);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+        AssertUtil.checkStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        // Poll until the job reaches a terminal state or the expected number of instances
+        // is RUNNING at the same time.
+        // NOTE(review): the comparison uses the bundle's un-updated parallel value, whereas
+        // updateProcessConcurrencyInEachColoWithOneColoDown compares against parallel + 3 -
+        // confirm which one is intended here.
+        boolean doesExist = false;
+        while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
+                &&
+                status != Job.Status.DONEWITHERROR) {
+            if (InstanceUtil
+                    .getInstanceCountWithStatus(cluster3,
+                            Util.readEntityName(bundles[1].getProcessData()),
+                            org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
+                            EntityType.PROCESS)
+                    ==
+                    bundles[1].getProcessObject().getParallel()) {
+                doesExist = true;
+                break;
+            }
+            status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+                    Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+            // pause between polls so that Oozie is not queried in a tight busy loop
+            TimeUtil.sleepSeconds(10);
+        }
+
+        Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
+
+        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
+                bundles[1].getProcessName(), 0);
+
+        waitingForBundleFinish(cluster3, oldBundleId);
+
+        int finalNumberOfInstances =
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                        Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+
+        // expected count is derived from the validity window of the first cluster entry
+        int expectedInstances =
+                getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+                                bundles[1].getProcessObject().getClusters().getClusters().get(0)
+                                        .getValidity().getStart()),
+                        TimeUtil
+                                .dateToOozieDate(
+                                        bundles[1].getProcessObject().getClusters().getClusters()
+                                                .get(0).getValidity()
+                                                .getEnd()));
+
+        Assert.assertEquals(finalNumberOfInstances, expectedInstances,
+                "number of instances doesnt match :(");
+    }
+
+    /**
+     * Updates process concurrency while the cluster3 service is shut down, then retries
+     * once the service is back.
+     *
+     * <p>Steps: submit the bundle through prism, schedule the process on cluster3, shut the
+     * cluster3 service down, attempt the concurrency update (expected to be PARTIAL),
+     * restart the service, retry the update until it succeeds, poll until the number of
+     * RUNNING instances equals the bundle's parallel value + 3, then verify bundle creation
+     * and the total number of instances.
+     *
+     * @throws Exception if any helper call fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessConcurrencyInEachColoWithOneColoDown() throws Exception {
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(-1);
+        String endTime = TimeUtil.getTimeWrtSystemTime(5);
+        bundles[1].setProcessValidity(startTime, endTime);
+
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+
+        LOGGER.info("process to be scheduled: " + Util.prettyPrintXml(bundles[1].getProcessData()));
+
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+                EntityType.PROCESS);
+
+        //now to update
+        // the update is attempted while the cluster3 service is down, so only a PARTIAL
+        // result is expected
+        Util.shutDownService(cluster3.getClusterHelper());
+
+        ServiceResponse response =
+                updateProcessConcurrency(bundles[1],
+                        bundles[1].getProcessObject().getParallel() + 3);
+        AssertUtil.assertPartial(response);
+
+        Util.startService(cluster3.getClusterHelper());
+
+        // after the partial update the parallel value read back via prism is expected to
+        // equal the bundle's own (un-updated) value
+        String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+                bundles[1].getProcessObject().getParallel());
+
+        //ensure that the running process has new coordinators created; while the submitted
+        // one is updated correctly.
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
+                Job.Status.RUNNING);
+
+        // service is back up: retry the update every 20 seconds until it SUCCEEDS
+        while (Util
+                .parseResponse(updateProcessConcurrency(bundles[1],
+                        bundles[1].getProcessObject().getParallel() + 3))
+                .getStatus() != APIResult.Status.SUCCEEDED) {
+            LOGGER.info("WARNING: update did not scceed, retyring ");
+            TimeUtil.sleepSeconds(20);
+        }
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        dualComparison(prism, cluster2, bundles[1].getProcessData());
+
+        Job.Status status =
+                OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        // poll every 30 seconds until the job reaches a terminal state or the number of
+        // RUNNING instances equals the bundle's parallel value + 3
+        boolean doesExist = false;
+        while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
+                &&
+                status != Job.Status.DONEWITHERROR) {
+            if (InstanceUtil
+                    .getInstanceCountWithStatus(cluster3,
+                            Util.readEntityName(bundles[1].getProcessData()),
+                            org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
+                            EntityType.PROCESS)
+                    ==
+                    bundles[1].getProcessObject().getParallel() + 3) {
+                doesExist = true;
+                break;
+            }
+            status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+                    Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+            TimeUtil.sleepSeconds(30);
+        }
+        Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
+        // NOTE(review): unlike the other tests in this class, this call passes the latest
+        // bundle id (not oldBundleId) and the entity name (not the process XML) - confirm
+        // that this is what verifyNewBundleCreation expects here.
+        OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
+                        .getLatestBundleID(cluster3,
+                                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS),
+                oldNominalTimes, Util.readEntityName(bundles[1].getProcessData()), false,
+                true
+        );
+
+        waitingForBundleFinish(cluster3, oldBundleId);
+
+        int finalNumberOfInstances =
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                        Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+
+        // expected count is derived from the validity window of the first cluster entry
+        int expectedInstances =
+                getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+                                bundles[1].getProcessObject().getClusters().getClusters().get(0)
+                                        .getValidity().getStart()),
+                        TimeUtil
+                                .dateToOozieDate(
+                                        bundles[1].getProcessObject().getClusters().getClusters()
+                                                .get(0).getValidity()
+                                                .getEnd()));
+        Assert.assertEquals(finalNumberOfInstances, expectedInstances,
+                "number of instances doesnt match :(");
+    }
+
+    /**
+     * Changes concurrency, workflow path and execution order of a process that has been
+     * scheduled on cluster3, submits the update through prism and checks the resulting
+     * definition, bundle creation and instance count.
+     *
+     * @throws Exception propagated from any of the helper calls
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessConcurrencyExecutionWorkflowInEachColoWithOneProcessRunning()
+        throws Exception {
+        bundles[1].setProcessValidity(TimeUtil.getTimeWrtSystemTime(-1),
+                TimeUtil.getTimeWrtSystemTime(7));
+        bundles[1].submitBundle(prism);
+
+        // schedule in one colo only; the other colo is left untouched
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        final String previousBundleId = InstanceUtil.getLatestBundleID(cluster3,
+                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        final List<String> previousNominalTimes =
+                OozieUtil.getActionsNominalTime(cluster3, previousBundleId, EntityType.PROCESS);
+
+        // change the local definition before sending the update
+        final int concurrencyBeforeUpdate = bundles[1].getProcessObject().getParallel();
+        bundles[1].setProcessConcurrency(concurrencyBeforeUpdate + 3);
+        bundles[1].setProcessWorkflow(aggregator1Path);
+        bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1]));
+
+        LOGGER.info("updating @ " + new DateTime(DateTimeZone.UTC).plusMinutes(2).toString());
+
+        // keep sending the update every 10 seconds until prism reports SUCCEEDED
+        while (true) {
+            final APIResult.Status updateStatus = Util.parseResponse(prism.getProcessHelper()
+                    .update(bundles[1].getProcessData(), bundles[1].getProcessData()))
+                    .getStatus();
+            if (updateStatus == APIResult.Status.SUCCEEDED) {
+                break;
+            }
+            TimeUtil.sleepSeconds(10);
+        }
+
+        // the definition read back via prism must carry all three changes
+        final String prismDefinition = getResponse(prism, bundles[1].getProcessData(), true);
+        Assert.assertEquals(Util.getProcessObject(prismDefinition).getParallel(),
+                concurrencyBeforeUpdate + 3);
+        Assert.assertEquals(Util.getProcessObject(prismDefinition).getWorkflow().getPath(),
+                aggregator1Path);
+        Assert.assertEquals(Util.getProcessObject(prismDefinition).getOrder(),
+                bundles[1].getProcessObject().getOrder());
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+
+        // once the old bundle is done, verify bundle creation on cluster3 and that the
+        // process is not RUNNING according to cluster2OC
+        waitingForBundleFinish(cluster3, previousBundleId);
+        OozieUtil.verifyNewBundleCreation(cluster3, previousBundleId, previousNominalTimes,
+                bundles[1].getProcessData(), true, true);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        final int actualInstanceCount = InstanceUtil
+                .getProcessInstanceListFromAllBundles(cluster3,
+                        Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
+                .size();
+        final String validityStart = TimeUtil.dateToOozieDate(bundles[1].getProcessObject()
+                .getClusters().getClusters().get(0).getValidity().getStart());
+        final String validityEnd = TimeUtil.dateToOozieDate(bundles[1].getProcessObject()
+                .getClusters().getClusters().get(0).getValidity().getEnd());
+        final int expectedInstanceCount =
+                getExpectedNumberOfWorkflowInstances(validityStart, validityEnd);
+        Assert.assertEquals(actualInstanceCount, expectedInstanceCount,
+                "number of instances doesnt match :(");
+    }
+
+    /**
+     * Changes concurrency, workflow path and execution order of a process that is
+     * suspended on cluster3, submits the update through prism, resumes the process and
+     * checks the resulting definition, bundle creation and instance count.
+     *
+     * @throws Exception propagated from any of the helper calls
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessConcurrencyExecutionWorkflowInEachColoWithOneProcessSuspended()
+        throws Exception {
+        bundles[1].setProcessValidity(TimeUtil.getTimeWrtSystemTime(2),
+                TimeUtil.getTimeWrtSystemTime(6));
+        bundles[1].submitBundle(prism);
+
+        // schedule in one colo only; the other colo is left untouched
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        final String previousBundleId = InstanceUtil.getLatestBundleID(cluster3,
+                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        final List<String> previousNominalTimes =
+                OozieUtil.getActionsNominalTime(cluster3, previousBundleId, EntityType.PROCESS);
+
+        // change the local definition before sending the update
+        final int concurrencyBeforeUpdate = bundles[1].getProcessObject().getParallel();
+        bundles[1].setProcessConcurrency(concurrencyBeforeUpdate + 3);
+        bundles[1].setProcessWorkflow(aggregator1Path);
+        bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1]));
+
+        // suspend on cluster3 before the update goes out
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+
+        LOGGER.info("updating @ " + new DateTime(DateTimeZone.UTC).plusMinutes(2).toString());
+
+        // keep sending the update every 10 seconds until prism reports SUCCEEDED
+        while (true) {
+            final APIResult.Status updateStatus = Util.parseResponse(prism.getProcessHelper()
+                    .update(bundles[1].getProcessData(), bundles[1].getProcessData()))
+                    .getStatus();
+            if (updateStatus == APIResult.Status.SUCCEEDED) {
+                break;
+            }
+            TimeUtil.sleepSeconds(10);
+        }
+
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+
+        // the definition read back via prism must carry all three changes
+        final String prismDefinition = getResponse(prism, bundles[1].getProcessData(), true);
+        Assert.assertEquals(Util.getProcessObject(prismDefinition).getParallel(),
+                concurrencyBeforeUpdate + 3);
+        Assert.assertEquals(Util.getProcessObject(prismDefinition).getWorkflow().getPath(),
+                aggregator1Path);
+        Assert.assertEquals(Util.getProcessObject(prismDefinition).getOrder(),
+                bundles[1].getProcessObject().getOrder());
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+
+        // once the old bundle is done, verify bundle creation on cluster3 and that the
+        // process is not RUNNING according to cluster3OC
+        waitingForBundleFinish(cluster3, previousBundleId);
+        OozieUtil.verifyNewBundleCreation(cluster3, previousBundleId, previousNominalTimes,
+                bundles[1].getProcessData(), true, true);
+        AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        final int actualInstanceCount = InstanceUtil
+                .getProcessInstanceListFromAllBundles(cluster3,
+                        Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
+                .size();
+        final String validityStart = TimeUtil.dateToOozieDate(bundles[1].getProcessObject()
+                .getClusters().getClusters().get(0).getValidity().getStart());
+        final String validityEnd = TimeUtil.dateToOozieDate(bundles[1].getProcessObject()
+                .getClusters().getClusters().get(0).getValidity().getEnd());
+        final int expectedInstanceCount =
+                getExpectedNumberOfWorkflowInstances(validityStart, validityEnd);
+        Assert.assertEquals(actualInstanceCount, expectedInstanceCount,
+                "number of instances doesnt match :(");
+    }
+
+    /**
+     * Adds a second input feed to a process that has been scheduled on cluster3 and
+     * submits the changed definition through prism.
+     *
+     * <p>After the update succeeds the test checks bundle creation on cluster3 (before and
+     * after the old bundle has finished), the dependency listing on cluster2, the
+     * prism/cluster3 definitions, and that the process is not RUNNING per cluster2OC.
+     *
+     * @throws Exception propagated from any of the helper calls
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessAddNewInputInEachColoWithOneProcessRunning() throws Exception {
+        bundles[1].setProcessValidity(TimeUtil.getTimeWrtSystemTime(-1),
+                TimeUtil.getTimeWrtSystemTime(6));
+        bundles[1].submitBundle(prism);
+
+        // schedule in one colo only; the other colo is left untouched
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        final String previousBundleId = InstanceUtil.getLatestBundleID(cluster3,
+                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+        TimeUtil.sleepSeconds(20);
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        final List<String> previousNominalTimes =
+                OozieUtil.getActionsNominalTime(cluster3, previousBundleId, EntityType.PROCESS);
+
+        // build the extra feed as a renamed copy of the bundle's input feed and register
+        // it as a new process input
+        final String extraFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
+        final String baseFeed = bundles[1].getInputFeedFromBundle();
+        bundles[1].addProcessInput(extraFeedName, "inputData2");
+        final String extraFeed = Util.setFeedName(baseFeed, extraFeedName);
+
+        LOGGER.info(extraFeed);
+        AssertUtil.assertSucceeded(
+                prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, extraFeed));
+
+        // keep sending the update every 20 seconds until prism reports SUCCEEDED
+        while (true) {
+            final APIResult.Status updateStatus = Util.parseResponse(prism.getProcessHelper()
+                    .update(bundles[1].getProcessData(), bundles[1].getProcessData()))
+                    .getStatus();
+            if (updateStatus == APIResult.Status.SUCCEEDED) {
+                break;
+            }
+            TimeUtil.sleepSeconds(20);
+        }
+
+        OozieUtil.verifyNewBundleCreation(cluster3, previousBundleId, previousNominalTimes,
+                bundles[1].getProcessData(), true, false);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+
+        bundles[1].verifyDependencyListing(cluster2);
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+
+        // once the old bundle is done, verify bundle creation again and make sure the
+        // process is not RUNNING according to cluster2OC
+        waitingForBundleFinish(cluster3, previousBundleId);
+        OozieUtil.verifyNewBundleCreation(cluster3, previousBundleId, previousNominalTimes,
+                bundles[1].getProcessData(), true, true);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessAddNewInputInEachColoWithOneProcessSuspended() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(2);
+        String endTime = TimeUtil.getTimeWrtSystemTime(6);
+        bundles[1].setProcessValidity(startTime, endTime);
+
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+                EntityType.PROCESS);
+
+        String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
+        String inputFeed = bundles[1].getInputFeedFromBundle();
+
+        bundles[1].addProcessInput(newFeedName, "inputData2");
+        inputFeed = Util.setFeedName(inputFeed, newFeedName);
+
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+        AssertUtil.assertSucceeded(
+                prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, inputFeed));
+
+        while (Util.parseResponse(
+                prism.getProcessHelper()
+                        .update((bundles[1].getProcessData()), bundles[1].getProcessData()))
+                .getStatus() != APIResult.Status.SUCCEEDED) {
+            TimeUtil.sleepSeconds(10);
+        }
+
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), true, false);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+
+        bundles[1].verifyDependencyListing(cluster2);
+
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        //ensure that the running process has new coordinators created; while the submitted
+        // one is updated correctly.
+        waitingForBundleFinish(cluster3, oldBundleId);
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), true, true);
+        AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+    }
+
+    /**
+     * Adds a new input feed to a process and updates it through prism while the cluster3
+     * service is shut down, then retries the update once the service is running again.
+     *
+     * <p>Two process definitions are kept: {@code originalProcess} (captured right after
+     * submission) is the one scheduled on cluster3, {@code updatedProcess} (captured after
+     * the extra input was added) is the one sent with every update call.
+     *
+     * @throws Exception if any helper call fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessAddNewInputInEachColoWithOneColoDown() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(3);
+        String endTime = TimeUtil.getTimeWrtSystemTime(10);
+        bundles[1].setProcessValidity(startTime, endTime);
+
+        bundles[1].submitBundle(prism);
+        // snapshot the definition before and after adding the extra input
+        String originalProcess = bundles[1].getProcessData();
+        String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
+        String inputFeed = bundles[1].getInputFeedFromBundle();
+        bundles[1].addProcessInput(newFeedName, "inputData2");
+        inputFeed = Util.setFeedName(inputFeed, newFeedName);
+        String updatedProcess = bundles[1].getProcessData();
+
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, originalProcess));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10);
+
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(originalProcess), EntityType.PROCESS);
+
+        // NOTE(review): same wait as a few lines above - looks redundant; confirm before
+        // removing.
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+                EntityType.PROCESS);
+
+        //submit new feed
+        AssertUtil.assertSucceeded(
+                prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, inputFeed));
+
+        // the update is attempted while the cluster3 service is down, so only a PARTIAL
+        // result is expected
+        Util.shutDownService(cluster3.getProcessHelper());
+
+        AssertUtil.assertPartial(
+                prism.getProcessHelper()
+                        .update(updatedProcess, updatedProcess));
+
+        Util.startService(cluster3.getProcessHelper());
+        bundles[1].verifyDependencyListing(cluster2);
+
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        Assert.assertFalse(Util.isDefinitionSame(cluster2, prism, originalProcess));
+
+        //ensure that the running process has new coordinators created; while the submitted
+        // one is updated correctly.
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), false, false);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
+                Job.Status.RUNNING);
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+        // service is back up: retry the update every 10 seconds until it SUCCEEDS
+        while (Util.parseResponse(
+                prism.getProcessHelper().update(updatedProcess, updatedProcess)).getStatus()
+                != APIResult.Status.SUCCEEDED) {
+            LOGGER.info("update didnt SUCCEED in last attempt");
+            TimeUtil.sleepSeconds(10);
+        }
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        Assert.assertTrue(Util.isDefinitionSame(cluster2, prism, originalProcess));
+        bundles[1].verifyDependencyListing(cluster2);
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                updatedProcess, true, false);
+        waitingForBundleFinish(cluster3, oldBundleId);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+
+        // final check after the old bundle has finished
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), true, true);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
+                Job.Status.RUNNING);
+    }
+
+    /**
+     * Moves the validity end of a process scheduled on cluster3 two minutes earlier,
+     * submits the update through prism and checks the resulting instance counts.
+     *
+     * @throws Exception propagated from any of the helper calls
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessDecreaseValidityInEachColoWithOneProcessRunning() throws Exception {
+        bundles[1].submitBundle(prism);
+
+        // schedule in one colo only; the other colo is left untouched
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        final String previousBundleId = InstanceUtil.getLatestBundleID(cluster3,
+                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        final List<String> previousNominalTimes =
+                OozieUtil.getActionsNominalTime(cluster3, previousBundleId, EntityType.PROCESS);
+
+        // shorten the validity window: same start, end moved back by two minutes
+        final String currentEnd = TimeUtil.dateToOozieDate(bundles[1].getProcessObject()
+                .getClusters().getClusters().get(0).getValidity().getEnd());
+        final String shortenedEnd = TimeUtil.addMinsToTime(currentEnd, -2);
+        final String currentStart = TimeUtil.dateToOozieDate(bundles[1].getProcessObject()
+                .getClusters().getClusters().get(0).getValidity().getStart());
+        bundles[1].setProcessValidity(currentStart, shortenedEnd);
+
+        // keep sending the update every 10 seconds until prism reports SUCCEEDED
+        while (true) {
+            final APIResult.Status updateStatus = Util.parseResponse(prism.getProcessHelper()
+                    .update(bundles[1].getProcessData(), bundles[1].getProcessData()))
+                    .getStatus();
+            if (updateStatus == APIResult.Status.SUCCEEDED) {
+                break;
+            }
+            LOGGER.info("update didnt SUCCEED in last attempt");
+            TimeUtil.sleepSeconds(10);
+        }
+
+        OozieUtil.verifyNewBundleCreation(cluster3, previousBundleId, previousNominalTimes,
+                bundles[1].getProcessData(), false, true);
+        bundles[1].verifyDependencyListing(cluster2);
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        waitingForBundleFinish(cluster3, previousBundleId);
+
+        // instance count on cluster3 must match the count expected for the new window
+        final int actualInstanceCount = InstanceUtil.getProcessInstanceList(cluster3,
+                Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+        Assert.assertEquals(actualInstanceCount,
+                getExpectedNumberOfWorkflowInstances(
+                        bundles[1].getProcessObject().getClusters().getClusters().get(0)
+                                .getValidity().getStart(),
+                        bundles[1].getProcessObject().getClusters().getClusters().get(0)
+                                .getValidity().getEnd()));
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        // the workflow instances of the old bundle must cover start .. shortened end
+        final String oozieStart = TimeUtil.dateToOozieDate(bundles[1].getProcessObject()
+                .getClusters().getClusters().get(0).getValidity().getStart());
+        final int expectedWorkflowCount =
+                getExpectedNumberOfWorkflowInstances(oozieStart, shortenedEnd);
+        Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, previousBundleId),
+                expectedWorkflowCount);
+    }
+
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessIncreaseValidityInEachColoWithOneProcessSuspended() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(-1);
+        String endTime = TimeUtil.getTimeWrtSystemTime(3);
+        bundles[1].setProcessValidity(startTime, endTime);
+
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        TimeUtil.sleepSeconds(30);
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+        String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getEnd()
+        ), 4);
+        bundles[1].setProcessValidity(TimeUtil.dateToOozieDate(
+                        bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                                .getStart()),
+            newEndTime);
+
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+        while (Util.parseResponse(
+                prism.getProcessHelper()
+                        .update((bundles[1].getProcessData()), bundles[1].getProcessData()))
+                .getStatus() != APIResult.Status.SUCCEEDED) {
+            LOGGER.info("update didnt SUCCEED in last attempt");
+            TimeUtil.sleepSeconds(10);
+        }
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+
+        dualComparison(prism, cluster2, bundles[1].getProcessData());
+
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        waitingForBundleFinish(cluster3, oldBundleId);
+        //ensure that the running process has new coordinators created; while the submitted
+        // one is updated correctly.
+        int finalNumberOfInstances = InstanceUtil
+                .getProcessInstanceList(cluster3,
+                        Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
+                .size();
+        Assert.assertEquals(finalNumberOfInstances,
+                getExpectedNumberOfWorkflowInstances(bundles[1]
+                                .getProcessObject().getClusters().getClusters().get(0).getValidity()
+                                .getStart(),
+                        bundles[1].getProcessObject().getClusters().getClusters().get(0)
+                                .getValidity().getEnd()));
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+    /**
+     * Points the process workflow of every supplied bundle at {@code workflowPath}.
+     *
+     * @param bundles bundles whose process workflow path is overwritten
+     */
+    private void setBundleWFPath(Bundle... bundles) {
+        for (int index = 0; index < bundles.length; index++) {
+            bundles[index].setProcessWorkflow(workflowPath);
+        }
+    }
+
+    /**
+     * Submits a daily process through prism, schedules it on cluster3 only, then updates
+     * its frequency to 5 minutes through prism. Afterwards the definition served by prism
+     * must match the ones on cluster2 and cluster3, new-bundle creation is verified on
+     * cluster3, and the process must not be RUNNING on cluster2.
+     *
+     * @throws Exception if any of the helper calls fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessFrequencyInEachColoWithOneProcessRunningDaily() throws Exception {
+        //set daily process
+        // NOTE(review): offsets are presumably minutes relative to the current time - confirm
+        final String startTime = TimeUtil.getTimeWrtSystemTime(-20);
+        String endTime = TimeUtil.getTimeWrtSystemTime(4000);
+        bundles[1].setProcessPeriodicity(1, TimeUnit.days);
+        bundles[1].setOutputFeedPeriodicity(1, TimeUnit.days);
+        bundles[1].setProcessValidity(startTime, endTime);
+
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        // remember the bundle and its nominal times as they were before the update
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes =
+                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+
+        LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
+
+        // build the updated definition: same process, frequency switched to 5 minutes
+        String updatedProcess = InstanceUtil
+                .setProcessFrequency(bundles[1].getProcessData(),
+                        new Frequency("" + 5, TimeUnit.minutes));
+
+        LOGGER.info("updated process: " + updatedProcess);
+
+        //now to update
+        ServiceResponse response =
+                prism.getProcessHelper().update(updatedProcess, updatedProcess);
+        AssertUtil.assertSucceeded(response);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+
+        // the definition returned by prism must carry the new frequency
+        String prismString = dualComparison(prism, cluster2, bundles[1].getProcessData());
+        Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
+                new Frequency("" + 5, TimeUnit.minutes));
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        //ensure that the running process has new coordinators created; while the submitted
+        // one is updated
+        // correctly.
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), true, true);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+    /**
+     * Submits a daily process through prism, schedules it on cluster3 only, then updates
+     * it through prism to a monthly frequency with a later start time. Afterwards the
+     * definition served by prism must match the ones on cluster2 and cluster3, new-bundle
+     * creation is verified on cluster3, and the process must not be RUNNING on cluster2.
+     *
+     * @throws Exception if any of the helper calls fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void
+    updateProcessFrequencyInEachColoWithOneProcessRunningDailyToMonthlyWithStartChange()
+        throws Exception {
+        //set daily process
+        final String startTime = TimeUtil.getTimeWrtSystemTime(-20);
+        String endTime = TimeUtil.getTimeWrtSystemTime(4000 * 60);
+        bundles[1].setProcessPeriodicity(1, TimeUnit.days);
+        bundles[1].setOutputFeedPeriodicity(1, TimeUnit.days);
+        bundles[1].setProcessValidity(startTime, endTime);
+
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        // remember the bundle and its nominal times as they were before the update
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+                EntityType.PROCESS);
+
+        LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
+
+        // build the updated definition: monthly frequency and a start time moved forward
+        String updatedProcess = InstanceUtil
+                .setProcessFrequency(bundles[1].getProcessData(),
+                        new Frequency("" + 1, TimeUnit.months));
+        updatedProcess = InstanceUtil
+                .setProcessValidity(updatedProcess, TimeUtil.getTimeWrtSystemTime(10),
+                        endTime);
+
+        LOGGER.info("updated process: " + updatedProcess);
+
+        //now to update
+        ServiceResponse response =
+                prism.getProcessHelper().update(updatedProcess, updatedProcess);
+        AssertUtil.assertSucceeded(response);
+        // compare prism against the colo where the process is only submitted (cluster2);
+        // the scheduled colo (cluster3) is compared right below
+        String prismString = dualComparison(prism, cluster2, bundles[1].getProcessData());
+        Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
+                new Frequency("" + 1, TimeUnit.months));
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        //ensure that the running process has new coordinators created; while the submitted
+        // one is updated correctly.
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), true, true);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+    /**
+     * Submits the process through prism, schedules it on cluster3 only, then moves the
+     * validity start 3 minutes earlier and updates through prism while the process is
+     * RUNNING. New-bundle creation is verified on cluster3, the dependency listing is
+     * verified on cluster2, and prism's definition must match cluster3's.
+     *
+     * @throws Exception if any of the helper calls fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessRollStartTimeBackwardsToPastInEachColoWithOneProcessRunning()
+        throws Exception {
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        TimeUtil.sleepSeconds(30);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        // remember the bundle and its nominal times as they were before the update
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+                EntityType.PROCESS);
+
+        // new start = current validity start of the first cluster minus 3 minutes;
+        // the validity end is kept as it is
+        String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getStart()
+        ), -3);
+        bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getEnd()
+        ));
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+        // push the modified definition through prism
+        AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                        .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
+
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), true, true);
+        bundles[1].verifyDependencyListing(cluster2);
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+    }
+
+    /**
+     * Submits the process through prism, schedules it on cluster3 only, suspends it there
+     * and then updates it through prism with the validity start moved 3 minutes forward.
+     * Afterwards prism's definition must match cluster2's and cluster3's, the instance
+     * counts on cluster3 are checked against the old and the new start time, and the
+     * process must not be RUNNING on cluster2.
+     *
+     * @throws Exception if any of the helper calls fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessRollStartTimeForwardInEachColoWithOneProcessSuspended()
+        throws Exception {
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData())
+        );
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+        TimeUtil.sleepSeconds(30);
+
+        // NOTE(review): the returned count is discarded; presumably a leftover - confirm
+        OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId);
+        // capture the original start before the bundle's validity is overwritten below
+        String oldStartTime = TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getStart()
+        );
+        // new start = original start plus 3 minutes; the validity end is kept as it is
+        String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getStart()
+        ), 3);
+        bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getEnd()
+        ));
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+        // suspend on the scheduled colo, then update through prism
+        // NOTE(review): unlike the sibling suspended tests, no resume follows - confirm intended
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData())
+        );
+
+        AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                        .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
+
+        dualComparison(prism, cluster2, bundles[1].getProcessData());
+
+        bundles[1].verifyDependencyListing(cluster2);
+
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        //ensure that the running process has new coordinators created; while the submitted
+        // one is updated correctly.
+        // instances across all bundles are expected to span old start .. end
+        int finalNumberOfInstances =
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                        Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+        Assert.assertEquals(finalNumberOfInstances,
+                getExpectedNumberOfWorkflowInstances(oldStartTime,
+                        bundles[1].getProcessObject().getClusters().getClusters().get(0)
+                                .getValidity().getEnd()));
+        // the plain instance list is expected to span new start .. end
+        Assert.assertEquals(InstanceUtil
+                .getProcessInstanceList(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS)
+                .size(), getExpectedNumberOfWorkflowInstances(newStartTime,
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd()));
+
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+    /**
+     * Submits the process through prism, schedules it on cluster3 only, suspends it there,
+     * updates it through prism with the validity start moved 3 minutes earlier and resumes
+     * it. New-bundle creation is then verified on cluster3, the dependency listing on
+     * cluster2, prism's definition must match cluster3's, the old bundle must finish, and
+     * the process must not be RUNNING on cluster2.
+     *
+     * @throws Exception if any of the helper calls fails
+     */
+    @Test(groups = { "multiCluster" }, timeOut = 1200000)
+    public void updateProcessRollStartTimeBackwardsInEachColoWithOneProcessSuspended()
+        throws Exception {
+        bundles[1].submitBundle(prism);
+        //now to schedule in 1 colo and let it remain in another
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        String oldBundleId = InstanceUtil
+                .getLatestBundleID(cluster3,
+                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+        TimeUtil.sleepSeconds(30);
+
+        // new start = current validity start of the first cluster minus 3 minutes;
+        // the validity end is kept as it is
+        String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getStart()
+        ), -3);
+        bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate(
+                bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+                        .getEnd()
+        ));
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+        // suspend on the scheduled colo, update through prism, then resume
+        AssertUtil.assertSucceeded(
+                cluster3.getProcessHelper()
+                        .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+        AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                        .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
+        AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+                .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+        // NOTE(review): nominal times of the old bundle are read after the update here,
+        // whereas sibling tests capture them before updating - confirm this is intended
+        List<String> oldNominalTimes =
+                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+                bundles[1].getProcessData(), true, false);
+
+        bundles[1].verifyDependencyListing(cluster2);
+
+        dualComparison(prism, cluster3, bundles[1].getProcessData());
+        waitingForBundleFinish(cluster3, oldBundleId);
+
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+    /**
+     * Schedules a process on cluster1 through prism, edits its {@code workflow.xml} on
+     * HDFS, re-submits the unchanged process definition as an update and verifies the
+     * new-bundle creation on cluster1.
+     *
+     * <p>The bundle is deleted and the edited workflow file is restored in all cases; the
+     * restore runs even when deleting the bundle fails.
+     *
+     * @throws URISyntaxException propagated from the helper calls
+     * @throws JAXBException propagated from the helper calls
+     * @throws IOException propagated from the helper calls
+     * @throws OozieClientException propagated from the helper calls
+     * @throws AuthenticationException propagated from the helper calls
+     */
+    @Test(timeOut = 1200000)
+    public void
+    updateProcessWorkflowXml() throws URISyntaxException, JAXBException,
+            IOException, OozieClientException, AuthenticationException {
+        Bundle b = BundleUtil.readELBundle();
+        HadoopFileEditor hadoopFileEditor = null;
+        try {
+
+            b = new Bundle(b, cluster1);
+            b.setProcessWorkflow(workflowPath);
+            b.generateUniqueBundle();
+
+            b.setProcessValidity(TimeUtil.getTimeWrtSystemTime(-10),
+                    TimeUtil.getTimeWrtSystemTime(15));
+            b.submitFeedsScheduleProcess(prism);
+
+            InstanceUtil.waitTillInstancesAreCreated(cluster1, b.getProcessData(), 0, 10);
+            OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
+                    b.getProcessName(), 0);
+            InstanceUtil.waitTillInstanceReachState(serverOC.get(0),
+                    Util.readEntityName(b.getProcessData()), 0, CoordinatorAction.Status.RUNNING,
+                    EntityType.PROCESS);
+
+            //save old data
+            String oldBundleID = InstanceUtil
+                    .getLatestBundleID(cluster1,
+                            Util.readEntityName(b.getProcessData()), EntityType.PROCESS);
+
+            List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1,
+                    oldBundleID,
+                    EntityType.PROCESS);
+
+            //update workflow.xml
+            hadoopFileEditor = new HadoopFileEditor(cluster1FS);
+            hadoopFileEditor.edit(new ProcessMerlin(b
+                            .getProcessData()).getWorkflow().getPath() + "/workflow.xml", "</workflow-app>",
+                    "<!-- some comment -->");
+
+            //update
+            // NOTE(review): the update response is not asserted, unlike the sibling tests
+            prism.getProcessHelper().update(b.getProcessData(),
+                    b.getProcessData());
+
+            TimeUtil.sleepSeconds(20);
+            //verify new bundle creation
+            OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
+                    b.getProcessData(), true, true);
+
+        } finally {
+            try {
+                b.deleteBundle(prism);
+            } finally {
+                // restore the workflow file even when the bundle cleanup throws, so the
+                // edited workflow.xml is never left behind
+                if (hadoopFileEditor != null) {
+                    hadoopFileEditor.restore();
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Sends an update through prism that changes only the parallelism of the bundle's
+     * process.
+     *
+     * @param bundle bundle whose process is updated
+     * @param concurrency value passed to {@code setParallel} on the updated definition
+     * @return response of the prism update call
+     * @throws Exception if building the definition or the update call fails
+     */
+    public ServiceResponse updateProcessConcurrency(Bundle bundle, int concurrency)
+        throws Exception {
+        final String currentDefinition = bundle.getProcessData();
+        final ProcessMerlin modified = new ProcessMerlin(bundle.getProcessObject());
+        modified.setParallel(concurrency);
+
+        return prism.getProcessHelper().update(currentDefinition, modified.toString());
+    }
+
+    /**
+     * Fetches the process xml definition from two falcon servers / prism and asserts that
+     * both definitions are identical.
+     *
+     * @param coloHelper1 first server to query; its definition is the one returned
+     * @param coloHelper2 second server to query
+     * @param processData process xml used to look up the entity definition
+     * @return the process definition obtained from {@code coloHelper1}
+     * @throws Exception if either definition cannot be fetched
+     */
+    private String dualComparison(ColoHelper coloHelper1, ColoHelper coloHelper2,
+            String processData) throws Exception {
+        String colo1Response = getResponse(coloHelper1, processData, true);
+        String colo2Response = getResponse(coloHelper2, processData, true);
+        Assert.assertTrue(XmlUtil.isIdentical(colo1Response, colo2Response),
+                "Process definition should have been identical");
+        // return the definition that was actually compared instead of fetching it again
+        return colo1Response;
+    }
+
+    /**
+     * this method compares process xml definition from 2 falcon servers / prism and expects them to
+     * be different.
+     */
+    private void dualComparisonFailure(ColoHelper coloHelper1, ColoHelper coloHelper2,
+            String processData) throws Exception {
+        Assert.assertFalse(XmlUtil.isIdentical(getResponse(coloHelper1, processData, true),
+                getResponse(coloHelper2, processData, true)), "Process definition should not have been "
+                + "identical");
+    }
+
+    private String getResponse(ColoHelper prism, String processData, boolean bool)
+        throws Exception {
+        ServiceResponse response = prism.getProcessHelper()
+                .getEntityDefinition(Util.URLS.GET_ENTITY_DEFINITION, processData);
+        if (bool) {
+            AssertUtil.assertSucceeded(response);
+        } else {
+            AssertUtil.assertFailed(response);
+        }
+        String result = response.getMessage();
+        Assert.assertNotNull(result);
+
+        return result;
+
+    }
+
+    /**
+     * Blocks until the process' oozie job and then the DEFAULT coordinator of its latest
+     * bundle report the requested state, polling every 10 seconds. There is no internal
+     * time limit.
+     *
+     * @param coloHelper colo on which the process is polled
+     * @param bundle bundle whose process is awaited
+     * @param state status to wait for
+     * @throws Exception if a status lookup fails
+     */
+    private void waitForProcessToReachACertainState(ColoHelper coloHelper, Bundle bundle,
+            Job.Status state)
+        throws Exception {
+
+        while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(),
+                Util.readEntityName(bundle.getProcessData()), EntityType.PROCESS) != state) {
+            //keep waiting
+            TimeUtil.sleepSeconds(10);
+        }
+
+        //now check if the coordinator is in desired state
+        while (true) {
+            String latestBundleId = InstanceUtil.getLatestBundleID(coloHelper,
+                    Util.readEntityName(bundle.getProcessData()), EntityType.PROCESS);
+            CoordinatorJob coord = getDefaultOozieCoord(coloHelper, latestBundleId);
+            // getDefaultOozieCoord returns null when the bundle has no DEFAULT coordinator;
+            // fail with a clear message instead of a bare NullPointerException
+            Assert.assertNotNull(coord,
+                    "No DEFAULT coordinator found in bundle " + latestBundleId);
+            if (coord.getStatus() == state) {
+                return;
+            }
+            TimeUtil.sleepSeconds(10);
+        }
+    }
+
+    /**
+     * Prepares a bundle for a test run: sets the input feed data path, recreates the late
+     * data under the feed's path prefix on cluster1's file system and configures a
+     * one-minute process and output feed with a short validity window.
+     *
+     * @param prism not used by this method
+     * @param b bundle to prepare
+     * @return the same bundle instance, after modification
+     * @throws Exception if a file system or bundle operation fails
+     */
+    private Bundle usualGrind(ColoHelper prism, Bundle b) throws Exception {
+        b.setInputFeedDataPath(inputFeedPath);
+        final String dataPrefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(dataPrefix.substring(1), cluster1FS);
+        HadoopUtil.lateDataReplenish(cluster1FS, 60, 1, dataPrefix, null);
+
+        final String validityStart = TimeUtil.getTimeWrtSystemTime(3);
+        final String validityEnd = TimeUtil.getTimeWrtSystemTime(7);
+        b.setProcessPeriodicity(1, TimeUnit.minutes);
+        b.setOutputFeedPeriodicity(1, TimeUnit.minutes);
+        b.setProcessValidity(validityStart, validityEnd);
+        return b;
+    }
+
+    private ExecutionType getRandomExecutionType(Bundle bundle) throws Exception {
+        ExecutionType current = bundle.getProcessObject().getOrder();
+        Random r = new Random();
+        ExecutionType[] values = ExecutionType.values();
+        int i;
+        do {
+
+            i = r.nextInt(values.length);
+        } while (current == values[i]);
+        return values[i];
+    }
+
+    /**
+     * Sends an update through prism that changes only the frequency of the bundle's
+     * process.
+     *
+     * @param bundle bundle whose process is updated
+     * @param frequency frequency set on the updated definition
+     * @return response of the prism update call
+     * @throws Exception if building the definition or the update call fails
+     */
+    public ServiceResponse updateProcessFrequency(Bundle bundle,
+            org.apache.falcon.entity.v0.Frequency frequency)
+        throws Exception {
+        final String currentDefinition = bundle.getProcessData();
+        final ProcessMerlin modified = new ProcessMerlin(bundle.getProcessObject());
+        modified.setFrequency(frequency);
+
+        return prism.getProcessHelper().update(currentDefinition, modified.toString());
+    }
+
+    //need to expand this function more later
+    private int getExpectedNumberOfWorkflowInstances(String start, String end) {
+        DateTime startDate = new DateTime(start);
+        DateTime endDate = new DateTime(end);
+        Minutes minutes = Minutes.minutesBetween((startDate), (endDate));
+        return minutes.getMinutes();
+    }
+
+    private int getExpectedNumberOfWorkflowInstances(Date start, Date end) {
+        DateTime startDate = new DateTime(start);
+        DateTime endDate = new DateTime(end);
+        Minutes minutes = Minutes.minutesBetween((startDate), (endDate));
+        return minutes.getMinutes();
+    }
+
+    private int getExpectedNumberOfWorkflowInstances(String start, Date end) {
+        DateTime startDate = new DateTime(start);
+        DateTime endDate = new DateTime(end);
+        Minutes minutes = Minutes.minutesBetween((startDate), (endDate));
+        return minutes.getMinutes();
+    }
+
+    /**
+     * Polls once a minute until the given bundle is over and fails the test when it is
+     * still not over after the allowed number of minutes.
+     *
+     * <p>The bundle state is checked one last time after the final sleep, and a
+     * non-positive {@code minutes} fails right away instead of waiting forever.
+     *
+     * @param coloHelper colo on which the bundle runs
+     * @param bundleId id of the bundle to wait for
+     * @param minutes maximum number of one-minute waits
+     * @throws Exception if the bundle state cannot be queried
+     */
+    private void waitingForBundleFinish(ColoHelper coloHelper, String bundleId, int minutes)
+        throws Exception {
+        int waitedMinutes = 0;
+        while (!OozieUtil.isBundleOver(coloHelper, bundleId)) {
+            if (waitedMinutes >= minutes) {
+                // give the failure a message; a bare assertTrue(false) explains nothing
+                Assert.fail("bundle " + bundleId + " was not over after waiting "
+                        + waitedMinutes + " minute(s)");
+            }
+            //keep waiting
+            LOGGER.info("bundle not over .. waiting");
+            TimeUtil.sleepSeconds(60);
+            waitedMinutes++;
+        }
+    }
+
+    private void waitingForBundleFinish(ColoHelper coloHelper, String bundleId) throws Exception {
+        waitingForBundleFinish(coloHelper, bundleId, 15);
+    }
+
+    private CoordinatorJob getDefaultOozieCoord(ColoHelper coloHelper, String bundleId)
+        throws Exception {
+        OozieClient client = coloHelper.getFeedHelper().getOozieClient();
+        BundleJob bundlejob = client.getBundleJobInfo(bundleId);
+
+        for (CoordinatorJob coord : bundlejob.getCoordinators()) {
+            if (coord.getAppName().contains("DEFAULT")) {
+                return client.getCoordJobInfo(coord.getId());
+            }
+        }
+        return null;
+    }
+
+}


Mime
View raw message