falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rostafiyc...@apache.org
Subject [2/5] falcon git commit: FALCON-1151 Migrate oozie related methods from InstanceUtil.java to OozieUtil.java. Contributed by Paul Isaychuk
Date Tue, 14 Apr 2015 14:01:32 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
index 3f6dc66..a5a89d7 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -129,13 +129,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //now to schedule in 1 colo and let it remain in another
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3, bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
@@ -159,11 +160,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         dualComparison(prism, cluster3, bundles[1].getProcessData());
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, false);
         waitingForBundleFinish(cluster3, oldBundleId, 5);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, true);
 
     }
@@ -182,12 +183,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         TimeUtil.sleepSeconds(30);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
         List<String> oldNominalTimes =
-                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+                OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
 
         String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
                 bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
@@ -201,7 +202,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         bundles[1].setProcessValidity(newStartTime, newEndTime);
         bundles[1].setProcessConcurrency(10);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
         LOGGER.info("updated process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
         while (Util.parseResponse(
@@ -213,7 +214,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
             TimeUtil.sleepSeconds(10);
         }
 
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, false);
 
         dualComparison(prism, cluster3, bundles[1].getProcessData());
@@ -222,7 +223,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
         int finalNumberOfInstances =
-                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS).size();
         Assert.assertEquals(finalNumberOfInstances,
                 getExpectedNumberOfWorkflowInstances(TimeUtil
@@ -240,7 +241,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                         .dateToOozieDate(
                                 bundles[1].getProcessObject().getClusters().getClusters().get(0)
                                         .getValidity().getEnd()));
-        Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+        Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId),
                 expectedNumberOfWorkflows);
     }
 
@@ -252,9 +253,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //now to schedule in 1 colo and let it remain in another
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
         TimeUtil.sleepSeconds(25);
 
         int initialConcurrency = bundles[1].getProcessObject().getParallel();
@@ -285,7 +286,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         AssertUtil
                 .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
         while (Util.parseResponse(
                 prism.getProcessHelper()
                         .update(bundles[1].getProcessData(), bundles[1].getProcessData()))
@@ -302,7 +303,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
         waitingForBundleFinish(cluster3, oldBundleId);
         int finalNumberOfInstances =
-                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS).size();
 
         int expectedInstances =
@@ -328,13 +329,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //now to schedule in 1 colo and let it remain in another
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
-        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
         LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
 
@@ -347,7 +348,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         ServiceResponse response =
             prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
         AssertUtil.assertSucceeded(response);
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, false);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
 
@@ -366,15 +367,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         String originalProcessData = bundles[1].getProcessData();
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
-        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
         TimeUtil.sleepSeconds(20);
         List<String> oldNominalTimes =
-                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+                OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
         bundles[1].setProcessName(this.getClass().getSimpleName() + "-myNewProcessName");
 
         //now to update
@@ -382,7 +383,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 prism.getProcessHelper()
                         .update((bundles[1].getProcessData()), bundles[1].getProcessData());
         AssertUtil.assertFailed(response);
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 originalProcessData, false, false);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
     }
@@ -400,18 +401,18 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
-        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
         //now to update
         DateTime updateTime = new DateTime(DateTimeZone.UTC);
         TimeUtil.sleepSeconds(60);
         List<String> oldNominalTimes =
-                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+                OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
         LOGGER.info("updating at " + updateTime);
         while (Util
                 .parseResponse(updateProcessConcurrency(bundles[1],
@@ -428,7 +429,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated
         // correctly.
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(),
                 false, true);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
@@ -443,7 +444,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 &&
                 status != Job.Status.DONEWITHERROR) {
             int statusCount = InstanceUtil
-                    .getInstanceCountWithStatus(cluster3,
+                    .getInstanceCountWithStatus(cluster3OC,
                             bundles[1].getProcessName(),
                             org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
                             EntityType.PROCESS);
@@ -468,7 +469,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                                         bundles[1].getProcessObject().getClusters().getClusters()
                                                 .get(0).getValidity()
                                                 .getEnd()));
-        Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+        Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId),
                 expectedNumberOfInstances);
     }
 
@@ -484,11 +485,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         TimeUtil.sleepSeconds(30);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -500,7 +501,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                                 .getStart()),
                 newEndTime);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
         ServiceResponse response = prism.getProcessHelper()
                 .update(bundles[1].getProcessData(), bundles[1].getProcessData());
