falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rostafiyc...@apache.org
Subject [3/5] falcon git commit: FALCON-1151 Migrate oozie related methods from InstanceUtil.java to OozieUtil.java. Contributed by Paul Isaychuk
Date Tue, 14 Apr 2015 14:01:33 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index f299128..2f5dbd9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -101,7 +101,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -124,7 +124,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
         bundles[0].setProcessConcurrency(10);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -148,10 +148,10 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].setProcessConcurrency(6);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
 
         //create data for first 5 instances, 6th should be non-materialized
-        String bundleId = InstanceUtil.getSequenceBundleID(clusterOC, processName,
+        String bundleId = OozieUtil.getSequenceBundleID(clusterOC, processName,
             EntityType.PROCESS, 0);
         for(CoordinatorJob c : clusterOC.getBundleJobInfo(bundleId).getCoordinators()) {
             List<CoordinatorAction> actions = clusterOC.getCoordJobInfo(c.getId()).getActions();
@@ -192,7 +192,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity(startTime, endTime);
         bundles[0].setProcessConcurrency(6);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         String startTimeRequest = TimeUtil.getTimeWrtSystemTime(-17);
         String endTimeRequest = TimeUtil.getTimeWrtSystemTime(23);
@@ -215,7 +215,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:21Z");
         bundles[0].setProcessConcurrency(6);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         String startTime = TimeUtil.getTimeWrtSystemTime(1);
         String endTime = TimeUtil.getTimeWrtSystemTime(40);
         InstancesResult r = prism.getProcessHelper()
@@ -236,7 +236,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
         bundles[0].setProcessConcurrency(6);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -258,7 +258,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 10);
@@ -280,7 +280,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -302,7 +302,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1,
                 CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -322,7 +322,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -342,7 +342,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -362,7 +362,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
index c65461c..f65f9c9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
@@ -102,14 +102,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
             start + "&end=2010-01-02T01:16Z");
         InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4);
-        List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+        List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
             start + "&end=2010-01-02T01:11Z");
         InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0);
@@ -127,7 +127,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -151,7 +151,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -175,7 +175,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -200,14 +200,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         String process = bundles[0].getProcessData();
         LOGGER.info("process: " + Util.prettyPrintXml(process));
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
         InstancesResult r = prism.getProcessHelper()
             .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z");
         InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
-        List<String> wfIDs =  InstanceUtil.getWorkflows(cluster, processName);
+        List<String> wfIDs =  InstanceUtil.getWorkflows(clusterOC, processName);
         prism.getProcessHelper().
             getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z");
         InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
@@ -225,14 +225,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(6);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
         InstancesResult r = prism.getProcessHelper()
             .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z");
         InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
-        List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+        List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
             start + "&end=2010-01-02T01:11Z");
         TimeUtil.sleepSeconds(TIMEOUT);
@@ -250,13 +250,13 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
         prism.getProcessHelper().getProcessInstanceKill(processName,
                 start + "&end=2010-01-02T01:01Z");
-        String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.KILLED).get(0);
+        String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.KILLED).get(0);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
                 start + "&end=2010-01-02T01:01Z");
         Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
@@ -275,11 +275,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setProcessConcurrency(6);
         bundles[0].submitFeedsScheduleProcess(prism);
         String process = bundles[0].getProcessData();
-        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
-        String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.RUNNING,
+        String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.RUNNING,
             Status.SUCCEEDED).get(0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 0, CoordinatorAction
             .Status.SUCCEEDED, EntityType.PROCESS);
@@ -300,7 +300,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(2);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -308,7 +308,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
             start + "&end=2010-01-02T01:06Z");
         prism.getProcessHelper().getProcessInstanceRerun(processName,
             start + "&end=2010-01-02T01:06Z");
-        Assert.assertEquals(InstanceUtil.getInstanceStatus(cluster, processName, 0, 1),
+        Assert.assertEquals(InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 1),
                 CoordinatorAction.Status.SUSPENDED);
     }
 
@@ -323,11 +323,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(3);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-        List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+        List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
                 start + "&end=2010-01-02T01:11Z&force=true");
         InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
@@ -352,7 +352,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
                 CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
             start + "&end=2010-01-02T01:11Z");
