falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rag...@apache.org
Subject [22/41] git commit: FALCON-660 7 test classes refactored and few of them documented. Contributed by Paul Isaychuk
Date Thu, 11 Sep 2014 22:19:27 GMT
FALCON-660 7 test classes refactored and few of them documented. Contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/3678eaba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/3678eaba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/3678eaba

Branch: refs/heads/FALCON-585
Commit: 3678eabab73603a7ca3e45e1a56187cc0aca3619
Parents: 75f06b4
Author: Ruslan Ostafiychuk <rostafiychuk@apache.org>
Authored: Wed Sep 3 18:20:38 2014 +0300
Committer: Ruslan Ostafiychuk <rostafiychuk@apache.org>
Committed: Wed Sep 3 18:20:38 2014 +0300

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../falcon/regression/InstanceParamTest.java    |  45 ++--
 .../falcon/regression/InstanceSummaryTest.java  | 171 +++++--------
 .../regression/prism/EntityDryRunTest.java      |  26 +-
 .../regression/prism/PrismFeedSnSTest.java      | 252 ++++++++++---------
 .../prism/PrismProcessScheduleTest.java         | 111 +++-----
 .../regression/prism/PrismProcessSnSTest.java   | 151 ++++++-----
 .../falcon/regression/ui/ProcessUITest.java     |  21 +-
 8 files changed, 361 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 94403c8..b60c23c 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -9,6 +9,9 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+   FALCON-660 7 test classes refactored and few of them documented (Paul Isaychuk via
+   Ruslan Ostafiychuk)
+
    FALCON-653 Add falcon regression test for zero input process(Karishma via Samarth Gupta)
    FALCON-655 Skip workflow upload if process won't be submitted (Ruslan Ostafiychuk)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java
index d733cfc..edf3428 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java
@@ -58,24 +58,21 @@ public class InstanceParamTest extends BaseTestClass {
      */
 
     private String baseTestHDFSDir = baseHDFSDir + "/InstanceParamTest";
-    private String feedInputPath = baseTestHDFSDir
-            +
+    private String feedInputPath = baseTestHDFSDir +
         "/testInputData/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
     private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
     private String startTime;
     private String endTime;
-
     private ColoHelper cluster1 = servers.get(0);
-    private OozieClient oC1 = serverOC.get(0);
+    private OozieClient cluster1OC = serverOC.get(0);
     private Bundle processBundle;
     private static final Logger LOGGER = Logger.getLogger(InstanceParamTest.class);
-
+    private String processName;
 
     @BeforeClass(alwaysRun = true)
     public void createTestData() throws Exception {
         uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-        startTime = TimeUtil.get20roundedTime(TimeUtil
-            .getTimeWrtSystemTime(-20));
+        startTime = TimeUtil.get20roundedTime(TimeUtil.getTimeWrtSystemTime(-20));
         endTime = TimeUtil.getTimeWrtSystemTime(60);
     }
 
@@ -92,7 +89,12 @@ public class InstanceParamTest extends BaseTestClass {
             bundles[i].generateUniqueBundle();
             bundles[i].setProcessWorkflow(aggregateWorkflowDir);
         }
+        processName = processBundle.getProcessName();
     }
+
+    /**
+     * Schedule process. Get params of waiting instance.
+     */
     @Test(timeOut = 1200000, enabled = false)
     public void getParamsValidRequestInstanceWaiting()
         throws URISyntaxException, JAXBException, AuthenticationException, IOException,
@@ -104,12 +106,14 @@ public class InstanceParamTest extends BaseTestClass {
             ClusterType.SOURCE, null, null);
         processBundle.submitFeedsScheduleProcess(prism);
         InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
-        InstancesResult r = prism.getProcessHelper()
-            .getInstanceParams(Util.readEntityName(processBundle.getProcessData()),
-                "?start="+startTime);
+        InstancesResult r = prism.getProcessHelper().getInstanceParams(processName,
+            "?start=" + startTime);
         r.getMessage();
     }
 
+    /**
+     * Schedule process. Wait till instance succeeded. Get its params.
+     */
     @Test(timeOut = 1200000, enabled = true)
     public void getParamsValidRequestInstanceSucceeded()
         throws URISyntaxException, JAXBException, AuthenticationException, IOException,
@@ -121,16 +125,18 @@ public class InstanceParamTest extends BaseTestClass {
             ClusterType.SOURCE, null, null);
         processBundle.submitFeedsScheduleProcess(prism);
         InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
-        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
-            processBundle.getProcessName(), 0);
-        InstanceUtil.waitTillInstanceReachState(oC1, processBundle.getProcessName(), 1,
+        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, processName, 0);
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, processName, 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 10);
         InstancesResult r = prism.getProcessHelper()
-            .getInstanceParams(Util.readEntityName(processBundle.getProcessData()),
-                "?start="+startTime);
+            .getInstanceParams(processName, "?start=" + startTime);
         LOGGER.info(r.getMessage());
     }
 
+    /**
+     *  Schedule process. Wait till instance got killed. Get its params.
+     *  TODO: change according to test case
+     */
     @Test(timeOut = 1200000, enabled = false)
     public void getParamsValidRequestInstanceKilled()
         throws URISyntaxException, JAXBException, AuthenticationException, IOException,
@@ -142,15 +148,12 @@ public class InstanceParamTest extends BaseTestClass {
             ClusterType.SOURCE, null, null);
         processBundle.submitFeedsScheduleProcess(prism);
         InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
-        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
-            processBundle.getProcessName(), 0);
-        InstanceUtil.waitTillInstanceReachState(oC1, processBundle.getProcessName(), 0,
+        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, processName, 0);
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, processName, 0,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
         InstancesResult r = prism.getProcessHelper()
-            .getInstanceParams(Util.readEntityName(processBundle.getProcessData()),
-                "?start="+startTime);
+            .getInstanceParams(processName, "?start=" + startTime);
         r.getMessage();
-
     }
 
     @AfterMethod(alwaysRun = true)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
index 154485f..636da2c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
@@ -50,43 +50,34 @@ import java.net.URISyntaxException;
 import java.text.ParseException;
 import java.util.List;
 