@@ -514,12 +515,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         Assert.assertEquals(Util.parseResponse(response).getStatus(),
                 APIResult.Status.SUCCEEDED, "Process update did not succeed.");
 
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), false, true);
 
         int i = 0;
 
-        while (OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId)
+        while (OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId)
                 != getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
                         bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
                                 .getStart()
@@ -540,7 +541,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         // one is updated
         // correctly.
         int finalNumberOfInstances = InstanceUtil
-                .getProcessInstanceList(cluster3,
+                .getProcessInstanceList(cluster3OC,
                     bundles[1].getProcessName(), EntityType.PROCESS)
                 .size();
         Assert.assertEquals(finalNumberOfInstances,
@@ -567,15 +568,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //now to schedule in 1 colo and let it remain in another
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         AssertUtil.assertSucceeded(
@@ -595,7 +596,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         dualComparison(prism, cluster3, bundles[1].getProcessData());
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), false, true);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
         AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
@@ -610,7 +611,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 &&
                 status != Job.Status.DONEWITHERROR) {
             if (InstanceUtil
-                    .getInstanceCountWithStatus(cluster3,
+                    .getInstanceCountWithStatus(cluster3OC,
                             bundles[1].getProcessName(),
                             org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
                             EntityType.PROCESS)
@@ -627,7 +628,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         waitingForBundleFinish(cluster3, oldBundleId);
 
         int finalNumberOfInstances =
-                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS).size();
 
         int expectedInstances =
@@ -660,11 +661,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS);
 
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         //now to update
@@ -707,7 +708,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 &&
                 status != Job.Status.DONEWITHERROR) {
             if (InstanceUtil
-                    .getInstanceCountWithStatus(cluster3,
+                    .getInstanceCountWithStatus(cluster3OC,
                             bundles[1].getProcessName(),
                             org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
                             EntityType.PROCESS)
@@ -721,9 +722,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
             TimeUtil.sleepSeconds(30);
         }
         Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
-        OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
-                        .getLatestBundleID(cluster3,
-                                bundles[1].getProcessName(), EntityType.PROCESS),
+        OozieUtil.verifyNewBundleCreation(cluster3OC, OozieUtil
+                        .getLatestBundleID(cluster3OC,
+                            bundles[1].getProcessName(), EntityType.PROCESS),
                 oldNominalTimes, bundles[1].getProcessData(), false,
                 true
         );
@@ -731,7 +732,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         waitingForBundleFinish(cluster3, oldBundleId);
 
         int finalNumberOfInstances =
-                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS).size();
 
         int expectedInstances =
@@ -761,11 +762,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         TimeUtil.sleepSeconds(30);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         int initialConcurrency = bundles[1].getProcessObject().getParallel();
@@ -793,11 +794,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
         waitingForBundleFinish(cluster3, oldBundleId);
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, true);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
         int finalNumberOfInstances =
-                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS).size();
         int expectedInstances =
                 getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
@@ -826,13 +827,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         TimeUtil.sleepSeconds(30);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         int initialConcurrency = bundles[1].getProcessObject().getParallel();
@@ -864,11 +865,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
         waitingForBundleFinish(cluster3, oldBundleId);
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, true);
         AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
         int finalNumberOfInstances =
-                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS).size();
 
         int expectedInstances =
@@ -897,13 +898,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         TimeUtil.sleepSeconds(30);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
         TimeUtil.sleepSeconds(20);
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
@@ -921,7 +922,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 .getStatus() != APIResult.Status.SUCCEEDED) {
             TimeUtil.sleepSeconds(20);
         }
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, false);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
 
@@ -931,7 +932,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
         waitingForBundleFinish(cluster3, oldBundleId);
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, true);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
     }
@@ -947,14 +948,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         TimeUtil.sleepSeconds(30);
-        InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
@@ -974,7 +975,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
             TimeUtil.sleepSeconds(10);
         }
 
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, false);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
 
@@ -986,7 +987,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
         waitingForBundleFinish(cluster3, oldBundleId);
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, true);
         AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
 
@@ -1012,12 +1013,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         TimeUtil.sleepSeconds(30);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, originalProcess, 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        Util.readEntityName(originalProcess), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    Util.readEntityName(originalProcess), EntityType.PROCESS);
 
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, originalProcess, 0, 10);
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         //submit new feed
@@ -1037,12 +1038,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), false, false);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
                 Job.Status.RUNNING);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
         while (Util.parseResponse(
                 prism.getProcessHelper().update(updatedProcess, updatedProcess)).getStatus()
@@ -1053,13 +1054,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         dualComparison(prism, cluster3, bundles[1].getProcessData());
         Assert.assertTrue(Util.isDefinitionSame(cluster2, prism, originalProcess));
         bundles[1].verifyDependencyListing(cluster2);
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 updatedProcess, true, false);
         waitingForBundleFinish(cluster3, oldBundleId);
 
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
 
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, true);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
                 Job.Status.RUNNING);