-        s = InstanceUtil.getInstanceStatus(cluster, processName, 0, 0);
+        s = InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 0);
         Assert.assertEquals(s, CoordinatorAction.Status.WAITING,
             "instance should have been in WAITING state");
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
index 3893ffe..a2ff993 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
@@ -95,7 +95,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceResumeOnlyEnd() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -118,7 +118,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceResumeResumeSome() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -142,7 +142,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceResumeResumeMany() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -166,7 +166,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
     public void testProcessInstanceResumeSingle() throws Exception {
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 2);
@@ -259,7 +259,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceResumeNonSuspended() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -280,7 +280,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceResumeLastInstance() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
index 6003ee0..ee1c5e4 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
@@ -100,7 +100,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
     public void getResumedProcessInstance() throws Exception {
         bundles[0].setProcessConcurrency(3);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -123,7 +123,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
     public void getSuspendedProcessInstance() throws Exception {
         bundles[0].setProcessConcurrency(3);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -142,7 +142,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void getRunningProcessInstance() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -183,9 +183,9 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void getSucceededProcessInstance() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
-        InstanceUtil.waitForBundleToReachState(cluster, processName, Job.Status.SUCCEEDED);
+        OozieUtil.waitForBundleToReachState(clusterOC, processName, Job.Status.SUCCEEDED);
         InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
         InstanceUtil.validateSuccessWOInstances(r);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
index 635e238..8f177ec 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -120,11 +120,11 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS);
-        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z");
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
@@ -146,7 +146,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T05:00Z");
         AssertUtil.assertSucceeded(r);
@@ -164,7 +164,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS);
         bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
                 "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z");
         InstanceUtil.validateResponse(r, 5, 0, 0, 5, 0);
@@ -183,12 +183,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:30Z");
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 5,
             Status.RUNNING, EntityType.PROCESS);
-        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
         InstanceUtil.validateResponse(r, 6, 5, 0, 1, 0);
         List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
@@ -206,12 +206,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceStatusStartAndEnd() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1 ,
                Status.RUNNING, EntityType.PROCESS);
-        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
@@ -232,12 +232,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
             Status.RUNNING, EntityType.PROCESS, 5);
-        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T00:00Z&end=2010-01-02T01:21Z");
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
@@ -288,7 +288,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceStatusReverseDateRange() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
         InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1,
             Status.RUNNING, EntityType.PROCESS);
@@ -311,12 +311,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(2);
         bundles[0].submitFeedsScheduleProcess(prism);
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
             Status.RUNNING, EntityType.PROCESS, 5);
-        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
         InstanceUtil.validateResponse(r, 5, 2, 0, 3, 0);
@@ -337,7 +337,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
         String process = bundles[0].getProcessData();
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
             Status.RUNNING, EntityType.PROCESS, 5);
@@ -364,12 +364,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceStatusOnlyStart() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
             Status.RUNNING, EntityType.PROCESS, 5);
-        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T01:00Z");
         InstanceUtil.validateResponse(r, 5, 1, 0, 4, 0);
@@ -404,12 +404,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setProcessConcurrency(5);
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
         bundles[0].submitFeedsScheduleProcess(prism);
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
             Status.RUNNING, EntityType.PROCESS, 5);
-        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
         InstanceUtil.validateResponse(r, 5, 5, 0, 0, 0);
         List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
index b7fed18..f673314 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -86,7 +86,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -111,7 +111,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
                 CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -131,7 +131,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -170,7 +170,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
         bundles[0].setProcessConcurrency(3);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -249,7 +249,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
index 40a4ad2..aef32bf 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -91,7 +91,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
         bundles[0].submitAndScheduleProcess();
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         TimeUtil.sleepSeconds(10);
-        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0);
 
         getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 1);
 
@@ -107,7 +107,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
             bundles[0].getProcessName(), EntityType.PROCESS);
         String bundleID = bundleList.get(0);
 
