falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rostafiyc...@apache.org
Subject [5/5] falcon git commit: FALCON-1151 Migrate oozie related methods from InstanceUtil.java to OozieUtil.java. Contributed by Paul Isaychuk
Date Tue, 14 Apr 2015 14:01:35 GMT
FALCON-1151 Migrate oozie related methods from InstanceUtil.java to OozieUtil.java. Contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d0c9850e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d0c9850e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d0c9850e

Branch: refs/heads/master
Commit: d0c9850e5abf56b3e3c500ee744e96e634ba7691
Parents: 9d5429a
Author: Ruslan Ostafiychuk <rostafiychuk@apache.org>
Authored: Tue Apr 14 17:00:25 2015 +0300
Committer: Ruslan Ostafiychuk <rostafiychuk@apache.org>
Committed: Tue Apr 14 17:00:25 2015 +0300

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../regression/core/util/InstanceUtil.java      | 382 +++-----------
 .../falcon/regression/core/util/OozieUtil.java  | 332 +++++++++---
 .../falcon/regression/AuthorizationTest.java    |  86 ++--
 .../regression/ELExpCurrentAndLastWeekTest.java |  19 +-
 .../falcon/regression/ELValidationsTest.java    |  18 +-
 .../regression/EmbeddedPigScriptTest.java       |   3 +-
 .../falcon/regression/ExternalFSTest.java       |  11 +-
 .../regression/FeedClusterUpdateTest.java       | 510 +++++--------------
 .../regression/FeedInstanceListingTest.java     |  19 +-
 .../falcon/regression/FeedLateRerunTest.java    |  32 +-
 .../falcon/regression/FeedReplicationTest.java  |  28 +-
 .../regression/FeedSubmitAndScheduleTest.java   |   6 +-
 .../falcon/regression/InstanceParamTest.java    |   6 +-
 .../falcon/regression/InstanceSummaryTest.java  |   2 +-
 .../apache/falcon/regression/LogMoverTest.java  |   4 +-
 .../apache/falcon/regression/NewRetryTest.java  |  19 +-
 .../regression/ProcessInstanceKillsTest.java    |  26 +-
 .../regression/ProcessInstanceRerunTest.java    |  36 +-
 .../regression/ProcessInstanceResumeTest.java   |  12 +-
 .../regression/ProcessInstanceRunningTest.java  |  10 +-
 .../regression/ProcessInstanceStatusTest.java   |  50 +-
 .../regression/ProcessInstanceSuspendTest.java  |  10 +-
 .../falcon/regression/ProcessLateRerunTest.java |  20 +-
 .../regression/ProcessLibPathLoadTest.java      |  14 +-
 .../falcon/regression/ProcessLibPathTest.java   |  10 +-
 .../regression/TouchAPIPrismAndServerTest.java  |  46 +-
 .../regression/hcat/HCatFeedOperationsTest.java |   6 +-
 .../regression/hcat/HCatReplicationTest.java    |  16 +-
 .../regression/lineage/EntitySummaryTest.java   |  15 +-
 .../lineage/LineageApiProcessInstanceTest.java  |   5 +-
 .../lineage/ListFeedInstancesTest.java          |   2 +-
 .../lineage/ListProcessInstancesTest.java       |   2 +-
 .../falcon/regression/prism/FeedDelayTest.java  |  24 +-
 .../prism/NewPrismProcessUpdateTest.java        | 328 ++++++------
 .../regression/prism/OptionalInputTest.java     |   2 +-
 .../prism/PrismFeedLateReplicationTest.java     | 107 ++--
 .../PrismFeedReplicationPartitionExpTest.java   |  98 +---
 .../prism/PrismFeedReplicationUpdateTest.java   |  30 +-
 .../regression/prism/PrismFeedUpdateTest.java   |   6 +-
 .../prism/PrismProcessScheduleTest.java         |  10 +-
 .../RescheduleProcessInFinalStatesTest.java     |  40 +-
 .../falcon/regression/prism/RetentionTest.java  |   2 +-
 .../prism/UpdateAtSpecificTimeTest.java         | 150 +++---
 .../falcon/regression/ui/ProcessUITest.java     |   8 +-
 45 files changed, 1063 insertions(+), 1502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 891576c..492814b 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -63,6 +63,9 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+   FALCON-1151 Migrate oozie related methods from InstanceUtil.java to OozieUtil.java
+   (Paul Isaychuk via Ruslan Ostafiychuk)
+
    FALCON-1138: JDK requirement for merlin should be 1.7 (Raghav Kumar Gautam)
 
    FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
index 6c90256..723ea89 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
@@ -28,7 +28,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper;
 import org.apache.falcon.request.BaseRequest;
 import org.apache.falcon.resource.APIResult;
@@ -41,7 +40,6 @@ import org.apache.log4j.Logger;
 import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.Job;
 import org.apache.oozie.client.Job.Status;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
@@ -58,29 +56,26 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 
 /**
  * util functions related to instanceTest.
  */
 public final class InstanceUtil {
 
-    private InstanceUtil() {
-        throw new AssertionError("Instantiating utility class...");
-    }
-
+    public static final int INSTANCES_CREATED_TIMEOUT = OSUtil.IS_WINDOWS ? 20 : 10;
     private static final Logger LOGGER = Logger.getLogger(InstanceUtil.class);
     private static final EnumSet<Status> RUNNING_PREP_SUCCEEDED = EnumSet.of(Status.RUNNING,
         Status.PREP, Status.SUCCEEDED);
 
+    private InstanceUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
     public static APIResult sendRequestProcessInstance(String url, String user)
         throws IOException, URISyntaxException, AuthenticationException, InterruptedException {
         return hitUrl(url, Util.getMethodType(url), user);
     }
 
-    public static final int INSTANCES_CREATED_TIMEOUT = OSUtil.IS_WINDOWS ? 20 : 10;
-
     public static APIResult hitUrl(String url,
             String method, String user) throws URISyntaxException,
             IOException, AuthenticationException, InterruptedException {
@@ -169,11 +164,20 @@ public final class InstanceUtil {
         return Collections.frequency(statuses, workflowStatus);
     }
 
+    /**
+     * Validates that response doesn't contains instances.
+     * @param r response
+     */
     public static void validateSuccessWOInstances(InstancesResult r) {
         AssertUtil.assertSucceeded(r);
         Assert.assertNull(r.getInstances(), "Unexpected :" + Arrays.toString(r.getInstances()));
     }
 
+    /**
+     * Validates that failed response contains specific error message.
+     * @param instancesResult response
+     * @param error expected error
+     */
     public static void validateError(InstancesResult instancesResult, ResponseErrors error) {
         Assert.assertTrue(instancesResult.getMessage().contains(error.getError()),
             "Error should contains '" + error.getError() + "'");
@@ -205,7 +209,6 @@ public final class InstanceUtil {
             LOGGER.info("status: " + status + ", instance: " + instance.getInstance());
             statuses.add(status);
         }
-
         Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.RUNNING),
             runningCount, "Running Instances");
         Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.SUSPENDED),
@@ -216,22 +219,27 @@ public final class InstanceUtil {
             killedCount, "Killed Instances");
     }
 