@@ -1074,13 +1075,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         TimeUtil.sleepSeconds(30);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -1098,7 +1099,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
             LOGGER.info("update didnt SUCCEED in last attempt");
             TimeUtil.sleepSeconds(10);
         }
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), false, true);
 
         bundles[1].verifyDependencyListing(cluster2);
@@ -1108,7 +1109,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
         int finalNumberOfInstances = InstanceUtil
-                .getProcessInstanceList(cluster3,
+                .getProcessInstanceList(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS)
                 .size();
         Assert.assertEquals(finalNumberOfInstances,
@@ -1123,7 +1124,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                                 bundles[1].getProcessObject().getClusters().getClusters().get(0)
                                         .getValidity().getStart()),
                         newEndTime);
-        Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+        Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId),
                 expectedNumberOfWorkflows);
     }
 
@@ -1137,12 +1138,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //now to schedule in 1 colo and let it remain in another
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
         TimeUtil.sleepSeconds(30);
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
         String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
                 bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
@@ -1171,7 +1172,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
         int finalNumberOfInstances = InstanceUtil
-                .getProcessInstanceList(cluster3,
+                .getProcessInstanceList(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS)
                 .size();
         Assert.assertEquals(finalNumberOfInstances,
@@ -1204,13 +1205,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
         List<String> oldNominalTimes =
-                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+                OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
 
         LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
 
@@ -1232,7 +1233,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated
         // correctly.
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, true);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
     }
@@ -1255,12 +1256,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         TimeUtil.sleepSeconds(30);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
@@ -1281,7 +1282,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         dualComparison(prism, cluster3, bundles[1].getProcessData());
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, true);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
     }
@@ -1296,11 +1297,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         TimeUtil.sleepSeconds(30);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS);
 
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
                 EntityType.PROCESS);
 
         String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -1312,13 +1313,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                         .getEnd()
         ));
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
         OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
         AssertUtil.assertSucceeded(
                 prism.getProcessHelper()
                         .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
 
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, true);
         bundles[1].verifyDependencyListing(cluster2);
         dualComparison(prism, cluster3, bundles[1].getProcessData());
@@ -1331,12 +1332,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //now to schedule in 1 colo and let it remain in another
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
         TimeUtil.sleepSeconds(30);
 
-        OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId);
+        OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId);
         String oldStartTime = TimeUtil.dateToOozieDate(
                 bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
                         .getStart()
@@ -1350,7 +1351,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                         .getEnd()
         ));
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
@@ -1367,14 +1368,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
         int finalNumberOfInstances =
-                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, bundles[1].getProcessName(),
+                InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC, bundles[1].getProcessName(),
                    EntityType.PROCESS).size();
         Assert.assertEquals(finalNumberOfInstances,
                 getExpectedNumberOfWorkflowInstances(oldStartTime,
                         bundles[1].getProcessObject().getClusters().getClusters().get(0)
                                 .getValidity().getEnd()));
         Assert.assertEquals(InstanceUtil
-                .getProcessInstanceList(cluster3,
+                .getProcessInstanceList(cluster3OC,
                         bundles[1].getProcessName(), EntityType.PROCESS)
                 .size(), getExpectedNumberOfWorkflowInstances(newStartTime,
                 bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd()));
@@ -1389,9 +1390,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //now to schedule in 1 colo and let it remain in another
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
-        String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        bundles[1].getProcessName(), EntityType.PROCESS);
+        String oldBundleId = OozieUtil
+                .getLatestBundleID(cluster3OC,
+                    bundles[1].getProcessName(), EntityType.PROCESS);
         TimeUtil.sleepSeconds(30);
 
         String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -1404,7 +1405,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         ));
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
 
-        waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+        waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
 
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
@@ -1413,9 +1414,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                         .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
         AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
         List<String> oldNominalTimes =
-                OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+                OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
 
-        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, false);
 
         bundles[1].verifyDependencyListing(cluster2);
@@ -1448,11 +1449,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                     EntityType.PROCESS);
 
             //save old data