-        OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
+        OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1);
     }
 
     /**
@@ -131,7 +131,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
         bundles[0].submitAndScheduleProcess();
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         TimeUtil.sleepSeconds(10);
-        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0);
 
         getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, true, 1);
 
@@ -147,7 +147,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
             bundles[0].getProcessName(), EntityType.PROCESS);
         String bundleID = bundleList.get(0);
 
-        OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
+        OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1);
     }
 
     /**
@@ -175,7 +175,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
 
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         TimeUtil.sleepSeconds(10);
-        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0);
 
         getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 3);
 
@@ -191,7 +191,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
             bundles[0].getProcessName(), EntityType.PROCESS);
         String bundleID = bundleList.get(0);
 
-        OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
+        OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1);
     }
 
     /**
@@ -231,7 +231,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
 
         TimeUtil.sleepSeconds(10);
-        InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0);
 
         getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 7);
 
@@ -248,7 +248,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
             bundles[0].getProcessName(), EntityType.PROCESS);
         String bundleID = bundleList.get(0);
 
-        OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 0);
+        OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 0);
     }
 
     /*
@@ -271,10 +271,10 @@ public class ProcessLateRerunTest extends BaseTestClass {
             Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created.");
             String bundleID = bundles.get(0);
             LOGGER.info("bundle id: " + bundleID);
-            List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper, bundleID);
+            List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID);
             for (int i = 0; i < 10 && missingDependencies == null; ++i) {
                 TimeUtil.sleepSeconds(30);
-                missingDependencies = OozieUtil.getMissingDependencies(prismHelper, bundleID);
+                missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID);
             }
             Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
index 74903e1..3988ae9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
@@ -28,6 +28,7 @@ import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.OozieClient;
 import org.testng.Assert;
 import org.testng.annotations.*;
 
@@ -43,8 +44,8 @@ import java.util.Map;
 @Test(groups = "embedded")
 public class ProcessLibPathLoadTest extends BaseTestClass {
 
-
     private ColoHelper cluster = servers.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
     private FileSystem clusterFS = serverFS.get(0);
     private String testDir = cleanAndGetTestDir();
     private String aggregateWorkflowDir = testDir + "/aggregator";
@@ -100,9 +101,9 @@ public class ProcessLibPathLoadTest extends BaseTestClass {
     public void setRightJarInWorkflowLib() throws Exception {
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
-        InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
+        OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED);
     }
 
     /**
@@ -112,13 +113,13 @@ public class ProcessLibPathLoadTest extends BaseTestClass {
      * @throws Exception
      */
     @Test