+    /**
+     * Retrieves workflow IDs from every instances from response.
+     * @param instancesResult response
+     * @return list of workflow IDs
+     */
     public static List<String> getWorkflowJobIds(InstancesResult instancesResult) {
         InstancesResult.Instance[] instances = instancesResult.getInstances();
-        LOGGER.info("instances: " + Arrays.toString(instances));
-        Assert.assertNotNull(instances, "instances should be not null");
-        List<String> wfids = new ArrayList<String>();
+        LOGGER.info("Instances: " + Arrays.toString(instances));
+        Assert.assertNotNull(instances, "Instances should be not null");
+        List<String> wfIds = new ArrayList<String>();
         for (InstancesResult.Instance instance : instances) {
-            LOGGER.warn("instance: " + instance + " , status: "
-                    + instance.getStatus() +  ", logs : " + instance.getLogFile());
+            LOGGER.warn(String.format(
+                "instance: %s, status: %s, logs : %s", instance, instance.getStatus(), instance.getLogFile()));
             if (instance.getStatus().name().equals("RUNNING") || instance.getStatus().name().equals("SUCCEEDED")) {
-                wfids.add(instance.getLogFile());
+                wfIds.add(instance.getLogFile());
             }
             if (instance.getStatus().name().equals("KILLED") || instance.getStatus().name().equals("WAITING")) {
                 Assert.assertNull(instance.getLogFile());
             }
         }
-        return wfids;
+        return wfIds;
     }
 
     /**
@@ -250,21 +258,27 @@ public final class InstanceUtil {
             }
         }
         Assert.assertEquals(counter, failCount, "Actual number of failed instances does not "
-                + "match expected number of failed instances.");
+            + "match to expected number of failed instances.");
     }
 
-    public static List<String> getWorkflows(ColoHelper prismHelper, String processName,
+    /**
+     * Gets process workflows by given statuses.
+     * @param oozieClient oozie client of cluster where process is running
+     * @param processName process name
+     * @param statuses statuses workflows will be selected by
+     * @return list of matching workflows
+     * @throws OozieClientException
+     */
+    public static List<String> getWorkflows(OozieClient oozieClient, String processName,
             WorkflowJob.Status... statuses) throws OozieClientException {
-        OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient();
         String bundleID = OozieUtil.getBundles(oozieClient, processName, EntityType.PROCESS).get(0);
-        List<String> workflowJobIds = OozieUtil.getWorkflowJobs(prismHelper, bundleID);
+        List<String> workflowJobIds = OozieUtil.getWorkflowJobs(oozieClient, bundleID);
 
         List<String> toBeReturned = new ArrayList<String>();
         for (String jobId : workflowJobIds) {
             WorkflowJob wfJob = oozieClient.getJobInfo(jobId);
             LOGGER.info("wfJob.getId(): " + wfJob.getId() + " wfJob.getStartTime(): "
-                + wfJob.getStartTime()
-                + "jobId: " + jobId + "  wfJob.getStatus(): " + wfJob.getStatus());
+                + wfJob.getStartTime() + "jobId: " + jobId + "  wfJob.getStatus(): " + wfJob.getStatus());
             if (statuses.length == 0 || Arrays.asList(statuses).contains(wfJob.getStatus())) {
                 toBeReturned.add(jobId);
             }
@@ -304,44 +318,17 @@ public final class InstanceUtil {
         }
     }
 
-    public static List<CoordinatorAction> getProcessInstanceList(ColoHelper coloHelper,
+    public static List<CoordinatorAction> getProcessInstanceList(OozieClient oozieClient,
             String processName, EntityType entityType) throws OozieClientException {
-        OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
-        String coordId = getLatestCoordinatorID(oozieClient, processName, entityType);
+        String coordId = OozieUtil.getLatestCoordinatorID(oozieClient, processName, entityType);
         //String coordId = getDefaultCoordinatorFromProcessName(processName);
         LOGGER.info("default coordID: " + coordId);
         return oozieClient.getCoordJobInfo(coordId).getActions();
     }
 
-    public static String getLatestCoordinatorID(OozieClient oozieClient, String processName,
-            EntityType entityType) throws OozieClientException {
-        final String latestBundleID = getLatestBundleID(oozieClient, processName, entityType);
-        return getDefaultCoordIDFromBundle(oozieClient, latestBundleID);
-    }
-
-    public static String getDefaultCoordIDFromBundle(OozieClient oozieClient, String bundleId)
-        throws OozieClientException {
-        OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId);
-        BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId);
-        List<CoordinatorJob> coords = bundleInfo.getCoordinators();
-        int min = 100000;
-        String minString = "";
-        for (CoordinatorJob coord : coords) {
-            String strID = coord.getId();
-            if (min > Integer.parseInt(strID.substring(0, strID.indexOf('-')))) {
-                min = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
-                minString = coord.getId();
-            }
-        }
-        LOGGER.info("function getDefaultCoordIDFromBundle: minString: " + minString);
-        return minString;
-    }
-
-    public static int getInstanceCountWithStatus(ColoHelper coloHelper, String processName,
-            org.apache.oozie.client.CoordinatorAction.Status status, EntityType entityType)
-        throws OozieClientException {
-        List<CoordinatorAction> coordActions = getProcessInstanceList(coloHelper, processName,
-            entityType);
+    public static int getInstanceCountWithStatus(OozieClient oozieClient, String processName,
+            CoordinatorAction.Status status, EntityType entityType) throws OozieClientException {
+        List<CoordinatorAction> coordActions = getProcessInstanceList(oozieClient, processName, entityType);
         List<CoordinatorAction.Status> statuses = new ArrayList<CoordinatorAction.Status>();
         for (CoordinatorAction action : coordActions) {
             statuses.add(action.getStatus());
@@ -349,109 +336,10 @@ public final class InstanceUtil {
         return Collections.frequency(statuses, status);
     }
 
-    public static Status getDefaultCoordinatorStatus(ColoHelper colohelper, String processName,
-            int bundleNumber) throws OozieClientException {
-        OozieClient oozieClient = colohelper.getProcessHelper().getOozieClient();
-        String bundleID =
-                getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber);
-        String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID);
-        return oozieClient.getCoordJobInfo(coordId).getStatus();
-    }
-
-    /**
-     * Retrieves all coordinators of bundle.
-     *
-     * @param oozieClient Oozie client to use for fetching info.
-     * @param bundleID specific bundle ID
-     * @return list of bundle coordinators
-     * @throws OozieClientException
-     */
-    public static List<CoordinatorJob> getBundleCoordinators(OozieClient oozieClient,
-            String bundleID) throws OozieClientException {
-        BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleID);
-        return bundleInfo.getCoordinators();
-    }
-
-    /**
-     * Retrieves the latest bundle ID.
-     *
-     * @param coloHelper colo helper of cluster job is running on
-     * @param entityName name of entity job is related to
-     * @param entityType type of entity - feed or process expected
-     * @return latest bundle ID
-     * @throws OozieClientException
-     */
-    public static String getLatestBundleID(ColoHelper coloHelper,
-            String entityName, EntityType entityType)
-        throws OozieClientException {
-        final OozieClient oozieClient = coloHelper.getFeedHelper().getOozieClient();
-        return getLatestBundleID(oozieClient, entityName, entityType);
-    }
-
-    /**
-     * Retrieves the latest bundle ID.
-     *
-     * @param oozieClient where job is running
-     * @param entityName  name of entity job is related to
-     * @param entityType  type of entity - feed or process expected
-     * @return latest bundle ID
-     * @throws OozieClientException
-     */
-    public static String getLatestBundleID(OozieClient oozieClient,
-            String entityName, EntityType entityType) throws OozieClientException {
-        List<String> bundleIds = OozieUtil.getBundles(oozieClient, entityName, entityType);
-        String max = "0";
-        int maxID = -1;
-        for (String strID : bundleIds) {
-            if (maxID < Integer.parseInt(strID.substring(0, strID.indexOf('-')))) {
-                maxID = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
-                max = strID;
-            }
-        }
-        return max;
-    }
-
-    /**
-     * Retrieves ID of bundle related to some process/feed using its ordinal number.
-     *
-     * @param entityName   - name of entity bundle is related to
-     * @param entityType   - feed or process
-     * @param bundleNumber - ordinal number of bundle
-     * @return bundle ID
-     * @throws OozieClientException
-     */
-    public static String getSequenceBundleID(OozieClient oozieClient, String entityName,
-            EntityType entityType, int bundleNumber)
-        throws OozieClientException {
-
-        //sequence start from 0
-        List<String> bundleIds = OozieUtil.getBundles(oozieClient,
-                entityName, entityType);
-        Map<Integer, String> bundleMap = new TreeMap<Integer, String>();
-        String bundleID;
-        for (String strID : bundleIds) {
-            LOGGER.info("getSequenceBundleID: " + strID);
-            int key = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
-            bundleMap.put(key, strID);
-        }
-        for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) {
-            LOGGER.info("Key = " + entry.getKey() + ", Value = " + entry.getValue());
-        }
-        int i = 0;
-        for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) {
-            bundleID = entry.getValue();
-            if (i == bundleNumber) {
-                return bundleID;
-            }
-            i++;
-        }
-        return null;
-    }
-
     /**
      * Retrieves status of one instance.
      *
-     * @param coloHelper     - server from which instance status will be retrieved.
+     * @param oozieClient     - server from which instance status will be retrieved.
      * @param processName    - name of process which mentioned instance belongs to.
      * @param bundleNumber   - ordinal number of one of the bundle which are related to that
      *                         process.
@@ -459,17 +347,13 @@ public final class InstanceUtil {
      * @return - state of mentioned instance.
      * @throws OozieClientException
      */