-            String oldBundleID = InstanceUtil
-                    .getLatestBundleID(cluster1,
-                            Util.readEntityName(b.getProcessData()), EntityType.PROCESS);
+            String oldBundleID = OozieUtil
+                    .getLatestBundleID(cluster1OC,
+                        Util.readEntityName(b.getProcessData()), EntityType.PROCESS);
 
-            List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1,
+            List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1OC,
                     oldBundleID,
                     EntityType.PROCESS);
 
@@ -1467,7 +1468,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
             TimeUtil.sleepSeconds(20);
             //verify new bundle creation
-            OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
+            OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleID, oldNominalTimes,
                     b.getProcessData(), true, true);
 
         } finally {
@@ -1529,26 +1530,26 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
     }
 
-    private void waitForProcessToReachACertainState(ColoHelper coloHelper, Bundle bundle,
+    private void waitForProcessToReachACertainState(OozieClient oozieClient, Bundle bundle,
             Job.Status state)
         throws Exception {
 
-        while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(),
+        while (OozieUtil.getOozieJobStatus(oozieClient,
                 bundle.getProcessName(), EntityType.PROCESS) != state) {
             //keep waiting
             TimeUtil.sleepSeconds(10);
         }
 
         //now check if the coordinator is in desired state
-        CoordinatorJob coord = getDefaultOozieCoord(coloHelper, InstanceUtil
-                .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()),
-                        EntityType.PROCESS));
+        CoordinatorJob coord = getDefaultOozieCoord(oozieClient, OozieUtil
+                .getLatestBundleID(oozieClient, bundle.getProcessName(),
+                    EntityType.PROCESS));
 
         while (coord.getStatus() != state) {
             TimeUtil.sleepSeconds(10);
-            coord = getDefaultOozieCoord(coloHelper, InstanceUtil
-                    .getLatestBundleID(coloHelper, bundle.getProcessName(),
-                            EntityType.PROCESS));
+            coord = getDefaultOozieCoord(oozieClient, OozieUtil
+                    .getLatestBundleID(oozieClient, bundle.getProcessName(),
+                        EntityType.PROCESS));
         }
     }
 
@@ -1630,14 +1631,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         waitingForBundleFinish(coloHelper, bundleId, 15);
     }
 
-    private CoordinatorJob getDefaultOozieCoord(ColoHelper coloHelper, String bundleId)
-        throws Exception {
-        OozieClient client = coloHelper.getFeedHelper().getOozieClient();
-        BundleJob bundlejob = client.getBundleJobInfo(bundleId);
-
+    private CoordinatorJob getDefaultOozieCoord(OozieClient oozieClient, String bundleId) throws Exception {
+        BundleJob bundlejob = oozieClient.getBundleJobInfo(bundleId);
         for (CoordinatorJob coord : bundlejob.getCoordinators()) {
             if (coord.getAppName().contains("DEFAULT")) {
-                return client.getCoordJobInfo(coord.getId());
+                return oozieClient.getCoordJobInfo(coord.getId());
             }
         }
         return null;

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
index 39f0268..944c67f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
@@ -284,7 +284,7 @@ public class OptionalInputTest extends BaseTestClass {
         prism.getProcessHelper().update(process.toString(), process.toString());
 
         //from now on ... it should wait of input0 also
-        InstanceUtil.waitTillInstancesAreCreated(cluster, process.toString(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(serverOC.get(0), process.toString(), 0);
         InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS, 10);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
index 4221525..1bc4027 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
@@ -28,11 +28,13 @@ import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowJob;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -51,6 +53,7 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
     private ColoHelper cluster1 = servers.get(0);
     private ColoHelper cluster2 = servers.get(1);
     private ColoHelper cluster3 = servers.get(2);
+    private OozieClient cluster1OC = serverOC.get(0);
     private FileSystem cluster1FS = serverFS.get(0);
     private FileSystem cluster2FS = serverFS.get(1);
     private FileSystem cluster3FS = serverFS.get(2);
@@ -124,23 +127,22 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
                 .withPartition("UK/${cluster.colo}")
                 .build()).toString();
 
-
         LOGGER.info("feed: " + Util.prettyPrintXml(feed));
 
         prism.getFeedHelper().submitAndSchedule(feed);
         TimeUtil.sleepSeconds(10);
 
         String bundleId =
-            InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+            OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED);
 
         //wait till 1st instance of replication coord is SUCCEEDED
-        List<String> replicationCoordIDTarget = InstanceUtil
+        List<String> replicationCoordIDTarget = OozieUtil
             .getReplicationCoordID(bundleId, cluster1.getFeedHelper());
 
         for (int i = 0; i < 30; i++) {
-            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
                 0) == WorkflowJob.Status.SUCCEEDED
-                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
                     replicationCoordIDTarget.get(1), 0)
                 == WorkflowJob.Status.SUCCEEDED) {
                 break;
@@ -151,10 +153,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
         TimeUtil.sleepSeconds(15);
 
         List<String> inputFolderListForColo1 =
-            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC,
                 replicationCoordIDTarget.get(0), 1);
         List<String> inputFolderListForColo2 =