-    public void setNoJarInWorkflowLibLocaltion() throws Exception {
+    public void setNoJarInWorkflowLibLocation() throws Exception {
         HadoopUtil.deleteDirIfExists(aggregateWorkflowDir + "/lib/" + oozieLibName, clusterFS);
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
-        InstanceUtil.waitForBundleToReachState(cluster, processName, Status.KILLED);
+        OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.KILLED);
     }
 
     /**
@@ -149,7 +150,6 @@ public class ProcessLibPathLoadTest extends BaseTestClass {
             output.write(buffer, 0, n);
         }
         output.close();
-
     }
 
     private static boolean isRedirected(Map<String, List<String>> header) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
index bc2978f..dc2fc37 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@ -33,6 +33,7 @@ import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.OozieClient;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -47,6 +48,7 @@ import java.util.List;
 public class ProcessLibPathTest extends BaseTestClass {
 
     private ColoHelper cluster = servers.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
     private FileSystem clusterFS = serverFS.get(0);
     private String testDir = cleanAndGetTestDir();
     private String testLibDir = testDir + "/TestLib";
@@ -102,9 +104,9 @@ public class ProcessLibPathTest extends BaseTestClass {
         bundles[0].setProcessWorkflow(workflowDir);
         LOGGER.info("processData: " + Util.prettyPrintXml(process));
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
-        InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
+        OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED);
     }
 
     /**
@@ -122,8 +124,8 @@ public class ProcessLibPathTest extends BaseTestClass {
         bundles[0].setProcessWorkflow(workflowDir);
         LOGGER.info("processData: " + Util.prettyPrintXml(process));
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
-        InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
+        OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED);
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java
index 3057166..f67cbf8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java
@@ -25,7 +25,6 @@ import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.response.ServiceResponse;
 import org.apache.falcon.regression.core.util.*;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
@@ -44,12 +43,11 @@ import javax.xml.bind.JAXBException;
 public class TouchAPIPrismAndServerTest extends BaseTestClass {
     private ColoHelper cluster = servers.get(0);
     private OozieClient clusterOC = serverOC.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
     private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator";
-    private String feed;
     private static final Logger LOGGER = Logger.getLogger(TouchAPIPrismAndServerTest.class);
     private String startTime;
     private String endTime;
+    private String clusterName;
 
     @BeforeClass(alwaysRun = true)
     public void uploadWorkflow() throws Exception {
@@ -68,6 +66,7 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass {
         bundles[0].setProcessValidity(startTime, endTime);
         bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
         bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
+        clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
     }
 
     @AfterMethod(alwaysRun = true)
@@ -86,22 +85,23 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass {
     public void touchProcessSchedule() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
         AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-        String coordId = InstanceUtil.getLatestCoordinatorID(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS);
-        String oldbundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+        String coordId = OozieUtil.getLatestCoordinatorID(clusterOC,
+            bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldbundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(),
+            EntityType.PROCESS);
 
         // via prism
         ServiceResponse response = prism.getProcessHelper().touchEntity(bundles[0].getProcessData());
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
         Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated.");
         validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId);
 
         // via server
         oldbundleId = bundleId;
-        coordId = InstanceUtil.getLatestCoordinatorID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        coordId = OozieUtil.getLatestCoordinatorID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
         response = cluster.getProcessHelper().touchEntity(bundles[0].getProcessData());
-        bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
         Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated.");
         validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId);
     }
@@ -117,27 +117,22 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass {
     public void touchFeedSchedule() throws Exception {
         bundles[0].submitAndScheduleFeed();
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-        String coordId = InstanceUtil.getLatestCoordinatorID(clusterOC,
-                Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED);
-        String oldbundleId = InstanceUtil.getLatestBundleID(cluster,
-                Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED);
+        String coordId = OozieUtil.getLatestCoordinatorID(clusterOC, clusterName, EntityType.FEED);
+        String oldbundleId = OozieUtil.getLatestBundleID(clusterOC, clusterName, EntityType.FEED);
 
         // via prism
         TimeUtil.sleepSeconds(60);
         ServiceResponse response = prism.getFeedHelper().touchEntity(bundles[0].getDataSets().get(0));
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, Util.readEntityName(bundles[0].getDataSets().get(0)),
-                EntityType.FEED);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, clusterName, EntityType.FEED);
         Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated.");
         validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId);
 
         // via server
         oldbundleId = bundleId;
-        coordId = InstanceUtil.getLatestCoordinatorID(clusterOC, Util.readEntityName(bundles[0].getDataSets().get(0)),
-                EntityType.FEED);
+        coordId = OozieUtil.getLatestCoordinatorID(clusterOC, clusterName, EntityType.FEED);
         TimeUtil.sleepSeconds(60);
         response = cluster.getFeedHelper().touchEntity(bundles[0].getDataSets().get(0));
-        bundleId = InstanceUtil.getLatestBundleID(cluster, Util.readEntityName(bundles[0].getDataSets().get(0)),
-                EntityType.FEED);
+        bundleId = OozieUtil.getLatestBundleID(clusterOC, clusterName, EntityType.FEED);
         Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated.");
         validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId);
 
@@ -157,21 +152,20 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass {
         bundles[0].setProcessValidity(startTime, endTime);
         bundles[0].submitFeedsScheduleProcess(prism);
         AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-        String coordId = InstanceUtil.getLatestCoordinatorID(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS);
-        String oldbundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+        String coordId = OozieUtil.getLatestCoordinatorID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldbundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
 
         // via prism
         ServiceResponse response = prism.getProcessHelper().touchEntity(bundles[0].getProcessData());
-        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
         Assert.assertEquals(oldbundleId, bundleId, "New bundle generated");
         validate(response, "Old bundle id: " + coordId);
 
         //via server
         oldbundleId = bundleId;
         response = cluster.getProcessHelper().touchEntity(bundles[0].getProcessData());
-        bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
         Assert.assertEquals(oldbundleId, bundleId, "New bundle generated");
         validate(response, "Old bundle id: " + coordId);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
index 9799a1c..67cbc52 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
@@ -32,8 +32,8 @@ import org.apache.falcon.regression.core.util.AssertUtil;
 import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.HCatUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.hive.hcatalog.api.HCatClient;
 import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
@@ -212,9 +212,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
                 .build()).toString();
 
         AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
-        Assert.assertEquals(InstanceUtil
-                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
-                        "REPLICATION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1);
         //This test doesn't wait for replication to succeed.
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
index 53e4777..cd1b538 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
@@ -33,6 +33,7 @@ import org.apache.falcon.regression.core.util.HCatUtil;
 import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -113,7 +114,6 @@ public class HCatReplicationTest extends BaseTestClass {
         bundles[2].generateUniqueBundle(this);
         bundles[2].setClusterInterface(Interfacetype.REGISTRY, cluster3.getClusterHelper()
             .getHCatEndpoint());
-
     }
 
     @DataProvider
@@ -189,9 +189,7 @@ public class HCatReplicationTest extends BaseTestClass {
         AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
         TimeUtil.sleepSeconds(TIMEOUT);
         //check if all coordinators exist
-        Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
-                "REPLICATION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1);
 
         //replication should start, wait while it ends
         // we will check for 2 instances so that both partitions are copied over.
@@ -206,7 +204,6 @@ public class HCatReplicationTest extends BaseTestClass {
             .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testHdfsDir));
         LOGGER.info("Data on target cluster: " + cluster2ReplicatedData);
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
     }
 
     // make sure oozie changes mentioned FALCON-389 are done on the clusters. Otherwise the test
@@ -285,14 +282,10 @@ public class HCatReplicationTest extends BaseTestClass {
         AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
         TimeUtil.sleepSeconds(TIMEOUT);
         //check if all coordinators exist
-        Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
-                "REPLICATION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1);
 
         //check if all coordinators exist
-        Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
-                "REPLICATION"), 1);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, Util.readEntityName(feed), "REPLICATION"), 1);
 
         //replication should start, wait while it ends
         // we will check for 2 instances so that both partitions are copied over.
@@ -318,7 +311,6 @@ public class HCatReplicationTest extends BaseTestClass {
         AssertUtil.checkForListSizes(srcData, cluster3TargetData);
     }
 
-
     private void addPartitionsToTable(List<String> partitions, List<String> partitionLocations,
                                       String partitionCol,
                                       String databaseName, String tableName, HCatClient hc) throws

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
index 7da8ef1..67b80d8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
@@ -66,7 +66,6 @@ import java.util.List;
 public class EntitySummaryTest extends BaseTestClass {
     private static final Logger LOGGER = Logger.getLogger(EntitySummaryTest.class);
     private ColoHelper cluster1 = servers.get(0);
-    private ColoHelper cluster2 = servers.get(1);
     private OozieClient cluster1OC = serverOC.get(0);
     private OozieClient cluster2OC = serverOC.get(1);
     private FileSystem cluster1FS = serverFS.get(0);
@@ -112,7 +111,7 @@ public class EntitySummaryTest extends BaseTestClass {
         bundles[0].submitClusters(prism);
         bundles[0].submitFeeds(prism);
         String clusterName = Util.readEntityName(bundles[0].getClusters().get(0));
-        List<String> processes = scheduleEntityValidateWaitingInstances(cluster1,
+        List<String> processes = scheduleEntityValidateWaitingInstances(cluster1OC,
             bundles[0].getProcessData(), EntityType.PROCESS, clusterName);
 
         //create data for processes to run and wait some time for instances to make progress
@@ -161,8 +160,7 @@ public class EntitySummaryTest extends BaseTestClass {
         AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster2Def));
 
         //submit and schedule 7 feeds, check that 7 waiting instances are present for each feed
-        List<String> feeds = scheduleEntityValidateWaitingInstances(cluster2, feed,
-            EntityType.FEED, clusterName);
+        List<String> feeds = scheduleEntityValidateWaitingInstances(cluster2OC, feed, EntityType.FEED, clusterName);
 
         //create data for processes to run and wait some time for instances to make progress
         List<String> folders = TimeUtil.getMinuteDatesOnEitherSide(TimeUtil.oozieDateToDate(
@@ -180,9 +178,8 @@ public class EntitySummaryTest extends BaseTestClass {
      * Schedules 7 entities and checks that summary reflects info about the most recent 7
      * instances of each of them.
      */