-/*
-this test currently provide minimum verification. More detailed test should
- be added
+/** This test currently provide minimum verification. More detailed test should be added:
+    1. process : test summary single cluster few instance some future some past
+    2. process : test multiple cluster, full past on one cluster,  full future on one cluster,
+    half future / past on third one
+    3. feed : same as test 1 for feed
+    4. feed : same as test 2 for feed
  */
 @Test(groups = "embedded")
 public class InstanceSummaryTest extends BaseTestClass {
 
-    //1. process : test summary single cluster few instance some future some past
-    //2. process : test multiple cluster, full past on one cluster,
-    // full future on one cluster, half future / past on third one
-
-    // 3. feed : same as test 1 for feed
-    // 4. feed : same as test 2 for feed
-
-
     String baseTestHDFSDir = baseHDFSDir + "/InstanceSummaryTest";
     String feedInputPath = baseTestHDFSDir +
         "/testInputData/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
     String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
     String startTime;
     String endTime;
-
     ColoHelper cluster3 = servers.get(2);
-
     Bundle processBundle;
     private static final Logger logger = Logger.getLogger(InstanceSummaryTest.class);
+    String processName;
 
     @BeforeClass(alwaysRun = true)
     public void createTestData() throws Exception {
         uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-        startTime = TimeUtil.get20roundedTime(TimeUtil
-            .getTimeWrtSystemTime
-                (-20));
+        startTime = TimeUtil.get20roundedTime(TimeUtil.getTimeWrtSystemTime(-20));
         endTime = TimeUtil.getTimeWrtSystemTime(60);
         String startTimeData = TimeUtil.addMinsToTime(startTime, -100);
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startTimeData, endTime, 20);
-
         for (FileSystem fs : serverFS) {
             HadoopUtil.deleteDirIfExists(Util.getPathPrefix(feedInputPath), fs);
             HadoopUtil.flattenAndPutDataInFolder(fs, OSUtil.NORMAL_INPUT,
@@ -108,100 +99,88 @@ public class InstanceSummaryTest extends BaseTestClass {
             bundles[i].generateUniqueBundle();
             bundles[i].setProcessWorkflow(aggregateWorkflowDir);
         }
+        processName = Util.readEntityName(processBundle.getProcessData());
     }
 
+    /**
+     *  Schedule single-cluster process. Get its instances summary.
+     *  TODO: should be complete
+     */
     @Test(enabled = true, timeOut = 1200000)
     public void testSummarySingleClusterProcess()
         throws URISyntaxException, JAXBException, IOException, ParseException,
         OozieClientException, AuthenticationException {
         processBundle.setProcessValidity(startTime, endTime);
         processBundle.submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster3,
-            processBundle.getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 0);
 
         // start only at start time
         InstancesSummaryResult r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime);
-
-        InstanceUtil.waitTillInstanceReachState(serverOC.get(2),
-            Util.readEntityName(processBundle.getProcessData()), 2,
+            .getInstanceSummary(processName, "?start=" + startTime);
+        InstanceUtil.waitTillInstanceReachState(serverOC.get(2), processName, 2,
             Status.SUCCEEDED, EntityType.PROCESS);
 
-
         //AssertUtil.assertSucceeded(r);
 
         //start only before process start
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
+        r = prism.getProcessHelper().getInstanceSummary(processName,
                 "?start=" + TimeUtil.addMinsToTime(startTime, -100));
         //AssertUtil.assertFailed(r,"response should have failed");
 
         //start only after process end
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
+        r = prism.getProcessHelper().getInstanceSummary(processName,
                 "?start=" + TimeUtil.addMinsToTime(startTime, 120));
 
 
         //start only at mid specific instance
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + TimeUtil.addMinsToTime(startTime,
-                    +10));
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+                "?start=" + TimeUtil.addMinsToTime(startTime, 10));
 
         //start only in between 2 instance
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + TimeUtil.addMinsToTime(startTime,
-                    7));
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+                "?start=" + TimeUtil.addMinsToTime(startTime, 7));
 
         //start and end at start and end
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime + "&end=" + endTime);
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+            "?start=" + startTime + "&end=" + endTime);
 
         //start in between and end at end
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + TimeUtil.addMinsToTime(startTime,
-                    14) + "&end=" + endTime);
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 14) + "&end=" + endTime);
 
         //start at start and end between
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime,
-                    -20));
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime, -20));
 
         // start and end in between
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + TimeUtil.addMinsToTime(startTime,
-                    20) + "&end=" + TimeUtil.addMinsToTime(endTime, -13));
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+                "?start=" + TimeUtil.addMinsToTime(startTime, 20)
+                    + "&end=" + TimeUtil.addMinsToTime(endTime, -13));
 
         //start before start with end in between
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + TimeUtil.addMinsToTime(startTime,
-                    -100) + "&end=" + TimeUtil.addMinsToTime(endTime, -37));
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+                "?start=" + TimeUtil.addMinsToTime(startTime, -100)
+                    + "&end=" + TimeUtil.addMinsToTime(endTime, -37));
 
         //start in between and end after end
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + TimeUtil.addMinsToTime(startTime,
-                    60) + "&end=" + TimeUtil.addMinsToTime(endTime, 100));
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+                "?start=" + TimeUtil.addMinsToTime(startTime, 60)
+                    + "&end=" + TimeUtil.addMinsToTime(endTime, 100));
 
         // both start end out od range
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + TimeUtil.addMinsToTime(startTime,
-                    -100) + "&end=" + TimeUtil.addMinsToTime(endTime, 100));
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+            "?start=" + TimeUtil.addMinsToTime(startTime,-100)
+                + "&end=" + TimeUtil.addMinsToTime(endTime, 100));
 
         // end only
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
+        r = prism.getProcessHelper().getInstanceSummary(processName,
                 "?end=" + TimeUtil.addMinsToTime(endTime, -30));
     }
 
+    /**
+     * Adjust multi-cluster process. Submit and schedule it. Get its instances summary.
+     * TODO: should be complete
+     */
     @Test(enabled = true, timeOut = 1200000)
     public void testSummaryMultiClusterProcess() throws JAXBException,
         ParseException, IOException, URISyntaxException, AuthenticationException {
@@ -212,39 +191,31 @@ public class InstanceSummaryTest extends BaseTestClass {
             ClusterType.SOURCE, null, null);
         processBundle.submitFeedsScheduleProcess(prism);
         InstancesSummaryResult r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime);
-
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime + "&end=" + endTime);
+            .getInstanceSummary(processName, "?start=" + startTime);
 
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+             "?start=" + startTime + "&end=" + endTime);
 
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime + "&end=" + endTime);
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+             "?start=" + startTime + "&end=" + endTime);
 
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+            "?start=" + startTime + "&end=" + endTime);
 
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime + "&end=" + endTime);
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+            "?start=" + startTime + "&end=" + endTime);
 
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+            "?start=" + startTime + "&end=" + endTime);
 
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime + "&end=" + endTime);
-
-
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime + "&end=" + endTime);
-
-
-        r = prism.getProcessHelper()
-            .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
-                "?start=" + startTime + "&end=" + endTime);
+        r = prism.getProcessHelper().getInstanceSummary(processName,
+            "?start=" + startTime + "&end=" + endTime);
     }
 