-            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC,
                 replicationCoordIDTarget.get(1), 1);
 
         HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
@@ -216,16 +218,15 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
         TimeUtil.sleepSeconds(60);
 
         //wait till 1st instance of replication coord is SUCCEEDED
-        String bundleId = InstanceUtil
-            .getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
-
-        List<String> replicationCoordIDTarget = InstanceUtil.getReplicationCoordID(bundleId,
+        String bundleId = OozieUtil
+            .getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED);
+        List<String> replicationCoordIDTarget = OozieUtil.getReplicationCoordID(bundleId,
             cluster1.getFeedHelper());
 
         for (int i = 0; i < 30; i++) {
-            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
                 0) == WorkflowJob.Status.SUCCEEDED
-                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
                     replicationCoordIDTarget.get(1), 0)
                 == WorkflowJob.Status.SUCCEEDED) {
                 break;
@@ -234,19 +235,19 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
             TimeUtil.sleepSeconds(20);
         }
 
-        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
             replicationCoordIDTarget.get(0), 0),
             WorkflowJob.Status.SUCCEEDED);
-        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
             replicationCoordIDTarget.get(1), 0),
             WorkflowJob.Status.SUCCEEDED);
 
         TimeUtil.sleepSeconds(15);
 
         List<String> inputFolderListForColo1 = InstanceUtil
-            .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 1);
+            .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 1);
         List<String> inputFolderListForColo2 = InstanceUtil
-            .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), 1);
+            .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(1), 1);
 
         HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
             inputFolderListForColo1);
@@ -258,17 +259,17 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
 
         //check for run id to  be 1
         Assert.assertEquals(
-            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0),
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0),
             1, "id has to be equal 1");
         Assert.assertEquals(
-            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0),
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0),
             1, "id has to be equal 1");
 
         //wait for lates run to complete
         for (int i = 0; i < 30; i++) {
-            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
                 0) == WorkflowJob.Status.SUCCEEDED
-                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
                     replicationCoordIDTarget.get(1), 0)
                 == WorkflowJob.Status.SUCCEEDED) {
                 break;
@@ -276,10 +277,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
             LOGGER.info("still in for loop");
             TimeUtil.sleepSeconds(20);
         }
-        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
             replicationCoordIDTarget.get(0), 0),
             WorkflowJob.Status.SUCCEEDED);
-        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
             replicationCoordIDTarget.get(1), 0),
             WorkflowJob.Status.SUCCEEDED);
 
@@ -296,10 +297,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
 
         //check for run id to be 2
         Assert.assertEquals(
-            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0),
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0),
             2, "id has to be equal 2");
         Assert.assertEquals(
-            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0),
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0),
             2, "id has to be equal 2");
     }
 
@@ -381,15 +382,15 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
 
         //wait till 1st instance of replication coord is SUCCEEDED
         String bundleId =
-            InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+            OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED);
 
         List<String> replicationCoordIDTarget =
-            InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+            OozieUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
 
         for (int i = 0; i < 30; i++) {
-            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
                 0) == WorkflowJob.Status.SUCCEEDED
-                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
                     replicationCoordIDTarget.get(1), 0)
                 == WorkflowJob.Status.SUCCEEDED) {
                 break;
@@ -398,10 +399,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
             TimeUtil.sleepSeconds(20);
         }
 
-        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
             replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED,
             "Replication job should have succeeded.");
-        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
             replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED,
             "Replication job should have succeeded.");
 
@@ -411,14 +412,14 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
         // be present. both of them should have _success
 
         List<String> inputFolderListForColo1 = InstanceUtil
-            .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 1);
+            .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 1);
         List<String> inputFolderListForColo2 = InstanceUtil
-            .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), 1);
+            .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(1), 1);
 
         String outPutLocation = InstanceUtil
-            .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 0);
+            .getOutputFolderForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 0);
         String outPutBaseLocation = InstanceUtil