-    private List<String> scheduleEntityValidateWaitingInstances(ColoHelper cluster, String entity,
-                                                                EntityType entityType,
-                                                                String clusterName)
+    private List<String> scheduleEntityValidateWaitingInstances(OozieClient oozieClient, String entity,
+                                                                EntityType entityType, String clusterName)
         throws AuthenticationException, IOException, URISyntaxException, JAXBException,
         OozieClientException, InterruptedException {
         String entityName = Util.readEntityName(entity);
@@ -203,8 +200,8 @@ public class EntitySummaryTest extends BaseTestClass {
             }
             entity = entityMerlin.toString();
             AssertUtil.assertSucceeded(helper.submitAndSchedule(entity));
-            InstanceUtil.waitTillInstancesAreCreated(cluster, entity, 0);
-            InstanceUtil.waitTillInstanceReachState(cluster.getClusterHelper().getOozieClient(),
+            InstanceUtil.waitTillInstancesAreCreated(oozieClient, entity, 0);
+            InstanceUtil.waitTillInstanceReachState(oozieClient,
                 uniqueName, 7, CoordinatorAction.Status.WAITING, entityType);
 
             //check that summary shows recent (i) number of feeds and their instances

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
index 64a7e2e..6d41f9e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
@@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.response.lineage.VerticesResult;
 import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.GraphAssert;
 import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
 import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
@@ -37,6 +36,7 @@ import org.apache.falcon.resource.InstancesResult;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -55,6 +55,7 @@ public class LineageApiProcessInstanceTest extends BaseTestClass {
 
     private ColoHelper cluster = servers.get(0);
     private FileSystem clusterFS = serverFS.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
     private LineageHelper lineageHelper;
     private String baseTestHDFSDir = cleanAndGetTestDir();
     private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
@@ -104,7 +105,7 @@ public class LineageApiProcessInstanceTest extends BaseTestClass {
         outputFeedName = bundles[0].getOutputFeedNameFromBundle();
         Job.Status status = null;
         for (int i = 0; i < 20; i++) {
-            status = InstanceUtil.getDefaultCoordinatorStatus(cluster, bundles[0].getProcessName(), 0);
+            status = OozieUtil.getDefaultCoordinatorStatus(clusterOC, bundles[0].getProcessName(), 0);
             if (status == Job.Status.SUCCEEDED || status == Job.Status.KILLED) {
                 break;
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
index 148d6ea..8ef6bb6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
@@ -123,7 +123,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
 
         //submit and schedule feed
         AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
-        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed, 0);
         InstanceUtil.waitTillInstanceReachState(cluster2OC, feedName, 12,
             CoordinatorAction.Status.WAITING, EntityType.FEED);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
index 66d1886..93e9a3e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
@@ -77,7 +77,7 @@ public class ListProcessInstancesTest extends BaseTestClass {
         bundles[0].setProcessConcurrency(3);
         bundles[0].submitAndScheduleProcess();
         processName = bundles[0].getProcessName();
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         //create data for processes to run and wait some time for instances to make progress
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
index a72d339..99f58e6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
@@ -130,18 +130,17 @@ public class FeedDelayTest extends BaseTestClass {
         AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
 
         //check if coordinator exists
-        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
-        Assert.assertEquals(InstanceUtil
-                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), "REPLICATION"), 1);
+        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed, 0);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1);
 
         //Finding bundleId of replicated instance on target
-        String bundleId = InstanceUtil.getLatestBundleID(cluster2, Util.readEntityName(feed), EntityType.FEED);
+        String bundleId = OozieUtil.getLatestBundleID(cluster2OC, Util.readEntityName(feed), EntityType.FEED);
 
         //Finding startTime of replicated instance on target
-        String startTimeO0zie = OozieUtil.getCoordStartTime(cluster2, feed, 0);
+        String startTimeO0zie = OozieUtil.getCoordStartTime(cluster2OC, feed, 0);
         String startTimeExpected = getStartTime(sourceStartTime, targetStartTime, new Frequency(sourceDelay), flag);
 
-        List<String> missingDep = getAndCreateDependencies(cluster1, cluster1FS, cluster2, bundleId);
+        List<String> missingDep = getAndCreateDependencies(cluster1FS, cluster1.getPrefix(), cluster2OC, bundleId);
         List<String> qaDep = new ArrayList<String>();
 
         if (flag) {
@@ -176,25 +175,24 @@ public class FeedDelayTest extends BaseTestClass {
         };
     }
 
-    private List<String> getAndCreateDependencies(ColoHelper prismHelper1, FileSystem clusterFS1,
-        ColoHelper prismHelper2, String bundleId) throws OozieClientException, IOException {
-        List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
+    private List<String> getAndCreateDependencies(FileSystem sourceFS, String sourcePrefix, OozieClient targetOC,
+                                                  String bundleId) throws OozieClientException, IOException {
+        List<String> missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
         for (int i = 0; i < 10 && missingDependencies == null; ++i) {
             TimeUtil.sleepSeconds(30);
             LOGGER.info("sleeping...");
-            missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
+            missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
         }
         Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
 
         // Creating missing dependencies
-        HadoopUtil.createHDFSFolders(prismHelper1, missingDependencies);
+        HadoopUtil.createFolders(sourceFS, sourcePrefix, missingDependencies);
 
         //Adding data to empty folders
         for (String location : missingDependencies) {
             LOGGER.info("Transferring data to : " + location);
-            HadoopUtil.copyDataToFolder(clusterFS1, location, OSUtil.RESOURCES + "feed-s4Replication.xml");
+            HadoopUtil.copyDataToFolder(sourceFS, location, OSUtil.RESOURCES + "feed-s4Replication.xml");
         }
-
         return missingDependencies;
     }
 


Mime
View raw message