-    public static CoordinatorAction.Status getInstanceStatus(ColoHelper coloHelper,
-            String processName,
-            int bundleNumber, int
-            instanceNumber) throws OozieClientException {
-        final OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient();
-        String bundleID =
-            getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber);
+    public static CoordinatorAction.Status getInstanceStatus(OozieClient oozieClient, String processName,
+            int bundleNumber, int instanceNumber) throws OozieClientException {
+        String bundleID = OozieUtil.getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber);
         if (StringUtils.isEmpty(bundleID)) {
             return null;
         }
-        String coordID = InstanceUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID);
+        String coordID = OozieUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID);
         if (StringUtils.isEmpty(coordID)) {
             return null;
         }
@@ -487,22 +371,6 @@ public final class InstanceUtil {
     }
 
     /**
-     * Retrieves replication coordinatorID from bundle of coordinators.
-     */
-    public static List<String> getReplicationCoordID(String bundleId, AbstractEntityHelper helper)
-        throws OozieClientException {
-        final OozieClient oozieClient = helper.getOozieClient();
-        List<CoordinatorJob> coords = InstanceUtil.getBundleCoordinators(oozieClient, bundleId);
-        List<String> replicationCoordID = new ArrayList<String>();
-        for (CoordinatorJob coord : coords) {
-            if (coord.getAppName().contains("FEED_REPLICATION")) {
-                replicationCoordID.add(coord.getId());
-            }
-        }
-        return replicationCoordID;
-    }
-
-    /**
      * Forms and sends process instance request based on url of action to be performed and it's
      * parameters.
      *
@@ -510,8 +378,7 @@ public final class InstanceUtil {
      * @param user - whose credentials will be used for this action
      * @return result from API
      */