-            .getOutputFolderBaseForInstanceForReplication(cluster1,
+            .getOutputFolderBaseForInstanceForReplication(cluster1OC,
                 replicationCoordIDTarget.get(0), 0);
 
         List<String> subFolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation);
@@ -439,17 +440,17 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
 
         //check for run id to  be 1
         Assert.assertTrue(
-            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0)
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0)
                 == 1
-                && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1),
+                && InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1),
                 0) == 1,
             "id have to be equal 1");
 
         //wait for latest run to complete
         for (int i = 0; i < 30; i++) {
-            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
                 0) == WorkflowJob.Status.SUCCEEDED
-                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
                     replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
                 break;
             }
@@ -467,9 +468,9 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
         TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 9));
         //check for run id to be 2
         Assert.assertTrue(
-            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0)
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0)
                 == 2
-                && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1),
+                && InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1),
                 0) == 2,
             "id have to be equal 2");
     }
@@ -571,15 +572,15 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
 
         //wait till 1st instance of replication coord is SUCCEEDED
         String bundleId =
-            InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+            OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED);
 
         List<String> replicationCoordIDTarget =
-            InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+            OozieUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
 
         for (int i = 0; i < 30; i++) {
-            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
                 0) == WorkflowJob.Status.SUCCEEDED
-                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
                     replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
                 break;
             }
@@ -588,10 +589,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
             TimeUtil.sleepSeconds(20);
         }
 
-        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
             replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED,
             "Replication job did not succeed");
-        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
             replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED,
             "Replication job did not succeed");
 
@@ -601,17 +602,17 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
            be present. both of
            them should have _success */
         List<String> inputFolderListForColo1 =
-            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC,
                 replicationCoordIDTarget.get(0), 1);
         List<String> inputFolderListForColo2 =
-            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC,
                 replicationCoordIDTarget.get(1), 1);
 
         String outPutLocation = InstanceUtil
-            .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+            .getOutputFolderForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0),
                 0);
         String outPutBaseLocation = InstanceUtil
-            .getOutputFolderBaseForInstanceForReplication(cluster1,
+            .getOutputFolderBaseForInstanceForReplication(cluster1OC,
                 replicationCoordIDTarget.get(0), 0);
 
         List<String> subfolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation);
@@ -634,9 +635,9 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
 
         //check for run id to  be 1
         Assert.assertTrue(
-            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0)
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0)
                 == 1
-                && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1),
+                && InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1),
                 0) == 1,
             "id have to be equal 1");
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
index c6f72cc..43aafdf 100755
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
@@ -23,13 +23,13 @@ import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.ActionType;
 import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.response.ServiceResponse;
 import org.apache.falcon.regression.core.util.AssertUtil;
 import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -54,14 +54,12 @@ import java.util.List;
 @Test(groups = "distributed")
 public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
 
-    private ColoHelper cluster1 = servers.get(0);
-    private ColoHelper cluster2 = servers.get(1);
-    private ColoHelper cluster3 = servers.get(2);
     private FileSystem cluster1FS = serverFS.get(0);
     private FileSystem cluster2FS = serverFS.get(1);
     private FileSystem cluster3FS = serverFS.get(2);
     private OozieClient cluster1OC = serverOC.get(0);
     private OozieClient cluster2OC = serverOC.get(1);
+    private OozieClient cluster3OC = serverOC.get(2);
     private String testDate = "/2012/10/01/12/";
     private String baseTestDir = cleanAndGetTestDir();
     private String testBaseDir1 = baseTestDir + "/localDC/rc/billing";
@@ -83,7 +81,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
 // pt : partition in target
 // ps: partition in source
 