+    /**
+     *  Adjust multi-cluster feed. Submit and schedule it. Get its instances summary.
+     *  TODO: should be complete
+     */
     @Test(enabled = true, timeOut = 1200000)
     public void testSummaryMultiClusterFeed() throws JAXBException, ParseException, IOException,
         URISyntaxException, OozieClientException, AuthenticationException {
@@ -253,7 +224,6 @@ public class InstanceSummaryTest extends BaseTestClass {
         String feed = bundles[0].getDataSets().get(0);
 
         //cluster_1 is target, cluster_2 is source and cluster_3 is neutral
-
         feed = InstanceUtil.setFeedCluster(feed,
             XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
             XmlUtil.createRtention("days(100000)", ActionType.DELETE), null,
@@ -268,8 +238,7 @@ public class InstanceSummaryTest extends BaseTestClass {
             .setFeedCluster(feed, XmlUtil.createValidity(startTime, "2099-10-01T12:25Z"),
                 XmlUtil.createRtention("days(100000)", ActionType.DELETE),
                 Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
-                null,
-                feedInputPath);
+                null, feedInputPath);
 
         feed = InstanceUtil
             .setFeedCluster(feed, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
@@ -287,21 +256,15 @@ public class InstanceSummaryTest extends BaseTestClass {
         feedInputPath, 1);*/
 
         //submit and schedule feed
-        prism.getFeedHelper().submitAndSchedule(Util.URLS
-            .SUBMIT_AND_SCHEDULE_URL, feed);
+        prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
 
         InstancesSummaryResult r = prism.getFeedHelper()
-            .getInstanceSummary(Util.readEntityName(feed),
-                "?start=" + startTime);
-
-        r = prism.getFeedHelper()
-            .getInstanceSummary(Util.readEntityName(feed),
-                "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime,
-                    -20));
+            .getInstanceSummary(Util.readEntityName(feed), "?start=" + startTime);
 
+        r = prism.getFeedHelper().getInstanceSummary(Util.readEntityName(feed),
+            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime, -20));
     }
 
-
     @AfterMethod(alwaysRun = true)
     public void tearDown() throws IOException {
         processBundle.deleteBundle(prism);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
index 5000746..0b06823 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
@@ -52,9 +52,9 @@ public class EntityDryRunTest extends BaseTestClass {
     private FileSystem clusterFS = serverFS.get(0);
     private String baseTestHDFSDir = baseHDFSDir + "/EntityDryRunTest";
     private String feedInputPath = baseTestHDFSDir +
-            "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+        "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
     private String feedOutputPath =
-            baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+        baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
     private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
     private static final Logger LOGGER = Logger.getLogger(EntityDryRunTest.class);
 
@@ -85,7 +85,6 @@ public class EntityDryRunTest extends BaseTestClass {
     }
 
     /**
-     *
      * tries to submit process with invalid el exp
      */
     @Test(groups = {"singleCluster"})
@@ -93,12 +92,11 @@ public class EntityDryRunTest extends BaseTestClass {
         bundles[0].setProcessProperty("EntityDryRunTestProp", "${coord:someEL(1)");
         bundles[0].submitProcess(true);
         ServiceResponse response = prism.getProcessHelper()
-                .schedule(Util.URLS.SCHEDULE_URL, bundles[0].getProcessData());
+            .schedule(Util.URLS.SCHEDULE_URL, bundles[0].getProcessData());
         validate(response);
     }
 
     /**
-     *
      * tries to update process with invalid EL exp
      */
     @Test(groups = {"singleCluster"})
@@ -108,17 +106,15 @@ public class EntityDryRunTest extends BaseTestClass {
         bundles[0].submitAndScheduleProcess();
         bundles[0].setProcessProperty("EntityDryRunTestProp", "${coord:someEL(1)");
         ServiceResponse response = prism.getProcessHelper().update(bundles[0].getProcessData(),
-                bundles[0].getProcessData(), TimeUtil.getTimeWrtSystemTime(5), null);
+            bundles[0].getProcessData(), TimeUtil.getTimeWrtSystemTime(5), null);
         validate(response);
         Assert.assertEquals(
             OozieUtil.getNumberOfBundle(cluster, EntityType.PROCESS, bundles[0].getProcessName()),
-            1,
-            "more than one bundle found after failed update request");
+            1, "more than one bundle found after failed update request");
     }
 
     /**
      * tries to submit feed with invalied EL exp
-     *
      */
     @Test(groups = {"singleCluster"})
     public void testDryRunFailureScheduleFeed() throws Exception {
@@ -131,14 +127,14 @@ public class EntityDryRunTest extends BaseTestClass {
     }
 
     /**
-     *
      * tries to update feed with invalid el exp
      */
     @Test(groups = {"singleCluster"})
     public void testDryRunFailureUpdateFeed() throws Exception {
         bundles[0].submitClusters(prism);
         String feed = bundles[0].getInputFeedFromBundle();
-        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+        ServiceResponse response =
+            prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
         AssertUtil.assertSucceeded(response);
         feed = Util.setFeedProperty(feed, "EntityDryRunTestProp", "${coord:someEL(1)");
         response = prism.getFeedHelper().update(feed, feed);
@@ -150,11 +146,9 @@ public class EntityDryRunTest extends BaseTestClass {
 
     private void validate(ServiceResponse response) throws JAXBException {
         AssertUtil.assertFailed(response);
-        Assert.assertTrue(response.getMessage()
-            .contains("org.apache.falcon.FalconException: AUTHENTICATION : E1004 :" +
-                " E1004: Expression language evaluation error, Unable to evaluate :${coord:someEL" +
-                "(1)"),
-            "Correct response was not present in process / feed schedule");
+        Assert.assertTrue(response.getMessage().contains("org.apache.falcon.FalconException: " +
+            "AUTHENTICATION : E1004 : Expression language evaluation error, Unable to evaluate " +
+            ":${coord:someEL(1)"), "Correct response was not present in process / feed schedule");
     }
 
     @AfterClass(alwaysRun = true)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
index b7da224..15a2c3c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
@@ -56,6 +56,7 @@ public class PrismFeedSnSTest extends BaseTestClass {
     private boolean restartRequired;
     String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSnSTest/aggregator";
     private static final Logger logger = Logger.getLogger(PrismFeedSnSTest.class);
+    String feed1, feed2;
 
     @BeforeClass(alwaysRun = true)
     public void uploadWorkflow() throws Exception {
@@ -72,6 +73,8 @@ public class PrismFeedSnSTest extends BaseTestClass {
             bundles[i].generateUniqueBundle();
             bundles[i].setProcessWorkflow(aggregateWorkflowDir);
         }
+        feed1 = bundles[0].getDataSets().get(0);
+        feed2 = bundles[1].getDataSets().get(0);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -82,22 +85,27 @@ public class PrismFeedSnSTest extends BaseTestClass {
         removeBundles();
     }
 
-
+    /**
+     *  Submit and schedule feed1 on cluster1 and check that only this feed is running there.
+     *  Perform the same for feed2 on cluster2.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testFeedSnSOnBothColos() throws Exception {
         //schedule both bundles
         bundles[0].submitAndScheduleFeed();
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
         AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
-        bundles[1].submitAndScheduleFeed();
 
-        //now check if they have been scheduled correctly or not
+        bundles[1].submitAndScheduleFeed();
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
-
-        //check if there is no criss cross
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
     }
 
+    /**
+     *  Submit and schedule feed1 on cluster1 and feed2 on cluster2. Check that they are running
+     *  on matching clusters only. Submit and schedule them once more. Check that new bundles
+     *  were not created and feed still running on matching clusters.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testSnSAlreadyScheduledFeedOnBothColos() throws Exception {
         //schedule both bundles
@@ -112,102 +120,105 @@ public class PrismFeedSnSTest extends BaseTestClass {
         AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
         AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
 
-
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
         //ensure only one bundle is there
         Assert.assertEquals(OozieUtil.getBundles(cluster1OC,
-            Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED).size(), 1);
+            Util.readEntityName(feed1), EntityType.FEED).size(), 1);
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
+        //ensure only one bundle is there
         Assert.assertEquals(OozieUtil.getBundles(cluster2OC,
-            Util.readEntityName(bundles[1].getDataSets().get(0)), EntityType.FEED).size(), 1);
+            Util.readEntityName(feed2), EntityType.FEED).size(), 1);
         //now check if they have been scheduled correctly or not
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
     }
 
-
+    /**
+     * Submit and schedule feed1 on cluster1, feed2 on cluster2. Suspend feed1 and check their
+     * statuses. Submit and schedule feed1 again. Check that statuses hasn't been changed and new
+     * bundle hasn't been created. Resume feed1. Repeat the same for feed2.
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testSnSSuspendedFeedOnBothColos() throws Exception {
         //schedule both bundles
         bundles[0].submitAndScheduleFeed();
         bundles[1].submitAndScheduleFeed();
 
-        AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
-        //now check if they have been scheduled correctly or not
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
         Assert.assertEquals(OozieUtil.getBundles(cluster1OC,
-            Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED).size(), 1);
+            Util.readEntityName(feed1), EntityType.FEED).size(), 1);
 
-        AssertUtil.assertSucceeded(cluster1.getFeedHelper()
-            .resume(URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(cluster1.getFeedHelper().resume(URLS.RESUME_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
 
-        AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .suspend(URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed2));
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
 
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
         Assert.assertEquals(OozieUtil.getBundles(cluster2OC,
-            Util.readEntityName(bundles[1].getDataSets().get(0)), EntityType.FEED).size(), 1);
-        AssertUtil.assertSucceeded(cluster2.getFeedHelper()
-            .resume(URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+            Util.readEntityName(feed2), EntityType.FEED).size(), 1);
+        AssertUtil.assertSucceeded(cluster2.getFeedHelper().resume(URLS.RESUME_URL, feed2));
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
-
-
     }
 
+    /**
+     *  Submit and schedule both feeds. Delete them and submit and schedule again. Check that
+     *  action succeeded.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testSnSDeletedFeedOnBothColos() throws Exception {
         //schedule both bundles
         bundles[0].submitAndScheduleFeed();
         bundles[1].submitAndScheduleFeed();
 
-        AssertUtil.assertSucceeded(
-            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
 
-        AssertUtil.assertSucceeded(
-            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed2));
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
 
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
     }
 
+    /**
+     *  Attempt to submit and schedule non-registered feed should fail.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testScheduleNonExistentFeedOnBothColos() throws Exception {
         AssertUtil.assertFailed(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
         AssertUtil.assertFailed(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
     }
 
+    /**
+     *  Shut down server on cluster1. Submit and schedule feed on cluster2. Check that only
+     *  mentioned feed is running there.
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testFeedSnSOn1ColoWhileOtherColoIsDown() throws Exception {
         restartRequired = true;
-        for (String cluster : bundles[1].getClusters()) {
-            AssertUtil
-                .assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster));
-        }
+        AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL,
+            bundles[1].getClusters().get(0)));
 
         Util.shutDownService(cluster1.getFeedHelper());
-
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
 
         //now check if they have been scheduled correctly or not
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
@@ -215,24 +226,29 @@ public class PrismFeedSnSTest extends BaseTestClass {
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
     }
 
-
+    /**
+     *  Attempt to submit and schedule feed on cluster which is down should fail and this feed
+     *  shouldn't run on another cluster.
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testFeedSnSOn1ColoWhileThatColoIsDown() throws Exception {
         restartRequired = true;
         bundles[0].submitFeed();
-
         Util.shutDownService(cluster1.getFeedHelper());
-
         AssertUtil.assertFailed(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
     }
 
+    /**
+     *  Submit and schedule and then suspend feed1 on cluster1. Submit and schedule feed2 on
+     *  cluster2 and check that this actions don't affect each other.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testFeedSnSOn1ColoWhileAnotherColoHasSuspendedFeed() throws Exception {
         bundles[0].submitAndScheduleFeed();
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+            .suspend(URLS.SUSPEND_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
         bundles[1].submitAndScheduleFeed();
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
@@ -241,11 +257,15 @@ public class PrismFeedSnSTest extends BaseTestClass {
         AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
     }
 
+    /**
+     *  Submit and schedule and then delete feed1 on cluster1. Submit and schedule feed2 on
+     *  cluster2 and check that this actions don't affect each other.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testFeedSnSOn1ColoWhileAnotherColoHasKilledFeed() throws Exception {
         bundles[0].submitAndScheduleFeed();
         AssertUtil.assertSucceeded(
-            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+            prism.getFeedHelper().delete(URLS.DELETE_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
         bundles[1].submitAndScheduleFeed();
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
@@ -254,108 +274,110 @@ public class PrismFeedSnSTest extends BaseTestClass {
         AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
     }
 
+    /**
+     * Submit and schedule feed1 on cluster1 and check that it failed. Repeat for feed2.
+     *  TODO: should be reviewed
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testFeedSnSOnBothColosUsingColoHelper() throws Exception {
         //schedule both bundles
         bundles[0].submitFeed();
         APIResult result = Util.parseResponse((cluster1.getFeedHelper()
-            .submitEntity(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0))));
+            .submitEntity(URLS.SUBMIT_AND_SCHEDULE_URL, feed1)));
         Assert.assertEquals(result.getStatusCode(), 404);
         AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
         bundles[1].submitFeed();
         result = Util.parseResponse(cluster2.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
         Assert.assertEquals(result.getStatusCode(), 404);
-
         AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
     }
 
-
+    /**
+     *  Submit and schedule both feeds. Suspend feed1 and submit and schedule it once more. Check
+     *  that status of feed1 is still suspended. Resume it. Suspend feed2 but submit and schedule
+     *  feed1 again. Check that it didn't affect feed2 and it is still suspended.
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testSnSSuspendedFeedOnBothColosUsingColoHelper() throws Exception {
-
         //schedule both bundles
         bundles[0].submitFeed();
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
         bundles[1].submitFeed();
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
 
-        AssertUtil.assertSucceeded(cluster1.getFeedHelper()
-            .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(cluster1.getFeedHelper().suspend(URLS.SUSPEND_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
         //now check if they have been scheduled correctly or not
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
-        AssertUtil.assertSucceeded(
-            cluster1.getFeedHelper().resume(URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(cluster1.getFeedHelper().resume(URLS.RESUME_URL, feed1));
 
-        AssertUtil.assertSucceeded(cluster2.getFeedHelper().suspend(URLS.SUSPEND_URL,
-            bundles[1].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(cluster2.getFeedHelper().suspend(URLS.SUSPEND_URL, feed2));
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
     }
 
-
+    /**
+     *  Submit and schedule both feeds and then delete them. Submit and schedule feeds again.
+     *  Check that action succeeded and feeds are running.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testScheduleDeletedFeedOnBothColosUsingColoHelper() throws Exception {
-
         //schedule both bundles
         bundles[0].submitAndScheduleFeed();
         bundles[1].submitAndScheduleFeed();
 
-        AssertUtil.assertSucceeded(
-            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
 
-        AssertUtil.assertSucceeded(
-            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed2));
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
-
-        Assert.assertEquals(Util.parseResponse(prism.getFeedHelper()
-                .getStatus(URLS.STATUS_URL, bundles[0].getDataSets().get(0))).getMessage(),
-            cluster1.getClusterHelper().getColoName() + "/RUNNING");
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
 
-        Assert.assertEquals(Util.parseResponse(prism.getFeedHelper()
-                .getStatus(URLS.STATUS_URL, bundles[1].getDataSets().get(0))).getMessage(),
-            cluster2.getClusterHelper().getColoName() + "/RUNNING");
+        Assert.assertEquals(Util.parseResponse(prism.getFeedHelper().getStatus(URLS.STATUS_URL,
+            feed1)).getMessage(), cluster1.getClusterHelper().getColoName() + "/RUNNING");
+        Assert.assertEquals(Util.parseResponse(prism.getFeedHelper().getStatus(URLS.STATUS_URL,
+            feed2)).getMessage(), cluster2.getClusterHelper().getColoName() + "/RUNNING");
     }
 
-
+    /**
+     *  Attempt to submit and schedule non-registered feeds should fail.
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testSNSNonExistentFeedOnBothColosUsingColoHelper() throws Exception {
-
         Assert.assertEquals(Util.parseResponse(cluster1.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)))
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1))
             .getStatusCode(), 404);
         Assert.assertEquals(Util.parseResponse(cluster2.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)))
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2))
             .getStatusCode(), 404);
     }
 
+    /**
+     *  Shut down server on cluster1. Submit and schedule feed on cluster2. Check that only that
+     *  feed is running on cluster2.
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testFeedSnSOn1ColoWhileOtherColoIsDownUsingColoHelper() throws Exception {
         restartRequired = true;
-        for (String cluster : bundles[1].getClusters()) {
-            AssertUtil
-                .assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster));
-        }
+        AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL,
+            bundles[1].getClusters().get(0)));
 
         Util.shutDownService(cluster1.getFeedHelper());
-
         AssertUtil.assertSucceeded(prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
 
         //now check if they have been scheduled correctly or not
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
@@ -363,21 +385,24 @@ public class PrismFeedSnSTest extends BaseTestClass {
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
     }
 
-
+    /**
+     *  Set both clusters as feed clusters. Shut down one of them. Submit and schedule feed.
+     *  Check that action is partially successful.
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testFeedSnSOn1ColoWhileThatColoIsDownUsingColoHelper() throws Exception {
         restartRequired = true;
+        String clust1 = bundles[0].getClusters().get(0);
+        String clust2 = bundles[1].getClusters().get(0);
 
         bundles[0].setCLusterColo(cluster1.getClusterHelper().getColoName());
-        logger.info("cluster bundles[0]: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
-
-        ServiceResponse r =
-            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        logger.info("cluster bundles[0]: " + Util.prettyPrintXml(clust1));
+        ServiceResponse r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, clust1);
         Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
 
         bundles[1].setCLusterColo(cluster2.getClusterHelper().getColoName());
-        logger.info("cluster bundles[1]: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
-        r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+        logger.info("cluster bundles[1]: " + Util.prettyPrintXml(clust2));
+        r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, clust2);
         Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
 
         String startTimeUA1 = "2012-10-01T12:00Z";
@@ -392,53 +417,52 @@ public class PrismFeedSnSTest extends BaseTestClass {
         feed = InstanceUtil
             .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2099-10-01T12:10Z"),
                 XmlUtil.createRtention("days(10000)", ActionType.DELETE),
-                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
-                "${cluster.colo}",
+                Util.readEntityName(clust1), ClusterType.SOURCE, "${cluster.colo}",
                 baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
-
         feed = InstanceUtil
             .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"),
                 XmlUtil.createRtention("days(10000)", ActionType.DELETE),
-                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
-                baseHDFSDir +
+                Util.readEntityName(clust2), ClusterType.TARGET, null, baseHDFSDir +
                     "/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
-
         logger.info("feed: " + Util.prettyPrintXml(feed));
 
         Util.shutDownService(cluster1.getFeedHelper());
 
-        ServiceResponse response = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
-        AssertUtil.assertPartial(response);
-        response = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
-        AssertUtil.assertPartial(response);
+        r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+        AssertUtil.assertPartial(r);
+        r = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        AssertUtil.assertPartial(r);
         Util.startService(cluster1.getFeedHelper());
-        prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
-        prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[1].getClusters().get(0));
-
+        prism.getClusterHelper().delete(URLS.DELETE_URL, clust1);
+        prism.getClusterHelper().delete(URLS.DELETE_URL, clust2);
     }
 
-
+    /**
+     *   Submit and schedule feed1 and suspend it. Submit and schedule feed2 on another cluster
+     *   and check that only feed2 is running on cluster2.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testFeedSnSOn1ColoWhileAnotherColoHasSuspendedFeedUsingColoHelper()
         throws Exception {
         bundles[0].submitAndScheduleFeed();
-        AssertUtil.assertSucceeded(
-            cluster1.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(cluster1.getFeedHelper().suspend(URLS.SUSPEND_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
 
         bundles[1].submitAndScheduleFeed();
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
-        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
-        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
     }
 
-
+    /**
+     *  Submit and schedule and delete feed1. Submit and schedule feed2 and check that this
+     *  action didn't affect feed1 and it is still killed.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testFeedSnSOn1ColoWhileAnotherColoHasKilledFeedUsingColoHelper() throws Exception {
         bundles[0].submitAndScheduleFeed();
-        AssertUtil.assertSucceeded(
-            prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed1));
         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
         bundles[1].submitAndScheduleFeed();
         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
index 10df6c2..5590c54 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
@@ -56,6 +56,8 @@ public class PrismProcessScheduleTest extends BaseTestClass {
     String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessScheduleTest/aggregator";
     String workflowForNoIpOp = baseHDFSDir + "/PrismProcessScheduleTest/noinop";
     private static final Logger logger = Logger.getLogger(PrismProcessScheduleTest.class);
+    String process1;
+    String process2;
 
     @BeforeClass(alwaysRun = true)
     public void uploadWorkflow() throws Exception {
@@ -72,6 +74,8 @@ public class PrismProcessScheduleTest extends BaseTestClass {
             bundles[i].generateUniqueBundle();
             bundles[i].setProcessWorkflow(aggregateWorkflowDir);
         }
+        process1 = bundles[0].getProcessData();
+        process2 = bundles[1].getProcessData();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -99,7 +103,6 @@ public class PrismProcessScheduleTest extends BaseTestClass {
 
         //check if there is no criss cross
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-
     }
 
     /**
@@ -123,9 +126,9 @@ public class PrismProcessScheduleTest extends BaseTestClass {
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
 
         AssertUtil.assertSucceeded(cluster2.getProcessHelper()
-            .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+            .schedule(URLS.SCHEDULE_URL, process1));
         AssertUtil.assertSucceeded(cluster1.getProcessHelper()
-            .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+            .schedule(URLS.SCHEDULE_URL, process2));
 
         //now check if they have been scheduled correctly or not
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
@@ -148,33 +151,27 @@ public class PrismProcessScheduleTest extends BaseTestClass {
         bundles[1].submitAndScheduleProcess();
 
         //suspend process on colo-1
-        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
-            .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper().suspend(URLS.SUSPEND_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
 
         //now check if it has been scheduled correctly or not
-        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
-            .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
-        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
-            .resume(URLS.RESUME_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper().resume(URLS.RESUME_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
 
         //suspend process on colo-2
-        AssertUtil.assertSucceeded(cluster1.getProcessHelper()
-            .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+        AssertUtil.assertSucceeded(cluster1.getProcessHelper().suspend(URLS.SUSPEND_URL, process2));
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
 
         //now check if it has been scheduled correctly or not
-        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
-            .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, process2));
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
-        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
-            .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper().resume(URLS.RESUME_URL, process2));
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
-  }
+    }
 
     /**
      * Schedule two processes on different colos. Delete both of them. Try to schedule them once
@@ -188,21 +185,16 @@ public class PrismProcessScheduleTest extends BaseTestClass {
         bundles[0].submitAndScheduleProcess();
         bundles[1].submitAndScheduleProcess();
 
-        AssertUtil.assertSucceeded(
-            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
 
-        AssertUtil.assertSucceeded(
-            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[1].getProcessData()));
+        AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process2));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
 
-        AssertUtil.assertFailed(cluster2.getProcessHelper()
-            .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
-        AssertUtil.assertFailed(cluster1.getProcessHelper()
-            .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
-
+        AssertUtil.assertFailed(cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, process1));
+        AssertUtil.assertFailed(cluster1.getProcessHelper().schedule(URLS.SCHEDULE_URL, process2));
     }
 
     /**
@@ -212,11 +204,8 @@ public class PrismProcessScheduleTest extends BaseTestClass {
      */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testScheduleNonExistentProcessOnBothColos() throws Exception {
-        AssertUtil.assertFailed(cluster2.getProcessHelper()
-            .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
-        AssertUtil.assertFailed(cluster1.getProcessHelper()
-            .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
-
+        AssertUtil.assertFailed(cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, process1));
+        AssertUtil.assertFailed(cluster1.getProcessHelper().schedule(URLS.SCHEDULE_URL, process2));
     }
 
     /**
@@ -230,11 +219,9 @@ public class PrismProcessScheduleTest extends BaseTestClass {
     public void testProcessScheduleOn1ColoWhileOtherColoIsDown() throws Exception {
         try {
             bundles[1].submitProcess(true);
-
             Util.shutDownService(cluster2.getProcessHelper());
-
             AssertUtil.assertSucceeded(prism.getProcessHelper()
-                .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+                .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process2));
 
             //now check if they have been scheduled correctly or not
             AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
@@ -260,11 +247,8 @@ public class PrismProcessScheduleTest extends BaseTestClass {
     public void testProcessScheduleOn1ColoWhileThatColoIsDown() throws Exception {
         try {
             bundles[0].submitProcess(true);
-
             Util.shutDownService(cluster2.getProcessHelper());
-
-            AssertUtil.assertFailed(prism.getProcessHelper()
-                .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+            AssertUtil.assertFailed(prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, process1));
             AssertUtil
                 .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         } catch (Exception e) {
@@ -273,7 +257,6 @@ public class PrismProcessScheduleTest extends BaseTestClass {
         } finally {
             Util.restartService(cluster2.getProcessHelper());
         }
-
     }
 
     /**
@@ -289,7 +272,7 @@ public class PrismProcessScheduleTest extends BaseTestClass {
         try {
             bundles[0].submitAndScheduleProcess();
             AssertUtil.assertSucceeded(cluster1.getProcessHelper()
-                .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+                .suspend(URLS.SUSPEND_URL, process1));
             AssertUtil
                 .checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
 
@@ -301,12 +284,10 @@ public class PrismProcessScheduleTest extends BaseTestClass {
                 .checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
             AssertUtil
                 .checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
-
         } catch (Exception e) {
             e.printStackTrace();
             throw new TestNGException(e.getMessage());
         }
-
     }
 
     /**
@@ -321,8 +302,7 @@ public class PrismProcessScheduleTest extends BaseTestClass {
         throws Exception {
         try {
             bundles[0].submitAndScheduleProcess();
-            AssertUtil.assertSucceeded(prism.getProcessHelper()
-                .delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+            AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process1));
             AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
 
             bundles[1].submitAndScheduleProcess();
@@ -346,47 +326,30 @@ public class PrismProcessScheduleTest extends BaseTestClass {
      */
     @Test(groups = {"prism", "0.2", "embedded"}, enabled = true, timeOut = 1800000)
     public void testRescheduleKilledProcess() throws Exception {
-
-    /*
-    add test data generator pending
-     */
-
+        /* add test data generator pending */
         bundles[0].setProcessValidity(TimeUtil.getTimeWrtSystemTime(-1),
-            TimeUtil.getTimeWrtSystemTime(1));
+               TimeUtil.getTimeWrtSystemTime(1));
         HadoopFileEditor hadoopFileEditor = null;
+        String process = bundles[0].getProcessData();
         try {
-
-            hadoopFileEditor = new HadoopFileEditor(cluster1
-                .getClusterHelper().getHadoopFS());
-
-            hadoopFileEditor.edit(new ProcessMerlin(bundles[0]
-                .getProcessData()).getWorkflow().getPath() + "/workflow.xml",
-                "<value>${outputData}</value>",
-                "<property>\n" +
-                    "                    <name>randomProp</name>\n" +
-                    "                    <value>randomValue</value>\n" +
-                    "                </property>");
-
+            hadoopFileEditor = new HadoopFileEditor(cluster1.getClusterHelper().getHadoopFS());
+            hadoopFileEditor.edit(new ProcessMerlin(process).getWorkflow().getPath() +
+                    "/workflow.xml", "<value>${outputData}</value>",
+                                        "<property>\n" +
+                       "                    <name>randomProp</name>\n" +
+                       "                    <value>randomValue</value>\n" +
+                       "                </property>");
             bundles[0].submitFeedsScheduleProcess(prism);
-
             InstanceUtil.waitForBundleToReachState(cluster1,
-                Util.readEntityName(bundles[0].getProcessData()),
-                org.apache.oozie.client.Job.Status.KILLED);
-
+                Util.readEntityName(process),Job.Status.KILLED);
             String oldBundleID = InstanceUtil.getLatestBundleID(cluster1,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
-
-            prism.getProcessHelper().delete(URLS.DELETE_URL,
-                bundles[0].getProcessData());
+                Util.readEntityName(process), EntityType.PROCESS);
+            prism.getProcessHelper().delete(URLS.DELETE_URL, process);
 
             bundles[0].submitAndScheduleProcess();
-
             OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID,
-                new ArrayList<String>(),
-                bundles[0].getProcessData(), true,
-                false);
+                new ArrayList<String>(), process, true, false);
         } finally {
-
             if (hadoopFileEditor != null) {
                 hadoopFileEditor.restore();
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
index 842bc1a..304549d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
@@ -50,6 +50,8 @@ public class PrismProcessSnSTest extends BaseTestClass {
     OozieClient cluster2OC = serverOC.get(1);
     String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessSnSTest/aggregator";
     private static final Logger logger = Logger.getLogger(PrismProcessSnSTest.class);
+    String process1;
+    String process2;
 
     @BeforeClass(alwaysRun = true)
     public void uploadWorkflow() throws Exception {
@@ -65,6 +67,8 @@ public class PrismProcessSnSTest extends BaseTestClass {
             bundles[i].generateUniqueBundle();
             bundles[i].setProcessWorkflow(aggregateWorkflowDir);
         }
+        process1 = bundles[0].getProcessData();
+        process2 = bundles[1].getProcessData();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -72,6 +76,11 @@ public class PrismProcessSnSTest extends BaseTestClass {
         removeBundles();
     }
 
+    /**
+     * Submit and schedule process1 on cluster1. Check that process2 is not running on cluster1.
+     * Submit and schedule process2 on cluster2. Check that process2 is running and process1 is
+     * not running on cluster2.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testProcessSnSOnBothColos() throws Exception {
         //schedule both bundles
@@ -79,49 +88,49 @@ public class PrismProcessSnSTest extends BaseTestClass {
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
         bundles[1].submitAndScheduleProcess();
-
-        //now check if they have been scheduled correctly or not
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
         //check if there is no criss cross
-        ServiceResponse response =
-            prism.getProcessHelper()
-                .getStatus(URLS.STATUS_URL, bundles[1].getProcessData());
+        ServiceResponse response = prism.getProcessHelper().getStatus(URLS.STATUS_URL, process2);
         logger.info(response.getMessage());
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-
     }
 
+    /**
+     * Submit process1 on cluster1 and schedule it. Check that process1 runs on cluster1 but not
+     * on cluster2. Submit process2 but schedule process1 once more. Check that process1 is running
+     * on cluster1 but not on cluster2.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testProcessSnSForSubmittedProcessOnBothColos() throws Exception {
         //schedule both bundles
-
         bundles[0].submitProcess(true);
-
         AssertUtil.assertSucceeded(prism.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
 
         bundles[1].submitProcess(true);
 
         AssertUtil.assertSucceeded(prism.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
-        //now check if they have been scheduled correctly or not
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         //check if there is no criss cross
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-
     }
 
+    /**
+     * Submit process1 on cluster1 and schedule it. Check that only process1 runs on cluster1.
+     * Submit process2 and check that it isn't running on cluster1. Submit and schedule process1
+     * once more and check that it is still running on cluster1 but process2 isn't running on
+     * cluster2.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testProcessSnSForSubmittedProcessOnBothColosUsingColoHelper()
         throws Exception {
-        //schedule both bundles
-
         bundles[0].submitProcess(true);
-
         AssertUtil.assertSucceeded(prism.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
         bundles[1].submitProcess(true);
@@ -130,127 +139,109 @@ public class PrismProcessSnSTest extends BaseTestClass {
         bundles[1].submitProcess(true);
 
         AssertUtil.assertSucceeded(prism.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
-        //now check if they have been scheduled correctly or not
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         //check if there is no criss cross
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-
     }
 
+    /**
+     * Submit and schedule process1 on cluster1 and check that only it is running there. Submit
+     * and schedule process2 on cluster2 and check the same for it. Schedule process1 on cluster2.
+     * Check that it is running on cluster2 and cluster1 but process2 isn't running on cluster1.
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testProcessSnSAlreadyScheduledOnBothColos() throws Exception {
         //schedule both bundles
         bundles[0].submitAndScheduleProcess();
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
-        bundles[1].submitAndScheduleProcess();
 
-        //now check if they have been scheduled correctly or not
+        bundles[1].submitAndScheduleProcess();
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
-        //check if there is no criss cross
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
 
         //reschedule trial
-
         AssertUtil.assertSucceeded(cluster2.getProcessHelper()
-            .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+            .schedule(URLS.SCHEDULE_URL, process1));
         Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
-            Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
+            Util.readEntityName(process1), EntityType.PROCESS).size(), 1);
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
     }
 
+    /**
+     * Submit and schedule both process1 and process2. Suspend process1. Check their statuses.
+     * Submit and schedule process1 once more.
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testSnSSuspendedProcessOnBothColos() throws Exception {
         //schedule both bundles
         bundles[0].submitAndScheduleProcess();
         bundles[1].submitAndScheduleProcess();
-
-        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
-            .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper().suspend(URLS.SUSPEND_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
-        //now check if they have been scheduled correctly or not
+
         AssertUtil.assertSucceeded(prism.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
-        Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
-            Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
-        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
-            .resume(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
+        Assert.assertEquals(OozieUtil.getBundles(cluster2OC, Util.readEntityName(process1),
+            EntityType.PROCESS).size(), 1);
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper().resume(URLS.SUSPEND_URL, process1));
 
-        AssertUtil.assertSucceeded(cluster1.getProcessHelper()
-            .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+        AssertUtil.assertSucceeded(cluster1.getProcessHelper().suspend(URLS.SUSPEND_URL, process2));
         AssertUtil.assertSucceeded(prism.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process2));
 
-        Assert.assertEquals(OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
-            Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS).size(), 1);
+        Assert.assertEquals(OozieUtil.getBundles(cluster1OC, Util.readEntityName(process2),
+            EntityType.PROCESS).size(), 1);
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
     }
 
+    /**
+     * Submit and schedule both processes on both cluster1 and cluster2. Check that they are
+     * running. Delete both of them. Submit and schedule them once more. Check that they are
+     * running again.
+     */
     @Test(groups = {"prism", "0.2", "embedded"})
     public void testSnSDeletedProcessOnBothColos() throws Exception {
         //schedule both bundles
         final String cluster1Running = cluster1.getClusterHelper().getColoName() + "/RUNNING";
         final String cluster2Running = cluster2.getClusterHelper().getColoName() + "/RUNNING";
-        bundles[0].submitAndScheduleProcess();
-
-        Assert.assertEquals(Util.parseResponse(
-                prism.getProcessHelper()
-                    .getStatus(URLS.STATUS_URL, bundles[0].getProcessData())).getMessage(),
-            cluster1Running
-        );
 
+        bundles[0].submitAndScheduleProcess();
+        Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
+            .getStatus(URLS.STATUS_URL, process1)).getMessage(), cluster1Running);
         bundles[1].submitAndScheduleProcess();
-        Assert.assertEquals(Util.parseResponse(
-                prism.getProcessHelper()
-                    .getStatus(URLS.STATUS_URL, bundles[1].getProcessData())).getMessage(),
-            cluster2Running
-        );
+        Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
+            .getStatus(URLS.STATUS_URL, process2)).getMessage(), cluster2Running);
 
-        AssertUtil.assertSucceeded(
-            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process1));
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
-        AssertUtil.assertSucceeded(
-            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[1].getProcessData()));
+        AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process2));
         AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
         AssertUtil.assertSucceeded(prism.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
         AssertUtil.assertSucceeded(prism.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
-
-        Assert.assertEquals(Util.parseResponse(
-                prism.getProcessHelper()
-                    .getStatus(URLS.STATUS_URL, bundles[0].getProcessData())
-            ).getMessage(),
-            cluster1Running
-        );
-        Assert.assertEquals(Util.parseResponse(
-                prism.getProcessHelper()
-                    .getStatus(URLS.STATUS_URL, bundles[1].getProcessData())
-            ).getMessage(),
-            cluster2Running
-        );
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process2));
 
+        Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
+            .getStatus(URLS.STATUS_URL, process1)).getMessage(),cluster1Running);
+        Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
+            .getStatus(URLS.STATUS_URL, process2)).getMessage(), cluster2Running);
     }
 