-    public static APIResult createAndSendRequestProcessInstance(
-            String url, String params, String colo, String user)
+    public static APIResult createAndSendRequestProcessInstance(String url, String params, String colo, String user)
         throws IOException, URISyntaxException, AuthenticationException, InterruptedException {
         if (params != null && !colo.equals("")) {
             url = url + params + "&" + colo.substring(1);
@@ -520,12 +387,11 @@ public final class InstanceUtil {
         } else {
             url = url + colo;
         }
-        return InstanceUtil.sendRequestProcessInstance(url, user);
+        return sendRequestProcessInstance(url, user);
     }
 
     public static org.apache.oozie.client.WorkflowJob.Status getInstanceStatusFromCoord(
-            ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
-        OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
+            OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException {
         CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
         String jobId = coordInfo.getActions().get(instanceNumber).getExternalId();
         LOGGER.info("jobId = " + jobId);
@@ -537,15 +403,14 @@ public final class InstanceUtil {
     }
 
     public static List<String> getInputFoldersForInstanceForReplication(
-            ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
-        OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
+            OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException {
         CoordinatorAction x = oozieClient.getCoordActionInfo(coordID + "@" + instanceNumber);
         String jobId = x.getExternalId();
         WorkflowJob wfJob = oozieClient.getJobInfo(jobId);
-        return InstanceUtil.getReplicationFolderFromInstanceRunConf(wfJob.getConf());
+        return getReplicationFolderFromInstanceRunConf(wfJob.getConf());
     }
 
-    public static List<String> getReplicationFolderFromInstanceRunConf(String runConf) {
+    private static List<String> getReplicationFolderFromInstanceRunConf(String runConf) {
         String conf;
         conf = runConf.substring(runConf.indexOf("falconInPaths</name>") + 20);
         conf = conf.substring(conf.indexOf("<value>") + 7);
@@ -553,13 +418,10 @@ public final class InstanceUtil {
         return new ArrayList<String>(Arrays.asList(conf.split(",")));
     }
 
-    public static int getInstanceRunIdFromCoord(ColoHelper colo, String coordID, int instanceNumber)
+    public static int getInstanceRunIdFromCoord(OozieClient oozieClient, String coordID, int instanceNumber)
         throws OozieClientException {
-        OozieClient oozieClient = colo.getProcessHelper().getOozieClient();
         CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
-
-        WorkflowJob actionInfo =
-                oozieClient.getJobInfo(coordInfo.getActions().get(instanceNumber).getExternalId());
+        WorkflowJob actionInfo = oozieClient.getJobInfo(coordInfo.getActions().get(instanceNumber).getExternalId());
         return actionInfo.getRun();
     }
 
@@ -575,11 +437,11 @@ public final class InstanceUtil {
         List<String> bundleIds = OozieUtil.getBundles(oozieClient, feedName, EntityType.FEED);
         LOGGER.info("bundleIds: " + bundleIds);
 
-        for (String aBundleId : bundleIds) {
-            LOGGER.info("aBundleId: " + aBundleId);
-            OozieUtil.waitForCoordinatorJobCreation(oozieClient, aBundleId);
+        for (String bundleId : bundleIds) {
+            LOGGER.info("bundleId: " + bundleId);
+            OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId);
             List<CoordinatorJob> coords =
-                    InstanceUtil.getBundleCoordinators(oozieClient, aBundleId);
+                    OozieUtil.getBundleCoordinators(oozieClient, bundleId);
             LOGGER.info("coords: " + coords);
             for (CoordinatorJob coord : coords) {
                 if (coord.getAppName().contains(coordType)) {
@@ -591,9 +453,8 @@ public final class InstanceUtil {
     }
 
     public static List<CoordinatorAction> getProcessInstanceListFromAllBundles(
-            ColoHelper coloHelper, String processName, EntityType entityType)
+            OozieClient oozieClient, String processName, EntityType entityType)
         throws OozieClientException {
-        OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
         List<CoordinatorAction> list = new ArrayList<CoordinatorAction>();
         final List<String> bundleIds = OozieUtil.getBundles(oozieClient, processName, entityType);
         LOGGER.info("bundle size for process is " + bundleIds.size());
@@ -609,37 +470,31 @@ public final class InstanceUtil {
                 list.addAll(actions);
             }
         }
-        String coordId = getLatestCoordinatorID(oozieClient, processName, entityType);
+        String coordId = OozieUtil.getLatestCoordinatorID(oozieClient, processName, entityType);
         LOGGER.info("default coordID: " + coordId);
         return list;
     }
 
-    public static String getOutputFolderForInstanceForReplication(ColoHelper coloHelper,
+    public static String getOutputFolderForInstanceForReplication(OozieClient oozieClient,
             String coordID, int instanceNumber) throws OozieClientException {
-        OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
         CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
         final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber);
         final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf();
-        return InstanceUtil.getReplicatedFolderFromInstanceRunConf(actionConf);
+        return getReplicatedFolderFromInstanceRunConf(actionConf);
     }
 
-    private static String getReplicatedFolderFromInstanceRunConf(
-            String runConf) {
-        String inputPathExample =
-                InstanceUtil.getReplicationFolderFromInstanceRunConf(runConf).get(0);
-        String postFix = inputPathExample
-                .substring(inputPathExample.length() - 7, inputPathExample.length());
+    private static String getReplicatedFolderFromInstanceRunConf(String runConf) {
+        String inputPathExample = getReplicationFolderFromInstanceRunConf(runConf).get(0);
+        String postFix = inputPathExample.substring(inputPathExample.length() - 7, inputPathExample.length());
         return getReplicatedFolderBaseFromInstanceRunConf(runConf) + postFix;
     }
 
     public static String getOutputFolderBaseForInstanceForReplication(
-            ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
-        OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
+            OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException {
         CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
-
         final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber);
         final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf();
-        return InstanceUtil.getReplicatedFolderBaseFromInstanceRunConf(actionConf);
+        return getReplicatedFolderBaseFromInstanceRunConf(actionConf);
     }
 
     private static String getReplicatedFolderBaseFromInstanceRunConf(String runConf) {
@@ -661,10 +516,8 @@ public final class InstanceUtil {
      * @param totalMinutesToWait time in minutes for which instance state should be polled
      * @throws OozieClientException
      */
-    public static void waitTillInstanceReachState(OozieClient client, String entityName,
-            int instancesNumber,
-            CoordinatorAction.Status expectedStatus,
-            EntityType entityType, int totalMinutesToWait)
+    public static void waitTillInstanceReachState(OozieClient client, String entityName, int instancesNumber,
+            CoordinatorAction.Status expectedStatus, EntityType entityType, int totalMinutesToWait)
         throws OozieClientException {
         String filter;
         // get the bundle ids
@@ -682,7 +535,7 @@ public final class InstanceUtil {
             TimeUtil.sleepSeconds(5);
         }
         if (bundleJobs.size() == 0) {
-            Assert.assertTrue(false, "Could not retrieve bundles");
+            Assert.fail("Could not retrieve bundles");
         }
         List<String> bundleIds = OozieUtil.getBundleIds(bundleJobs);
         String bundleId = OozieUtil.getMaxId(bundleIds);
@@ -735,7 +588,7 @@ public final class InstanceUtil {
                 TimeUtil.sleepSeconds(sleepTime);
             }
         }
-        Assert.assertTrue(false, "expected state of instance was never reached");
+        Assert.fail("expected state of instance was never reached");
     }
 
     /**
@@ -759,54 +612,6 @@ public final class InstanceUtil {
     }
 
     /**
-     * Waits till bundle job will reach expected status.
-     * Generates time according to expected status.
-     *
-     * @param coloHelper     colo helper of cluster job is running on
-     * @param processName    name of process which job is being analyzed
-     * @param expectedStatus job status we are waiting for
-     * @throws OozieClientException
-     */
-    public static void waitForBundleToReachState(ColoHelper coloHelper,
-            String processName, Job.Status expectedStatus) throws
-            OozieClientException {
-        int totalMinutesToWait = getMinutesToWait(expectedStatus);
-        waitForBundleToReachState(coloHelper, processName, expectedStatus, totalMinutesToWait);
-    }
-
-    /**
-     * Waits till bundle job will reach expected status during specific time.
-     * Use it directly in test cases when timeouts are different from trivial, in other cases use
-     * waitForBundleToReachState(ColoHelper, String, Status)
-     *
-     * @param coloHelper         colo helper of cluster job is running on
-     * @param processName        name of process which job is being analyzed
-     * @param expectedStatus     job status we are waiting for
-     * @param totalMinutesToWait specific time to wait expected state
-     * @throws OozieClientException
-     */
-    public static void waitForBundleToReachState(ColoHelper coloHelper,
-            String processName, Job.Status expectedStatus, int totalMinutesToWait) throws
-            OozieClientException {
-
-        int sleep = totalMinutesToWait * 60 / 20;
-        for (int sleepCount = 0; sleepCount < sleep; sleepCount++) {
-            String bundleID =
-                    InstanceUtil.getLatestBundleID(coloHelper, processName, EntityType.PROCESS);
-            OozieClient oozieClient =
-                    coloHelper.getProcessHelper().getOozieClient();
-            BundleJob j = oozieClient.getBundleJobInfo(bundleID);
-            LOGGER.info(sleepCount + ". Current status: " + j.getStatus()
-                + "; expected: " + expectedStatus);
-            if (j.getStatus() == expectedStatus) {
-                return;
-            }
-            TimeUtil.sleepSeconds(20);
-        }
-        Assert.fail("State " + expectedStatus + " wasn't reached in " + totalMinutesToWait + " mins");
-    }
-
-    /**
      * Generates time which is presumably needed for process/feed instances to reach particular
      * state.
      * Feed instances are running faster then process, so feed timeouts are less then process.
@@ -815,8 +620,7 @@ public final class InstanceUtil {
      * @param expectedStatus expected status we are waiting for
      * @return minutes to wait for expected status
      */
-    private static int getMinutesToWait(EntityType entityType,
-            CoordinatorAction.Status expectedStatus) {
+    private static int getMinutesToWait(EntityType entityType, CoordinatorAction.Status expectedStatus) {
         switch (expectedStatus) {
         case RUNNING:
             if (entityType == EntityType.PROCESS) {
@@ -841,43 +645,22 @@ public final class InstanceUtil {
     }
 
     /**
-     * Generates time which is presumably needed for bundle job to reach particular state.
-     *
-     * @param expectedStatus status which we are expect to get from bundle job
-     * @return minutes to wait for expected status
-     */
-    private static int getMinutesToWait(Job.Status expectedStatus) {
-        switch (expectedStatus) {
-        case DONEWITHERROR:
-        case SUCCEEDED:
-            return OSUtil.IS_WINDOWS ? 40 : 20;
-        case KILLED:
-            return OSUtil.IS_WINDOWS ? 30 : 15;
-        default:
-            return OSUtil.IS_WINDOWS ? 60 : 30;
-        }
-    }
-
-    /**
      * Waits till instances of specific job will be created during specific time.
      * Use this method directly in unusual test cases where timeouts are different from trivial.
-     * In other cases use waitTillInstancesAreCreated(ColoHelper,String,int)
+     * In other cases use waitTillInstancesAreCreated(OozieClient,String,int)
      *
      * @param oozieClient oozie client of the cluster on which job is running
      * @param entity      definition of entity which describes job
      * @param bundleSeqNo bundle number if update has happened.
      * @throws OozieClientException
      */
-    public static void waitTillInstancesAreCreated(OozieClient oozieClient,
-            String entity,
-            int bundleSeqNo,
-            int totalMinutesToWait
-    ) throws OozieClientException {
+    public static void waitTillInstancesAreCreated(OozieClient oozieClient, String entity, int bundleSeqNo,
+            int totalMinutesToWait) throws OozieClientException {
         String entityName = Util.readEntityName(entity);
         EntityType type = Util.getEntityType(entity);
-        String bundleID = getSequenceBundleID(oozieClient, entityName,
+        String bundleID = OozieUtil.getSequenceBundleID(oozieClient, entityName,
             type, bundleSeqNo);
-        String coordID = getDefaultCoordIDFromBundle(oozieClient, bundleID);
+        String coordID = OozieUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID);
         for (int sleepCount = 0; sleepCount < totalMinutesToWait; sleepCount++) {
             CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
 
@@ -894,17 +677,14 @@ public final class InstanceUtil {
      * Waits till instances of specific job will be created during timeout.
      * Timeout is common for most of usual test cases.
      *
-     * @param coloHelper  colo helper of cluster job is running on
+     * @param oozieClient  oozieClient of cluster job is running on
      * @param entity      definition of entity which describes job
      * @param bundleSeqNo bundle number if update has happened.
      * @throws OozieClientException
      */
-    public static void waitTillInstancesAreCreated(ColoHelper coloHelper,
-            String entity,
-            int bundleSeqNo
+    public static void waitTillInstancesAreCreated(OozieClient oozieClient, String entity, int bundleSeqNo
     ) throws OozieClientException {
         int sleep = INSTANCES_CREATED_TIMEOUT * 60 / 5;
-        final OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient();
         waitTillInstancesAreCreated(oozieClient, entity, bundleSeqNo, sleep);
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index baa69c7..138b45f 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -20,6 +20,7 @@ package org.apache.falcon.regression.core.util;
 
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper;
 import org.apache.oozie.client.AuthOozieClient;
 import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
@@ -237,11 +238,10 @@ public final class OozieUtil {
     }
 
 
-    public static List<String> getMissingDependencies(ColoHelper helper, String bundleID)
+    public static List<String> getMissingDependencies(OozieClient oozieClient, String bundleID)
         throws OozieClientException {
         CoordinatorJob jobInfo;
         jobInfo = null;
-        OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
         BundleJob bundleJob = oozieClient.getBundleJobInfo(bundleID);
         List<CoordinatorJob> coordinatorJobList = bundleJob.getCoordinators();
         if (coordinatorJobList.size() > 1) {
@@ -285,9 +285,8 @@ public final class OozieUtil {
         return new ArrayList<String>(Arrays.asList(missingDependencies));
     }
 
-    public static List<String> getWorkflowJobs(ColoHelper prismHelper, String bundleID)
+    public static List<String> getWorkflowJobs(OozieClient oozieClient, String bundleID)
         throws OozieClientException {
-        OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient();
         waitForCoordinatorJobCreation(oozieClient, bundleID);
         List<String> workflowIds = new ArrayList<String>();
         List<CoordinatorJob> coordJobs = oozieClient.getBundleJobInfo(bundleID).getCoordinators();
@@ -299,12 +298,11 @@ public final class OozieUtil {
         return workflowIds;
     }
 
-    public static List<String> getWorkflow(ColoHelper coloHelper, String bundleID)
+    public static List<String> getWorkflow(OozieClient oozieClient, String bundleID)
         throws OozieClientException {
-        OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient();
         waitForCoordinatorJobCreation(oozieClient, bundleID);
         List<String> workflowIds = new ArrayList<String>();
-        String coordId = InstanceUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID);
+        String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID);
         CoordinatorJob coordJobInfo = oozieClient.getCoordJobInfo(coordId);
         for (CoordinatorAction action : coordJobInfo.getActions()) {
             if (action.getStatus().name().equals("RUNNING") || action.getStatus().name().equals("SUCCEEDED")) {
@@ -317,59 +315,51 @@ public final class OozieUtil {
         return workflowIds;
     }
 
-    public static Date getNominalTime(ColoHelper prismHelper, String bundleID)
+    public static Date getNominalTime(OozieClient oozieClient, String bundleID)
         throws OozieClientException {
-        OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient();
         BundleJob bundleJob = oozieClient.getBundleJobInfo(bundleID);
         CoordinatorJob jobInfo =
             oozieClient.getCoordJobInfo(bundleJob.getCoordinators().get(0).getId());
         List<CoordinatorAction> actions = jobInfo.getActions();
-
         return actions.get(0).getNominalTime();
-
     }
 
-    public static CoordinatorJob getDefaultOozieCoord(ColoHelper prismHelper, String bundleId,
+    public static CoordinatorJob getDefaultOozieCoord(OozieClient oozieClient, String bundleId,
                                                       EntityType type)
         throws OozieClientException {
-        OozieClient client = prismHelper.getClusterHelper().getOozieClient();
-        BundleJob bundlejob = client.getBundleJobInfo(bundleId);
-
+        BundleJob bundlejob = oozieClient.getBundleJobInfo(bundleId);
         for (CoordinatorJob coord : bundlejob.getCoordinators()) {
             if ((coord.getAppName().contains("DEFAULT") && EntityType.PROCESS == type)
                     ||
                 (coord.getAppName().contains("REPLICATION") && EntityType.FEED == type)) {
-                return client.getCoordJobInfo(coord.getId());
+                return oozieClient.getCoordJobInfo(coord.getId());
             } else {
-                LOGGER.info("Desired coord does not exists on " + client.getOozieUrl());
+                LOGGER.info("Desired coord does not exists on " + oozieClient.getOozieUrl());
             }
         }
-
         return null;
     }
 
-    public static int getNumberOfWorkflowInstances(ColoHelper prismHelper, String bundleId)
+    public static int getNumberOfWorkflowInstances(OozieClient oozieClient, String bundleId)
         throws OozieClientException {
-        return getDefaultOozieCoord(prismHelper, bundleId,
-            EntityType.PROCESS).getActions().size();
+        return getDefaultOozieCoord(oozieClient, bundleId, EntityType.PROCESS).getActions().size();
     }
 
-    public static List<String> getActionsNominalTime(ColoHelper prismHelper,
-                                                     String bundleId,
-                                                     EntityType type)
+    public static List<String> getActionsNominalTime(OozieClient oozieClient,
+                                                     String bundleId, EntityType type)
         throws OozieClientException {
-        Map<Date, CoordinatorAction.Status> actions = getActionsNominalTimeAndStatus(prismHelper, bundleId, type);
+        Map<Date, CoordinatorAction.Status> actions = getActionsNominalTimeAndStatus(oozieClient, bundleId, type);
         List<String> nominalTime = new ArrayList<String>();
         for (Date date : actions.keySet()) {
             nominalTime.add(date.toString());
         }
         return nominalTime;
     }
-    public static Map<Date, CoordinatorAction.Status> getActionsNominalTimeAndStatus(ColoHelper prismHelper,
+
+    public static Map<Date, CoordinatorAction.Status> getActionsNominalTimeAndStatus(OozieClient oozieClient,
             String bundleId, EntityType type) throws OozieClientException {
         Map<Date, CoordinatorAction.Status> result = new TreeMap<Date, CoordinatorAction.Status>();
-        List<CoordinatorAction> actions = getDefaultOozieCoord(prismHelper,
-                bundleId, type).getActions();
+        List<CoordinatorAction> actions = getDefaultOozieCoord(oozieClient, bundleId, type).getActions();
         for (CoordinatorAction action : actions) {
             result.put(action.getNominalTime(), action.getStatus());
         }
@@ -386,79 +376,54 @@ public final class OozieUtil {
             BundleJob.Status.SUCCEEDED, BundleJob.Status.KILLED).contains(bundleJob.getStatus())) {
             return true;
         }
-
         TimeUtil.sleepSeconds(20);
         return false;
     }
 
-    public static void verifyNewBundleCreation(ColoHelper cluster,
-                                               String originalBundleId,
-                                               List<String>
-                                                   initialNominalTimes,
-                                               String entity,
-                                               boolean shouldBeCreated,
-
-                                               boolean matchInstances) throws OozieClientException {
+    public static void verifyNewBundleCreation(OozieClient oozieClient, String originalBundleId,
+                                               List<String> initialNominalTimes, String entity,
+                                               boolean shouldBeCreated, boolean matchInstances)
+        throws OozieClientException {
         String entityName = Util.readEntityName(entity);
         EntityType entityType = Util.getEntityType(entity);
-        String newBundleId = InstanceUtil.getLatestBundleID(cluster, entityName,
-            entityType);
+        String newBundleId = getLatestBundleID(oozieClient, entityName, entityType);
         if (shouldBeCreated) {
             Assert.assertTrue(!newBundleId.equalsIgnoreCase(originalBundleId),
                 "eeks! new bundle is not getting created!!!!");
-            LOGGER.info("old bundleId=" + originalBundleId + " on oozie: "
-                    +
-                "" + cluster.getProcessHelper().getOozieClient().getOozieUrl());
-            LOGGER.info("new bundleId=" + newBundleId + " on oozie: "
-                    +
-                "" + cluster.getProcessHelper().getOozieClient().getOozieUrl());
+            LOGGER.info("old bundleId=" + originalBundleId + " on oozie: " + oozieClient);
+            LOGGER.info("new bundleId=" + newBundleId + " on oozie: " + oozieClient);
             if (matchInstances) {
-                validateNumberOfWorkflowInstances(cluster,
+                validateNumberOfWorkflowInstances(oozieClient,
                         initialNominalTimes, originalBundleId, newBundleId, entityType);
             }
         } else {
-            Assert.assertEquals(newBundleId,
-                originalBundleId, "eeks! new bundle is getting created!!!!");
+            Assert.assertEquals(newBundleId, originalBundleId, "eeks! new bundle is getting created!!!!");
         }
     }
 
-    private static void validateNumberOfWorkflowInstances(ColoHelper cluster,
+    private static void validateNumberOfWorkflowInstances(OozieClient oozieClient,
                                                           List<String> initialNominalTimes,
                                                           String originalBundleId,
                                                           String newBundleId, EntityType type)
         throws OozieClientException {
-
-        List<String> nominalTimesOriginalAndNew = getActionsNominalTime(cluster,
-                originalBundleId, type);
-
-        nominalTimesOriginalAndNew.addAll(getActionsNominalTime(cluster,
-            newBundleId, type));
-
+        List<String> nominalTimesOriginalAndNew = getActionsNominalTime(oozieClient, originalBundleId, type);
+        nominalTimesOriginalAndNew.addAll(getActionsNominalTime(oozieClient, newBundleId, type));
         initialNominalTimes.removeAll(nominalTimesOriginalAndNew);
-
         if (initialNominalTimes.size() != 0) {
             LOGGER.info("Missing instance are : " + initialNominalTimes);
             LOGGER.debug("Original Bundle ID   : " + originalBundleId);
             LOGGER.debug("New Bundle ID        : " + newBundleId);
-
-            Assert.assertFalse(true, "some instances have gone missing after "
-                    +
-                "update");
+            Assert.fail("some instances have gone missing after update");
         }
     }
 
-    public static String getCoordStartTime(ColoHelper colo, String entity,
-                                           int bundleNo)
+    public static String getCoordStartTime(OozieClient oozieClient, String entity, int bundleNo)
         throws OozieClientException {
-        final OozieClient oozieClient = colo.getClusterHelper().getOozieClient();
-        String bundleID = InstanceUtil.getSequenceBundleID(oozieClient,
+        String bundleID = getSequenceBundleID(oozieClient,
             Util.readEntityName(entity), Util.getEntityType(entity), bundleNo);
-
-        CoordinatorJob coord = getDefaultOozieCoord(colo, bundleID,
+        CoordinatorJob coord = getDefaultOozieCoord(oozieClient, bundleID,
             Util.getEntityType(entity));
-
-        return TimeUtil.dateToOozieDate(coord.getStartTime()
-        );
+        return TimeUtil.dateToOozieDate(coord.getStartTime());
     }
 
     public static DateTimeFormatter getOozieDateTimeFormatter() {
@@ -475,11 +440,10 @@ public final class OozieUtil {
                                                  int instanceNumber)
         throws OozieClientException, IOException {
         final OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
-        String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, entityName,
-            type, bundleNumber);
+        String bundleID = getSequenceBundleID(oozieClient, entityName, type, bundleNumber);
         List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleID).getCoordinators();
-        HadoopUtil.createHDFSFolders(helper, getMissingDependenciesForInstance(oozieClient, coords,
-            instanceNumber));
+        HadoopUtil.createFolders(helper.getClusterHelper().getHadoopFS(), helper.getPrefix(),
+            getMissingDependenciesForInstance(oozieClient, coords, instanceNumber));
     }
 
     private static List<String> getMissingDependenciesForInstance(OozieClient oozieClient,
@@ -487,7 +451,6 @@ public final class OozieUtil {
         throws OozieClientException {
         ArrayList<String> missingPaths = new ArrayList<String>();
         for (CoordinatorJob coord : coords) {
-
             CoordinatorJob temp = oozieClient.getCoordJobInfo(coord.getId());
             CoordinatorAction instance = temp.getActions().get(instanceNumber);
             missingPaths.addAll(Arrays.asList(instance.getMissingDependencies().split("#")));
@@ -499,10 +462,8 @@ public final class OozieUtil {
                                                  String entityName, int bundleNumber)
         throws OozieClientException, IOException {
         final OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
-        String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, entityName, type,
-            bundleNumber);
-        List<List<String>> missingDependencies = createMissingDependenciesForBundle(helper, bundleID);
-        return missingDependencies;
+        String bundleID = getSequenceBundleID(oozieClient, entityName, type, bundleNumber);
+        return createMissingDependenciesForBundle(helper, bundleID);
     }
 
     public static List<List<String>> createMissingDependenciesForBundle(ColoHelper helper, String bundleId)
@@ -511,7 +472,8 @@ public final class OozieUtil {
         List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleId).getCoordinators();
         List<List<String>> missingDependencies = getMissingDependenciesForBundle(oozieClient, coords);
         for (List<String> missingDependencyPerInstance : missingDependencies) {
-            HadoopUtil.createHDFSFolders(helper, missingDependencyPerInstance);
+            HadoopUtil.createFolders(helper.getClusterHelper().getHadoopFS(), helper.getPrefix(),
+                missingDependencyPerInstance);
         }
         return missingDependencies;
     }
@@ -531,12 +493,216 @@ public final class OozieUtil {
         return missingDependencies;
     }
 
-    public static void validateRetryAttempts(ColoHelper helper, String bundleId, EntityType type,
+    public static void validateRetryAttempts(OozieClient oozieClient, String bundleId, EntityType type,
                                              int attempts) throws OozieClientException {
-        OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
-        CoordinatorJob coord = getDefaultOozieCoord(helper, bundleId, type);
+        CoordinatorJob coord = getDefaultOozieCoord(oozieClient, bundleId, type);
         int actualRun = oozieClient.getJobInfo(coord.getActions().get(0).getExternalId()).getRun();
         LOGGER.info("Actual run count: " + actualRun); // wrt 0
         Assert.assertEquals(actualRun, attempts, "Rerun attempts did not match");
     }
+
+    public static int checkIfFeedCoordExist(OozieClient oozieClient,
+            String feedName, String coordType) throws OozieClientException {
+        LOGGER.info("feedName: " + feedName);
+        int numberOfCoord = 0;
+        if (getBundles(oozieClient, feedName, EntityType.FEED).size() == 0) {
+            return 0;
+        }
+        List<String> bundleIds = getBundles(oozieClient, feedName, EntityType.FEED);
+        LOGGER.info("bundleIds: " + bundleIds);
+
+        for (String aBundleId : bundleIds) {
+            LOGGER.info("aBundleId: " + aBundleId);
+            waitForCoordinatorJobCreation(oozieClient, aBundleId);
+            List<CoordinatorJob> coords =
+                    getBundleCoordinators(oozieClient, aBundleId);
+            LOGGER.info("coords: " + coords);
+            for (CoordinatorJob coord : coords) {
+                if (coord.getAppName().contains(coordType)) {
+                    numberOfCoord++;
+                }
+            }
+        }
+        return numberOfCoord;
+    }
+
+    /**
+     * Retrieves replication coordinatorID from bundle of coordinators.
+     */
+    public static List<String> getReplicationCoordID(String bundleId, AbstractEntityHelper helper)
+        throws OozieClientException {
+        final OozieClient oozieClient = helper.getOozieClient();
+        List<CoordinatorJob> coords = getBundleCoordinators(oozieClient, bundleId);
+        List<String> replicationCoordID = new ArrayList<String>();
+        for (CoordinatorJob coord : coords) {
+            if (coord.getAppName().contains("FEED_REPLICATION")) {
+                replicationCoordID.add(coord.getId());
+            }
+        }
+        return replicationCoordID;
+    }
+
+    /**
+     * Retrieves ID of bundle related to some process/feed using its ordinal number.
+     *
+     * @param entityName   - name of entity bundle is related to
+     * @param entityType   - feed or process
+     * @param bundleNumber - ordinal number of bundle
+     * @return bundle ID
+     * @throws org.apache.oozie.client.OozieClientException
+     */
+    public static String getSequenceBundleID(OozieClient oozieClient, String entityName,
+            EntityType entityType, int bundleNumber) throws OozieClientException {
+        //sequence start from 0
+        List<String> bundleIds = getBundles(oozieClient,
+                entityName, entityType);
+        Map<Integer, String> bundleMap = new TreeMap<Integer, String>();
+        String bundleID;
+        for (String strID : bundleIds) {
+            LOGGER.info("getSequenceBundleID: " + strID);
+            int key = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
+            bundleMap.put(key, strID);
+        }
+        for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) {
+            LOGGER.info("Key = " + entry.getKey() + ", Value = " + entry.getValue());
+        }
+        int i = 0;
+        for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) {
+            bundleID = entry.getValue();
+            if (i == bundleNumber) {
+                return bundleID;
+            }
+            i++;
+        }
+        return null;
+    }
+
+    /**
+     * Retrieves the latest bundle ID.
+     *
+     * @param oozieClient where job is running
+     * @param entityName  name of entity job is related to
+     * @param entityType  type of entity - feed or process expected
+     * @return latest bundle ID
+     * @throws org.apache.oozie.client.OozieClientException
+     */
+    public static String getLatestBundleID(OozieClient oozieClient,
+            String entityName, EntityType entityType) throws OozieClientException {
+        List<String> bundleIds = getBundles(oozieClient, entityName, entityType);
+        String max = "0";
+        int maxID = -1;
+        for (String strID : bundleIds) {
+            if (maxID < Integer.parseInt(strID.substring(0, strID.indexOf('-')))) {
+                maxID = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
+                max = strID;
+            }
+        }
+        return max;
+    }
+
+    /**
+     * Retrieves all coordinators of bundle.
+     *
+     * @param oozieClient Oozie client to use for fetching info.
+     * @param bundleID specific bundle ID
+     * @return list of bundle coordinators
+     * @throws org.apache.oozie.client.OozieClientException
+     */
+    public static List<CoordinatorJob> getBundleCoordinators(OozieClient oozieClient, String bundleID)
+        throws OozieClientException {
+        BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleID);
+        return bundleInfo.getCoordinators();
+    }
+
+    public static Job.Status getDefaultCoordinatorStatus(OozieClient oozieClient, String processName,
+            int bundleNumber) throws OozieClientException {
+        String bundleID = getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber);
+        String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID);
+        return oozieClient.getCoordJobInfo(coordId).getStatus();
+    }
+
+    public static String getDefaultCoordIDFromBundle(OozieClient oozieClient, String bundleId)
+        throws OozieClientException {
+        waitForCoordinatorJobCreation(oozieClient, bundleId);
+        BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId);
+        List<CoordinatorJob> coords = bundleInfo.getCoordinators();
+        int min = 100000;
+        String minString = "";
+        for (CoordinatorJob coord : coords) {
+            String strID = coord.getId();
+            if (min > Integer.parseInt(strID.substring(0, strID.indexOf('-')))) {
+                min = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
+                minString = coord.getId();
+            }
+        }
+        LOGGER.info("function getDefaultCoordIDFromBundle: minString: " + minString);
+        return minString;
+    }
+
+    public static String getLatestCoordinatorID(OozieClient oozieClient, String processName,
+            EntityType entityType) throws OozieClientException {
+        final String latestBundleID = getLatestBundleID(oozieClient, processName, entityType);
+        return getDefaultCoordIDFromBundle(oozieClient, latestBundleID);
+    }
+
+    /**
+     * Waits till bundle job will reach expected status.
+     * Generates time according to expected status.
+     *
+     * @param oozieClient    oozieClient of cluster job is running on
+     * @param processName    name of process which job is being analyzed
+     * @param expectedStatus job status we are waiting for
+     * @throws org.apache.oozie.client.OozieClientException
+     */
+    public static void waitForBundleToReachState(OozieClient oozieClient,
+            String processName, Job.Status expectedStatus) throws OozieClientException {
+        int totalMinutesToWait = getMinutesToWait(expectedStatus);
+        waitForBundleToReachState(oozieClient, processName, expectedStatus, totalMinutesToWait);
+    }
+
+    /**
+     * Waits till bundle job will reach expected status during specific time.
+     * Use it directly in test cases when timeouts are different from trivial, in other cases use
+     * waitForBundleToReachState(OozieClient, String, Status)
+     *
+     * @param oozieClient        oozie client of cluster job is running on
+     * @param processName        name of process which job is being analyzed
+     * @param expectedStatus     job status we are waiting for
+     * @param totalMinutesToWait specific time to wait expected state
+     * @throws org.apache.oozie.client.OozieClientException
+     */
+    public static void waitForBundleToReachState(OozieClient oozieClient, String processName,
+            Job.Status expectedStatus, int totalMinutesToWait) throws OozieClientException {
+        int sleep = totalMinutesToWait * 60 / 20;
+        for (int sleepCount = 0; sleepCount < sleep; sleepCount++) {
+            String bundleID =
+                    getLatestBundleID(oozieClient, processName, EntityType.PROCESS);
+            BundleJob j = oozieClient.getBundleJobInfo(bundleID);
+            LOGGER.info(sleepCount + ". Current status: " + j.getStatus()
+                + "; expected: " + expectedStatus);
+            if (j.getStatus() == expectedStatus) {
+                return;
+            }
+            TimeUtil.sleepSeconds(20);
+        }
+        Assert.fail("State " + expectedStatus + " wasn't reached in " + totalMinutesToWait + " mins");
+    }
+
+    /**
+     * Generates time which is presumably needed for bundle job to reach particular state.
+     *
+     * @param expectedStatus status which we are expect to get from bundle job
+     * @return minutes to wait for expected status
+     */
+    private static int getMinutesToWait(Job.Status expectedStatus) {
+        switch (expectedStatus) {
+        case DONEWITHERROR:
+        case SUCCEEDED:
+            return OSUtil.IS_WINDOWS ? 40 : 20;
+        case KILLED:
+            return OSUtil.IS_WINDOWS ? 30 : 15;
+        default:
+            return OSUtil.IS_WINDOWS ? 60 : 30;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
index 02280f3..24af21f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
@@ -590,14 +590,13 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
 
         //get old process details
-        String oldProcessBundleId = InstanceUtil
-            .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        String oldProcessUser =
-            getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldProcessBundleId = OozieUtil
+            .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
 
         //get old feed details
-        String oldFeedBundleId = InstanceUtil.getLatestBundleID(cluster, feed.getName(), EntityType.FEED);
-        String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED);
+        String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, feed.getName(), EntityType.FEED);
+        String oldFeedUser = getBundleUser(clusterOC, feed.getName(), EntityType.FEED);
 
         //update feed definition
         FeedMerlin newFeed = new FeedMerlin(feed);
@@ -609,15 +608,15 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.assertSucceeded(serviceResponse);
 
         //new feed bundle should be created by U1
-        OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed.toString(), true, false);
-        String newFeedUser =
-                getBundleUser(cluster, newFeed.getName(), EntityType.FEED);
+        OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, newFeed.toString(), true, false);
+        String newFeedUser = getBundleUser(clusterOC, newFeed.getName(), EntityType.FEED);
         Assert.assertEquals(oldFeedUser, newFeedUser, "User should be the same");
 
         //new process bundle should be created by U2
-        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
+        OozieUtil.verifyNewBundleCreation(
+            clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
         String newProcessUser =
-            getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+            getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
         Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
     }
 
@@ -645,14 +644,13 @@ public class AuthorizationTest extends BaseTestClass {
         newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
 
         //get old process details
-        String oldProcessBundleId = InstanceUtil
-                .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        String oldProcessUser =
-                getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldProcessBundleId = OozieUtil
+                .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
 
         //get old feed details
-        String oldFeedBundleId = InstanceUtil.getLatestBundleID(cluster, feed.getName(), EntityType.FEED);
-        String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED);
+        String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, feed.getName(), EntityType.FEED);
+        String oldFeedUser = getBundleUser(clusterOC, feed.getName(), EntityType.FEED);
 
         //update feed by U2
         serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(),
@@ -660,15 +658,15 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.assertSucceeded(serviceResponse);
 
         //new feed bundle should be created by U2
-        OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed.toString(), true, false);
-        String newFeedUser = getBundleUser(cluster, newFeed.getName(), EntityType.FEED);
+        OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, newFeed.toString(), true, false);
+        String newFeedUser = getBundleUser(clusterOC, newFeed.getName(), EntityType.FEED);
         Assert.assertNotEquals(oldFeedUser, newFeedUser, "User should not be the same");
         Assert.assertEquals(MerlinConstants.USER2_NAME, newFeedUser);
 
         //new process bundle should be created by U2
-        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
-        String newProcessUser =
-                getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        OozieUtil.verifyNewBundleCreation(
+            clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
+        String newProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
         Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
     }
 
@@ -691,14 +689,12 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
 
         //get old process details
-        String oldProcessBundleId = InstanceUtil
-                .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        String oldProcessUser =
-                getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldProcessBundleId = OozieUtil
+                .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
 
         //get old feed details
-        String oldFeedBundleId = InstanceUtil
-                .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
+        String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED);
 
         //update process by U1
         ProcessMerlin processObj = bundles[0].getProcessObject();
@@ -707,12 +703,12 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.assertSucceeded(serviceResponse);
 
         //new feed bundle should not be created
-        OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false);
+        OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, feed, false, false);
 
         //new process bundle should be created by U1
-        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
-        String newProcessUser =
-                getBundleUser(cluster, processObj.getName(), EntityType.PROCESS);
+        OozieUtil.verifyNewBundleCreation(
+            clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
+        String newProcessUser = getBundleUser(clusterOC, processObj.getName(), EntityType.PROCESS);
         Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
     }
 
@@ -735,14 +731,12 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
 
         //get old process details
-        String oldProcessBundleId = InstanceUtil
-                .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
-        String oldProcessUser =
-                getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldProcessBundleId = OozieUtil
+                .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+        String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
 
         //get old feed details
-        String oldFeedBundleId = InstanceUtil
-                .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
+        String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED);
 
         //update process by U2
         ProcessMerlin processObj = bundles[0].getProcessObject();
@@ -752,22 +746,20 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.assertSucceeded(serviceResponse);
 
         //new feed bundle should not be created
-        OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false);
+        OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, feed, false, false);
 
         //new process bundle should be created by U2
-        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
-        String newProcessUser =
-                getBundleUser(cluster, processObj.getName(), EntityType.PROCESS);
+        OozieUtil.verifyNewBundleCreation(
+            clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
+        String newProcessUser = getBundleUser(clusterOC, processObj.getName(), EntityType.PROCESS);
         Assert.assertNotEquals(oldProcessUser, newProcessUser, "User should not be the same");
         Assert.assertEquals(MerlinConstants.USER2_NAME, newProcessUser);
     }
 
-    private String getBundleUser(ColoHelper coloHelper, String entityName, EntityType entityType)
+    private String getBundleUser(OozieClient oozieClient, String entityName, EntityType entityType)
         throws OozieClientException {
-        String newBundleId = InstanceUtil.getLatestBundleID(coloHelper, entityName,
-            entityType);
-        BundleJob newBundleJob =
-            coloHelper.getClusterHelper().getOozieClient().getBundleJobInfo(newBundleId);
+        String newBundleId = OozieUtil.getLatestBundleID(oozieClient, entityName, entityType);
+        BundleJob newBundleJob = oozieClient.getBundleJobInfo(newBundleId);
         CoordinatorJob coordinatorJob = null;
         for (CoordinatorJob coord : newBundleJob.getCoordinators()) {
             if ((entityType == EntityType.PROCESS && coord.getAppName().contains("DEFAULT"))

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
index 33b0e77..8c5d330 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
@@ -24,7 +24,6 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.util.*;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.Job;
@@ -44,7 +43,6 @@ import java.util.List;
 public class ELExpCurrentAndLastWeekTest extends BaseTestClass {
 
     private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
     private OozieClient clusterOC = serverOC.get(0);
     private String baseTestDir = cleanAndGetTestDir();
     private String aggregateWorkflowDir = baseTestDir + "/aggregator";
@@ -97,13 +95,11 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass {
     public void currentAndLastWeekTest(String startInstance, String endInstance,
             String firstDep, String endDep) throws Exception {
         bundles[0].setDatasetInstances(startInstance, endInstance);
-
         bundles[0].submitFeedsScheduleProcess(prism);
         AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
 
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-
-        List<String> missingDependencies = getMissingDependencies(cluster, bundles[0]);
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+        List<String> missingDependencies = getMissingDependencies(clusterOC, bundles[0].getProcessName());
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, bundles[0].getProcessName(), 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
                 CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -146,15 +142,14 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass {
         return true;
     }
 
-    private List<String> getMissingDependencies(ColoHelper prismHelper, Bundle bundle) throws OozieClientException {
-        List<String> bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
-                bundle.getProcessName(), EntityType.PROCESS);
+    public List<String> getMissingDependencies(OozieClient oozieClient,
+                                              String processName) throws OozieClientException {
+        List<String> bundles = OozieUtil.getBundles(oozieClient, processName, EntityType.PROCESS);
         String coordID = bundles.get(0);
-        List<String> missingDependencies =
-                OozieUtil.getMissingDependencies(prismHelper, coordID);
+        List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
         for (int i = 0; i < 10 && missingDependencies == null; ++i) {
             TimeUtil.sleepSeconds(30);
-            missingDependencies = OozieUtil.getMissingDependencies(prismHelper, coordID);
+            missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
         }
         Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
         return missingDependencies;

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
index 07292e1..37f1149 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
@@ -29,6 +29,7 @@ import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.log4j.Logger;
+import org.apache.oozie.client.OozieClient;
 import org.testng.Assert;
 import org.testng.TestNGException;
 import org.testng.annotations.DataProvider;
@@ -139,7 +140,7 @@ public class ELValidationsTest extends BaseTestClass {
             LOGGER.info("processData in try is: " + Util.prettyPrintXml(bundle.getProcessData()));
             TimeUtil.sleepSeconds(45);
             if (isMatch) {
-                getAndMatchDependencies(cluster, bundle);
+                getAndMatchDependencies(serverOC.get(0), bundle);
             }
             return submitResponse;
         } catch (Exception e) {
@@ -151,12 +152,11 @@ public class ELValidationsTest extends BaseTestClass {
         }
     }
 
-    private void getAndMatchDependencies(ColoHelper prismHelper, Bundle bundle) {
+    private void getAndMatchDependencies(OozieClient oozieClient, Bundle bundle) {
         try {
             List<String> bundles = null;
             for (int i = 0; i < 10; ++i) {
-                bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
-                    bundle.getProcessName(), EntityType.PROCESS);
+                bundles = OozieUtil.getBundles(oozieClient, bundle.getProcessName(), EntityType.PROCESS);
                 if (bundles.size() > 0) {
                     break;
                 }
@@ -165,17 +165,16 @@ public class ELValidationsTest extends BaseTestClass {
             Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created.");
             String coordID = bundles.get(0);
             LOGGER.info("coord id: " + coordID);
-            List<String> missingDependencies =
-                OozieUtil.getMissingDependencies(prismHelper, coordID);
+            List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
             for (int i = 0; i < 10 && missingDependencies == null; ++i) {
                 TimeUtil.sleepSeconds(30);
-                missingDependencies = OozieUtil.getMissingDependencies(prismHelper, coordID);
+                missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
             }
             Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
             for (String dependency : missingDependencies) {
                 LOGGER.info("dependency from job: " + dependency);
             }
-            Date jobNominalTime = OozieUtil.getNominalTime(prismHelper, coordID);
+            Date jobNominalTime = OozieUtil.getNominalTime(oozieClient, coordID);
             Calendar time = Calendar.getInstance();
             time.setTime(jobNominalTime);
             LOGGER.info("nominalTime:" + jobNominalTime);
@@ -224,8 +223,7 @@ public class ELValidationsTest extends BaseTestClass {
     }
 
     private List<String> getQADepedencyList(Calendar nominalTime, Date startRef,
-                                            Date endRef, int frequency,
-                                            Bundle bundle) {
+                                            Date endRef, int frequency, Bundle bundle) {
         LOGGER.info("start ref:" + startRef);
         LOGGER.info("end ref:" + endRef);
         Calendar initialTime = Calendar.getInstance();

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
index 69be47a..4fb3c4a 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
@@ -33,6 +33,7 @@ import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -163,7 +164,7 @@ public class EmbeddedPigScriptTest extends BaseTestClass {
         InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
         int counter = OSUtil.IS_WINDOWS ? 100 : 50;
-        InstanceUtil.waitForBundleToReachState(cluster, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter);
+        OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter);
         r = prism.getProcessHelper().getRunningInstance(processName);
         InstanceUtil.validateSuccessWOInstances(r);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
index 8eff8e4..05a2b0b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
@@ -33,6 +33,7 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.MatrixUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -130,7 +131,6 @@ public class ExternalFSTest extends BaseTestClass{
 
     }
 
-
     @Test(dataProvider = "getData")
     public void replicateToExternalFS(final FileSystem externalFS,
         final String separator, final boolean withData) throws Exception {
@@ -181,13 +181,10 @@ public class ExternalFSTest extends BaseTestClass{
         Path dstPath = new Path(endpoint + testWasbTargetDir + '/' + timePattern);
 
         //check if coordinator exists
-        InstanceUtil.waitTillInstancesAreCreated(cluster, feed.toString(), 0);
-
-        Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster.getFeedHelper(), Util.readEntityName(feed.toString()),
-                "REPLICATION"), 1);
-
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, feed.toString(), 0);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(clusterOC, feed.getName(), "REPLICATION"), 1);
         TimeUtil.sleepSeconds(10);
+
         //replication should start, wait while it ends
         InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed.toString()), 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);


Mime
View raw message