-
     private void uploadDataToServer3(String location, String fileName) throws IOException {
         HadoopUtil.recreateDir(cluster3FS, location);
         HadoopUtil.copyDataToFolder(cluster3FS, location, fileName);
@@ -96,9 +93,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
 
     @BeforeClass(alwaysRun = true)
     public void createTestData() throws Exception {
-
         LOGGER.info("creating test data");
-
         uploadDataToServer3(testDirWithDate + "00/ua2/", testFile1);
         uploadDataToServer3(testDirWithDate + "05/ua2/", testFile2);
         uploadDataToServer3(testDirWithDate + "10/ua2/", testFile3);
@@ -123,23 +118,19 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         uploadDataToServer3(testBaseDir3 + testDate + "15/ua2/", testFile4);
         uploadDataToServer3(testBaseDir3 + testDate + "20/ua2/", testFile4);
 
-
         uploadDataToServer3(testBaseDir3 + testDate + "00/ua1/", testFile1);
         uploadDataToServer3(testBaseDir3 + testDate + "05/ua1/", testFile2);
         uploadDataToServer3(testBaseDir3 + testDate + "10/ua1/", testFile3);
         uploadDataToServer3(testBaseDir3 + testDate + "15/ua1/", testFile4);
         uploadDataToServer3(testBaseDir3 + testDate + "20/ua1/", testFile4);
 
-
         uploadDataToServer3(testBaseDir3 + testDate + "00/ua3/", testFile1);
         uploadDataToServer3(testBaseDir3 + testDate + "05/ua3/", testFile2);
         uploadDataToServer3(testBaseDir3 + testDate + "10/ua3/", testFile3);
         uploadDataToServer3(testBaseDir3 + testDate + "15/ua3/", testFile4);
         uploadDataToServer3(testBaseDir3 + testDate + "20/ua3/", testFile4);
 
-
         //data for test normalTest_1s2t_pst where both source target partition are required
-
         uploadDataToServer3(testDirWithDateSourceTarget + "00/ua3/ua2/", testFile1);
         uploadDataToServer3(testDirWithDateSourceTarget + "05/ua3/ua2/", testFile2);
         uploadDataToServer3(testDirWithDateSourceTarget + "10/ua3/ua2/", testFile3);
@@ -156,22 +147,17 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         uploadDataToServer1(testDirWithDateSource1 + "00/ua2/", testFile1);
         uploadDataToServer1(testDirWithDateSource1 + "05/ua2/", testFile2);
 
-
         uploadDataToServer1(testDirWithDateSource1 + "00/ua1/", testFile1);
         uploadDataToServer1(testDirWithDateSource1 + "05/ua1/", testFile2);
 
-
         uploadDataToServer1(testDirWithDateSource1 + "00/ua3/", testFile1);
         uploadDataToServer1(testDirWithDateSource1 + "05/ua3/", testFile2);
-
         LOGGER.info("completed creating test data");
-
     }
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
         Bundle bundle = BundleUtil.readFeedReplicationBundle();
-
         for (int i = 0; i < 3; i++) {
             bundles[i] = new Bundle(bundle, servers.get(i));
             bundles[i].generateUniqueBundle(this);
@@ -194,9 +180,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         // replication takes
         // place normally
         //partition is left blank
-
         Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
-
         String startTimeUA1 = "2012-10-01T12:05Z";
         String startTimeUA2 = "2012-10-01T12:10Z";
 
@@ -234,8 +218,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
 
         ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
         TimeUtil.sleepSeconds(10);
-        AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source "
-            + "is blank");
+        AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source is blank");
     }
 
 
@@ -290,42 +273,26 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         r = prism.getFeedHelper().schedule(feed.toString());
         AssertUtil.assertSucceeded(r);
         TimeUtil.sleepSeconds(15);
-
         HadoopUtil.recreateDir(cluster3FS, testDirWithDate + "00/ua3/");
         HadoopUtil.recreateDir(cluster3FS, testDirWithDate + "05/ua3/");
 
-        HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "00/ua3/",
-            testFile1);
-        HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/",
-            testFile2);
+        HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "00/ua3/", testFile1);
+        HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/", testFile2);
 
         InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
-        Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
-                "REPLICATION"), 1);
-        Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
-                "RETENTION"), 1);
-        Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(),
-                "RETENTION"), 1);
-        Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
-                "RETENTION"), 1);
-
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "RETENTION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feed.getName(), "RETENTION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "RETENTION"), 1);
 
         //check if data has been replicated correctly
-
         //on ua1 only ua1 should be replicated, ua2 only ua2
         //number of files should be same as source
-
-
         List<Path> ua2ReplicatedData = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2));
         AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua2");
 
-
         List<Path> ua3ReplicatedData00 = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "00/ua3/"));
         List<Path> ua3ReplicatedData05 = HadoopUtil
@@ -388,31 +355,18 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
 
-        Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
-                "REPLICATION"), 1);
-        Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
-                "RETENTION"), 1);
-        Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(),
-                "RETENTION"), 1);
-        Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
-                "RETENTION"), 1);
-
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "RETENTION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feed.getName(), "RETENTION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "RETENTION"), 1);
 
         //check if data has been replicated correctly
-
         //on ua1 only ua1 should be replicated, ua2 only ua2
         //number of files should be same as source
-
-
         List<Path> ua2ReplicatedData =
             HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2));
         AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua3");
 