+    /**
+     * Attempt to submit and schedule processes when all required entities weren't registered
+     */
     @Test(groups = {"prism", "0.2", "distributed"})
     public void testScheduleNonExistentProcessOnBothColos() throws Exception {
         Assert.assertEquals(Util.parseResponse(cluster2.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()))
-            .getStatusCode(), 404);
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1)).getStatusCode(), 404);
         Assert.assertEquals(Util.parseResponse(cluster1.getProcessHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()))
-            .getStatusCode(), 404);
-
-    }
-
-    @AfterClass(alwaysRun = true)
-    public void tearDownClass() throws IOException {
-        cleanTestDirs();
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process2)).getStatusCode(), 404);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
index 458ddc6..c82786f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
@@ -176,27 +176,28 @@ public class ProcessUITest extends BaseUITestClass {
         //check Process statuses via UI
         EntitiesPage page = new EntitiesPage(DRIVER, cluster, EntityType.PROCESS);
         page.navigateTo();
-
-        softAssert.assertEquals(page.getEntityStatus(bundles[0].getProcessName()),
+        String process = bundles[0].getProcessData();
+        String processName = Util.readEntityName(process);
+        softAssert.assertEquals(page.getEntityStatus(processName),
                 EntitiesPage.EntityStatus.SUBMITTED, "Process status should be SUBMITTED");
-        prism.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL, bundles[0].getProcessData());
+        prism.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL, process);
 
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-                .getProcessData()), 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+            CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
 
-        softAssert.assertEquals(page.getEntityStatus(bundles[0].getProcessName()),
+        softAssert.assertEquals(page.getEntityStatus(processName),
                 EntitiesPage.EntityStatus.RUNNING, "Process status should be RUNNING");
 
-        ProcessPage processPage = new ProcessPage(DRIVER, cluster, bundles[0].getProcessName());
+        ProcessPage processPage = new ProcessPage(DRIVER, cluster, processName);
         processPage.navigateTo();
 
-        String bundleID = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        String bundleID = InstanceUtil.getLatestBundleID(cluster, processName, EntityType.PROCESS);
         Map<Date, CoordinatorAction.Status> actions = OozieUtil.getActionsNominalTimeAndStatus(prism, bundleID,
                 EntityType.PROCESS);
         checkActions(actions, processPage);
 
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-                .getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
 
         processPage.refresh();
         actions = OozieUtil.getActionsNominalTimeAndStatus(prism, bundleID, EntityType.PROCESS);


Mime
View raw message