-
         List<Path> ua3ReplicatedData00 = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "00/ua2/"));
         List<Path> ua3ReplicatedData05 = HadoopUtil
@@ -439,7 +393,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         // (00 to 30)
         //data should be replicated to folder on cluster1 and cluster2 as targets
         //ua3 is the source and ua1 and ua2 are target
-
         Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
         String startTimeUA1 = "2012-10-01T12:05Z";
         String startTimeUA2 = "2012-10-01T12:10Z";
@@ -493,19 +446,16 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
 
         //on ua1 only ua1 should be replicated, ua2 only ua2
         //number of files should be same as source
-
-
         List<Path> ua1ReplicatedData = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir3 + testDate));
+
         //check for no ua2 or ua3 in ua1
         AssertUtil.failIfStringFoundInPath(ua1ReplicatedData, "ua2", "ua3");
 
         List<Path> ua2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS,
-                new Path(testBaseDir3 + testDate));
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir3 + testDate));
         AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua3");
 
-
         List<Path> ua1ReplicatedData00 = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir3 + testDate + "00/"));
         List<Path> ua1ReplicatedData10 = HadoopUtil
@@ -539,7 +489,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         //cluster2 is the target
         // Since there is no partition expression in source clusters, the feed submission should
         // fail (FALCON-305).
-
         Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
 
         String startTimeUA1 = "2012-10-01T12:05Z";
@@ -650,18 +599,15 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
 
         //on ua1 only ua1 should be replicated, ua2 only ua2
         //number of files should be same as source
-
-
         List<Path> ua1ReplicatedData = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate));
+
         //check for no ua2 or ua3 in ua1
         AssertUtil.failIfStringFoundInPath(ua1ReplicatedData, "ua2");
-
         List<Path> ua2ReplicatedData = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate));
         AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1");
 
-
         List<Path> ua1ReplicatedData05 = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster1FS,
                 new Path(testBaseDir1 + "/ua1" + testDate + "05/"));
@@ -687,7 +633,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         AssertUtil.checkForListSizes(ua1ReplicatedData05, ua3OriginalData05ua1);
         AssertUtil.checkForListSizes(ua2ReplicatedData10, ua3OriginalData10ua2);
         AssertUtil.checkForListSizes(ua2ReplicatedData15, ua3OriginalData15ua2);
-
     }
 
 
@@ -702,7 +647,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         //data should be replicated to cluster2 from ua2 sub dir of cluster3 and cluster1
         // source cluster path in cluster1 should be mentioned in cluster definition
         // path for data in target cluster should also be customized
-
         Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
 
         String startTimeUA1 = "2012-10-01T12:00Z";
@@ -754,8 +698,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
 
         //on ua1 only ua1 should be replicated, ua2 only ua2
         //number of files should be same as source
-
-
         List<Path> ua2ReplicatedData = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS,
             new Path(testBaseDir2 + "/replicated" + testDate));
         AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua2");
@@ -765,7 +707,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         List<Path> ua2ReplicatedData05ua3 = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS,
             new Path(testBaseDir2 + "/replicated" + testDate + "05/ua3/"));
 
-
         List<Path> ua1OriginalData00 = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster1FS, new Path(
                 testBaseDirServer1Source + testDate + "00/ua1"));
@@ -776,11 +717,8 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         AssertUtil.checkForListSizes(ua2ReplicatedData05ua3, ua3OriginalData05);
     }
 
-
     @Test(enabled = true)
     public void normalTest1Source2TargetPartitionedSourceTarget() throws Exception {
-
-
         //this test is for ideal condition when data is present in all the required places and
         // replication takes
         // place normally
@@ -843,8 +781,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
 
         //on ua1 only ua1 should be replicated, ua2 only ua2
         //number of files should be same as source
-
-
         List<Path> ua1ReplicatedData = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate));
         //check for no ua2  in ua1
@@ -854,7 +790,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
             .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate));
         AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1");
 
-
         List<Path> ua1ReplicatedData05 = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster1FS,
                 new Path(testBaseDir1 + "/ua1" + testDate + "05/"));
@@ -867,7 +802,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         List<Path> ua2ReplicatedData15 = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate + "15"));
 
-
         List<Path> ua3OriginalData05ua1 = HadoopUtil
             .getAllFilesRecursivelyHDFS(cluster3FS, new Path(
                 testDirWithDateSourceTarget + "05/ua3/ua1"));


Mime
View raw message