falcon-commits mailing list archives

From: samar...@apache.org
Subject: [05/27] adding falcon-regression
Date: Mon, 04 Aug 2014 10:04:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
new file mode 100644
index 0000000..c8cef02
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.RetentionUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.supportClasses.JmsMessageConsumer;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+@Test(groups = "embedded")
+public class RetentionTest extends BaseTestClass {
+    private static final String TEST_FOLDERS = "testFolders/";
+    String baseTestHDFSDir = baseHDFSDir + "/RetentionTest/";
+    String testHDFSDir = baseTestHDFSDir + TEST_FOLDERS;
+    private static final Logger logger = Logger.getLogger(RetentionTest.class);
+
+    ColoHelper cluster = servers.get(0);
+    FileSystem clusterFS = serverFS.get(0);
+    OozieClient clusterOC = serverOC.get(0);
+
+    @BeforeMethod(alwaysRun = true)
+    public void testName(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readRetentionBundle();
+        bundles[0] = new Bundle(bundle, cluster);
+        bundles[0].setInputFeedDataPath(testHDFSDir);
+        bundles[0].generateUniqueBundle();
+        bundles[0].submitClusters(prism);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        removeBundles();
+    }
+
+    @Test
+    public void testRetentionWithEmptyDirectories() throws Exception {
+        // test for https://issues.apache.org/jira/browse/FALCON-321
+        testRetention(24, RetentionUnit.HOURS, true, FeedType.DAILY, false);
+    }
+
+    @Test(groups = {"0.1", "0.2", "prism"}, dataProvider = "betterDP", priority = -1)
+    public void testRetention(final int retentionPeriod, final RetentionUnit retentionUnit,
+        final boolean gaps, final FeedType feedType, final boolean withData) throws Exception {
+        bundles[0].setInputFeedDataPath(testHDFSDir + feedType.getPathValue());
+        final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        feedObject.setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")");
+
+        final ServiceResponse response = prism.getFeedHelper()
+            .submitEntity(URLS.SUBMIT_URL, feedObject.toString());
+        if (retentionPeriod > 0) {
+            AssertUtil.assertSucceeded(response);
+
+            replenishData(feedType, gaps, withData);
+
+            commonDataRetentionWorkflow(feedObject.toString(), feedType, retentionUnit,
+                retentionPeriod);
+        } else {
+            AssertUtil.assertFailed(response);
+        }
+    }
+
+    private void replenishData(FeedType feedType, boolean gap, boolean withData) throws Exception {
+        int skip = 1;
+        if (gap) {
+            skip = gaps[new Random().nextInt(gaps.length)];
+        }
+
+        final DateTime today = new DateTime(DateTimeZone.UTC);
+        final List<DateTime> times = TimeUtil.getDatesOnEitherSide(
+            feedType.addTime(today, -36), feedType.addTime(today, 36), skip, feedType);
+        final List<String> dataDates = TimeUtil.convertDatesToString(times, feedType.getFormatter());
+        logger.info("dataDates = " + dataDates);
+
+        HadoopUtil.replenishData(clusterFS, testHDFSDir, dataDates, withData);
+    }
+
+    private void commonDataRetentionWorkflow(String feed, FeedType feedType,
+        RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException,
+        IOException, URISyntaxException, AuthenticationException, JMSException {
+        //get Data created in the cluster
+        List<String> initialData = Util.getHadoopDataFromDir(clusterFS, feed, testHDFSDir);
+
+        cluster.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        logger.info(cluster.getClusterHelper().getActiveMQ());
+        final String feedName = Util.readEntityName(feed);
+        logger.info(feedName);
+        JmsMessageConsumer messageConsumer = new JmsMessageConsumer("FALCON." + feedName,
+                cluster.getClusterHelper().getActiveMQ());
+        messageConsumer.start();
+
+        final DateTime currentTime = new DateTime(DateTimeZone.UTC);
+        String bundleId = OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0);
+
+        List<String> workflows = OozieUtil.waitForRetentionWorkflowToSucceed(bundleId, clusterOC);
+        logger.info("workflows: " + workflows);
+
+        messageConsumer.interrupt();
+        Util.printMessageData(messageConsumer);
+        //now look for cluster data
+        List<String> finalData = Util.getHadoopDataFromDir(clusterFS, feed, testHDFSDir);
+
+        //now check that the retention policy removed exactly what was expected
+        List<String> expectedOutput = filterDataOnRetention(initialData, currentTime, retentionUnit,
+            retentionPeriod, feedType);
+
+        logger.info("initialData = " + initialData);
+        logger.info("finalData = " + finalData);
+        logger.info("expectedOutput = " + expectedOutput);
+
+        final List<String> missingData = new ArrayList<String>(initialData);
+        missingData.removeAll(expectedOutput);
+        validateDataFromFeedQueue(feedName, messageConsumer.getReceivedMessages(), missingData);
+
+        Assert.assertEquals(finalData.size(), expectedOutput.size(),
+            "sizes of final and expected outputs differ");
+
+        Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]),
+            expectedOutput.toArray(new String[expectedOutput.size()])));
+    }
+
+    private void validateDataFromFeedQueue(String feedName, List<MapMessage> messages,
+        List<String> missingData) throws OozieClientException, JMSException {
+        //just verify that each element in queue is same as deleted data!
+        List<String> workflowIds = OozieUtil.getWorkflowJobs(cluster,
+                OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0));
+
+        //create queuedata folderList:
+        List<String> deletedFolders = new ArrayList<String>();
+
+        for (MapMessage message : messages) {
+            if (message != null) {
+                Assert.assertEquals(message.getString("entityName"), feedName);
+                String[] splitData = message.getString("feedInstancePaths").split(TEST_FOLDERS);
+                deletedFolders.add(splitData[splitData.length - 1]);
+                Assert.assertEquals(message.getString("operation"), "DELETE");
+                Assert.assertEquals(message.getString("workflowId"), workflowIds.get(0));
+
+                //verify other data also
+                Assert.assertEquals(message.getString("topicName"), "FALCON." + feedName);
+                Assert.assertEquals(message.getString("brokerImplClass"),
+                    "org.apache.activemq.ActiveMQConnectionFactory");
+                Assert.assertEquals(message.getString("status"), "SUCCEEDED");
+                Assert.assertEquals(message.getString("brokerUrl"),
+                    cluster.getFeedHelper().getActiveMQ());
+            }
+        }
+
+        Assert.assertEquals(deletedFolders.size(), missingData.size(),
+            "Output size differs from the expected one!");
+        Assert.assertTrue(Arrays.deepEquals(missingData.toArray(new String[missingData.size()]),
+            deletedFolders.toArray(new String[deletedFolders.size()])),
+            "The missing data and message for delete operation don't correspond");
+    }
+
+    private List<String> filterDataOnRetention(List<String> inputData, DateTime currentTime,
+        RetentionUnit retentionUnit, int retentionPeriod, FeedType feedType) {
+        final List<String> finalData = new ArrayList<String>();
+        //cutoff: data older than (currentTime - retention period) should be deleted
+        final String startLimit = feedType.getFormatter().print(
+                retentionUnit.minusTime(currentTime, retentionPeriod));
+
+        //now to actually check!
+        for (String testDate : inputData) {
+            if (testDate.equals(HadoopUtil.SOMETHING_RANDOM)
+                    || testDate.compareTo(startLimit) > 0) {
+                finalData.add(testDate);
+            }
+        }
+        return finalData;
+    }
+
+    private static final int[] gaps = new int[]{2, 4, 5, 1};
+
+    @DataProvider(name = "betterDP")
+    public Object[][] getTestData(Method m) {
+        // a negative value like -4 should be covered in validation scenarios.
+        int[] retentionPeriods = new int[]{0, 10080, 60, 8, 24};
+        RetentionUnit[] retentionUnits = new RetentionUnit[]{RetentionUnit.HOURS,
+            RetentionUnit.DAYS};
+        boolean[] gaps = new boolean[]{false, true};
+        FeedType[] feedTypes = new FeedType[]{FeedType.DAILY, FeedType.YEARLY, FeedType.MONTHLY};
+        Object[][] testData = new Object[retentionPeriods.length * retentionUnits.length *
+            gaps.length * feedTypes.length][5];
+
+        int i = 0;
+
+        for (RetentionUnit retentionUnit : retentionUnits) {
+            for (int period : retentionPeriods) {
+                for (boolean gap : gaps) {
+                    for (FeedType feedType : feedTypes) {
+                        testData[i][0] = period;
+                        testData[i][1] = retentionUnit;
+                        testData[i][2] = gap;
+                        testData[i][3] = feedType;
+                        testData[i][4] = true;
+                        i++;
+                    }
+                }
+            }
+        }
+
+        return testData;
+    }
+
+}

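The heart of RetentionTest is the window check in filterDataOnRetention: an instance directory survives retention exactly when its date string sorts after the cutoff produced by printing (currentTime - retentionPeriod) with the feed's formatter. A minimal standalone sketch of that logic, assuming Joda-Time and a fixed zero-padded yyyy/MM/dd/HH path layout (the real test derives the formatter from FeedType):

    import java.util.ArrayList;
    import java.util.List;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class RetentionWindowSketch {
        // Keep every instance folder whose timestamp is strictly newer than
        // (now - retentionHours); everything older is eligible for deletion.
        static List<String> survivors(List<String> dataDates, int retentionHours) {
            DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy/MM/dd/HH");
            String startLimit = fmt.print(
                new DateTime(DateTimeZone.UTC).minusHours(retentionHours));
            List<String> kept = new ArrayList<String>();
            for (String date : dataDates) {
                // Lexicographic comparison is safe here: the pattern is
                // zero-padded and ordered from most to least significant field.
                if (date.compareTo(startLimit) > 0) {
                    kept.add(date);
                }
            }
            return kept;
        }
    }

The same cutoff comparison is what commonDataRetentionWorkflow cross-checks against the JMS DELETE notifications consumed from ActiveMQ in validateDataFromFeedQueue.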
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
new file mode 100644
index 0000000..fac89d7
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@ -0,0 +1,759 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import com.jcraft.jsch.JSchException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClientException;
+import org.custommonkey.xmlunit.Diff;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.xml.sax.SAXException;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.List;
+
+
+public class UpdateAtSpecificTimeTest extends BaseTestClass {
+
+    private static final Logger logger = Logger.getLogger(UpdateAtSpecificTimeTest.class);
+
+    Bundle processBundle;
+
+    ColoHelper cluster_1 = servers.get(0);
+    ColoHelper cluster_2 = servers.get(1);
+    ColoHelper cluster_3 = servers.get(2);
+    FileSystem cluster2FS = serverFS.get(1);
+
+    private String dateTemplate = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private final String baseTestDir = baseHDFSDir + "/UpdateAtSpecificTimeTest-data";
+    String aggregateWorkflowDir = baseHDFSDir + "/aggregator";
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws IOException {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readLocalDCBundle();
+        bundles[0] = new Bundle(bundle, cluster_1);
+        bundles[1] = new Bundle(bundle, cluster_2);
+        bundles[2] = new Bundle(bundle, cluster_3);
+
+        bundles[0].generateUniqueBundle();
+        bundles[1].generateUniqueBundle();
+        bundles[2].generateUniqueBundle();
+
+        processBundle = BundleUtil.readELBundle();
+        processBundle = new Bundle(processBundle, cluster_1);
+        processBundle.generateUniqueBundle();
+        processBundle.setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+        removeBundles(processBundle);
+    }
+
+    @Test(groups = {"singleCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+    public void invalidChar_Process()
+        throws JAXBException, IOException, URISyntaxException,
+        AuthenticationException, OozieClientException {
+        processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(0),
+            TimeUtil.getTimeWrtSystemTime(20));
+        processBundle.submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 0);
+        String oldProcess = processBundle.getProcessData();
+        processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(5),
+            TimeUtil.getTimeWrtSystemTime(100));
+        ServiceResponse r = prism.getProcessHelper().update(oldProcess,
+            processBundle.getProcessData(), "abc", null);
+        Assert.assertTrue(r.getMessage()
+            .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string"));
+    }
+
+    @Test(groups = {"singleCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+    public void invalidChar_Feed()
+        throws JAXBException, IOException, URISyntaxException, AuthenticationException,
+        OozieClientException {
+
+        String feed = submitAndScheduleFeed(processBundle);
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 0);
+        //update frequency
+        Frequency f = new Frequency("" + 21, Frequency.TimeUnit.minutes);
+        String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
+
+        ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed, "abc", null);
+        Assert.assertTrue(r.getMessage()
+            .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string"));
+    }
+
+
+    @Test(groups = {"singleCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+    public void updateTimeInPast_Process()
+        throws JAXBException, IOException, URISyntaxException,
+        OozieClientException, AuthenticationException {
+
+        processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(0),
+                TimeUtil.getTimeWrtSystemTime(20));
+        processBundle.submitFeedsScheduleProcess(prism);
+
+        //get old process details
+        String oldProcess = processBundle.getProcessData();
+
+        String oldBundleId = InstanceUtil
+            .getLatestBundleID(cluster_1,
+                Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, oldProcess, 0);
+
+        List<String> initialNominalTimes = OozieUtil.getActionsNominalTime(cluster_1,
+            oldBundleId, EntityType.PROCESS);
+
+
+        // update process by adding property
+        processBundle.setProcessProperty("someProp", "someValue");
+        ServiceResponse r = prism.getProcessHelper().update(oldProcess,
+            processBundle.getProcessData(),TimeUtil.getTimeWrtSystemTime(-10000), null);
+        AssertUtil.assertSucceeded(r);
+
+        //check new coord created with current time
+        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleId, initialNominalTimes,
+            processBundle.getProcessData(), true,
+            false);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, oldProcess, 1);
+
+        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleId, initialNominalTimes,
+            processBundle.getProcessData(), true,
+            true);
+
+    }
+
+    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+
+    public void updateTimeInPast_Feed()
+        throws JAXBException, IOException, OozieClientException,
+        URISyntaxException, AuthenticationException {
+
+
+        String startTimeCluster_source = TimeUtil.getTimeWrtSystemTime(-10);
+        String startTimeCluster_target = TimeUtil.getTimeWrtSystemTime(10);
+
+        String feed = getMultiClusterFeed(startTimeCluster_source, startTimeCluster_target);
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        //submit and schedule feed
+        ServiceResponse r =
+            prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(r);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 0);
+
+        //update frequency
+        Frequency f = new Frequency("" + 7, Frequency.TimeUnit.minutes);
+        String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
+
+        r = prism.getFeedHelper().update(feed, updatedFeed,
+            TimeUtil.getTimeWrtSystemTime(-10000), null);
+        AssertUtil.assertSucceeded(r);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 1);
+
+        //check that the correct number of coords exist
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster_1.getFeedHelper(),
+                Util.readEntityName(feed),
+                "REPLICATION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster_2.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster_1.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 2);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster_3.getFeedHelper(), Util.readEntityName(feed),
+                "RETENTION"), 2);
+
+    }
+
+
+    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+    public void inNextFewMinutesUpdate_RollForward_Process()
+        throws JAXBException, IOException, URISyntaxException, JSchException,
+        OozieClientException, SAXException, AuthenticationException {
+    /*
+    Submit process on 3 clusters; schedule it on 2 of them. Bring down one of
+    the scheduled clusters. Update with a time 5 minutes from now. On the
+    running cluster a new coord should be created with start time +5 and no
+    instance should be missing. On the 3rd cluster, where the process was
+    only submitted, the definition should be updated. Bring the down cluster
+    back up and update with the same definition again; now the recently
+    revived cluster should also get new coords.
+     */
+
+        try {
+            Util.startService(cluster_2.getProcessHelper());
+            String startTime = TimeUtil.getTimeWrtSystemTime(-15);
+            processBundle.setProcessValidity(startTime,
+                TimeUtil.getTimeWrtSystemTime(60));
+            processBundle.addClusterToBundle(bundles[1].getClusters().get(0),
+                ClusterType.SOURCE, null, null);
+            processBundle.addClusterToBundle(bundles[2].getClusters().get(0),
+                ClusterType.SOURCE, null, null);
+            processBundle.submitBundle(prism);
+
+            //schedule on 2 clusters
+            cluster_1.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL,
+                processBundle.getProcessData());
+
+            cluster_2.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL,
+                processBundle.getProcessData());
+
+            InstanceUtil.waitTillInstancesAreCreated(cluster_2, processBundle.getProcessData(), 0);
+
+            //shut down cluster_2
+            Util.shutDownService(cluster_2.getProcessHelper());
+
+            // save old data before update
+            String oldProcess = processBundle.getProcessData();
+            String oldBundleID_cluster1 = InstanceUtil
+                .getLatestBundleID(cluster_1,
+                    Util.readEntityName(oldProcess), EntityType.PROCESS);
+            String oldBundleID_cluster2 = InstanceUtil
+                .getLatestBundleID(cluster_2,
+                    Util.readEntityName(oldProcess), EntityType.PROCESS);
+
+            List<String> oldNominalTimes_cluster1 = OozieUtil.getActionsNominalTime
+                (cluster_1,
+                    oldBundleID_cluster1, EntityType.PROCESS);
+
+            List<String> oldNominalTimes_cluster2 = OozieUtil.getActionsNominalTime
+                (cluster_2,
+                    oldBundleID_cluster2, EntityType.PROCESS);
+
+            //update process by adding a property
+            processBundle.setProcessProperty("someProp", "someValue");
+
+            //send update request
+            String updateTime = TimeUtil.getTimeWrtSystemTime(5);
+            ServiceResponse r = prism.getProcessHelper()
+                .update(oldProcess, processBundle.getProcessData(), updateTime);
+            AssertUtil.assertPartial(r);
+
+            InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 1);
+
+            //verify new bundle on cluster_1 and definition on cluster_3
+            OozieUtil
+                .verifyNewBundleCreation(cluster_1, oldBundleID_cluster1, oldNominalTimes_cluster1,
+                    oldProcess, true, false);
+
+            OozieUtil.verifyNewBundleCreation(cluster_2, oldBundleID_cluster2,
+                oldNominalTimes_cluster2,
+                oldProcess, false, false);
+
+            String definition_cluster_3 = Util.getEntityDefinition(cluster_3,
+                processBundle.getProcessData(), true);
+
+            Assert.assertTrue(XmlUtil.isIdentical(definition_cluster_3,
+                processBundle.getProcessData()), "Process definitions should be equal");
+
+            //start the stopped cluster_2
+            Util.startService(cluster_2.getProcessHelper());
+            TimeUtil.sleepSeconds(40);
+
+            String newBundleID_cluster1 = InstanceUtil
+                .getLatestBundleID(cluster_1,
+                    Util.readEntityName(oldProcess), EntityType.PROCESS);
+
+            //send second update request
+            r = prism.getProcessHelper().update(oldProcess,
+                processBundle.getProcessData(),
+                updateTime, null);
+            AssertUtil.assertSucceeded(r);
+
+
+            String def_cluster_2 = Util.getEntityDefinition(cluster_2,
+                processBundle.getProcessData(), true);
+            logger.info("def_cluster_2 : " + Util.prettyPrintXml(def_cluster_2));
+
+            // verify new bundle in cluster_2 and no new bundle in cluster_1
+            OozieUtil
+                .verifyNewBundleCreation(cluster_1, newBundleID_cluster1, oldNominalTimes_cluster1,
+                    oldProcess, false, false);
+
+            OozieUtil.verifyNewBundleCreation(cluster_2, oldBundleID_cluster2,
+                oldNominalTimes_cluster2,
+                oldProcess, true, false);
+
+            //wait till update time is reached
+            TimeUtil.sleepTill(updateTime);
+
+            OozieUtil.verifyNewBundleCreation(cluster_2, oldBundleID_cluster2,
+                oldNominalTimes_cluster2,
+                oldProcess, true, true);
+
+            OozieUtil
+                .verifyNewBundleCreation(cluster_1, oldBundleID_cluster1, oldNominalTimes_cluster1,
+                    oldProcess, true, true);
+        } finally {
+            Util.restartService(cluster_2.getProcessHelper());
+        }
+    }
+
+    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+    public void inNextFewMinutesUpdate_RollForward_Feed()
+        throws JAXBException, IOException, URISyntaxException, JSchException, 
+        OozieClientException, SAXException, AuthenticationException {
+        try {
+            String startTimeCluster_source = TimeUtil.getTimeWrtSystemTime(-18);
+
+            String feed = getMultiClusterFeed(startTimeCluster_source, startTimeCluster_source);
+
+            logger.info("feed: " + Util.prettyPrintXml(feed));
+
+            //submit feed on all 3 clusters
+            ServiceResponse r = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feed);
+            AssertUtil.assertSucceeded(r);
+
+            //schedule feed of cluster_1 and cluster_2
+            r = cluster_1.getFeedHelper().schedule(Util.URLS.SCHEDULE_URL, feed);
+            AssertUtil.assertSucceeded(r);
+            r = cluster_2.getFeedHelper().schedule(Util.URLS.SCHEDULE_URL, feed);
+            AssertUtil.assertSucceeded(r);
+
+            InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 0);
+
+            //shutdown cluster_2
+            Util.shutDownService(cluster_2.getProcessHelper());
+
+            //add some property to feed so that new bundle is created
+            String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal");
+
+            //save old data
+            String oldBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster_1,
+                Util.readEntityName(feed), EntityType.FEED);
+
+            List<String> oldNominalTimes_cluster1 = OozieUtil.getActionsNominalTime
+                (cluster_1,
+                    oldBundle_cluster1, EntityType.FEED);
+
+            //send update command with +5 mins in future
+            String updateTime = TimeUtil.getTimeWrtSystemTime(5);
+            r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
+            AssertUtil.assertPartial(r);
+
+            //verify new bundle creation on cluster_1 and new definition on cluster_3
+            OozieUtil
+                .verifyNewBundleCreation(cluster_1, oldBundle_cluster1, oldNominalTimes_cluster1,
+                    feed, true, false);
+
+
+            String definition = Util.getEntityDefinition(cluster_3, feed, true);
+            Diff diff = XMLUnit.compareXML(definition, updatedFeed);
+            logger.info(diff);
+
+            //start stopped cluster_2
+            Util.startService(cluster_2.getProcessHelper());
+
+            String newBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster_1,
+                Util.readEntityName(feed), EntityType.FEED);
+
+            //send update again
+            r = prism.getFeedHelper().update(feed, updatedFeed, updateTime);
+            AssertUtil.assertSucceeded(r);
+
+            //verify new bundle creation on cluster_2 and no new bundle on cluster_1
+            Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster_2.getFeedHelper(), Util.readEntityName(feed),
+                    "RETENTION"), 2);
+
+            OozieUtil
+                .verifyNewBundleCreation(cluster_1, newBundle_cluster1, oldNominalTimes_cluster1,
+                    feed, false, false);
+            //wait till update time is reached
+            TimeUtil.sleepTill(updateTime);
+
+            //verify new bundle creation with instance matching
+            OozieUtil
+                .verifyNewBundleCreation(cluster_1, oldBundle_cluster1, oldNominalTimes_cluster1,
+                    feed, true, true);
+
+        } finally {
+            Util.restartService(cluster_2.getProcessHelper());
+        }
+    }
+
+
+    @Test(groups = {"multiCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+    public void updateTimeAfterEndTime_Process()
+        throws JAXBException, InterruptedException, IOException, URISyntaxException,
+        OozieClientException, AuthenticationException {
+
+    /*
+      Submit and schedule a process whose end time is 60 mins in the future,
+      then update it with an effective time 60 mins after that end time.
+    */
+        logger.info("Running test updateTimeAfterEndTime_Process");
+        String startTime = TimeUtil.getTimeWrtSystemTime(-15);
+        String endTime = TimeUtil.getTimeWrtSystemTime(60);
+        processBundle.setProcessValidity(startTime, endTime);
+        processBundle.submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(10);
+
+        InstanceUtil.waitTillInstanceReachState(serverOC.get(0),
+            Util.readEntityName(processBundle.getProcessData()), 0,
+            CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+
+        //save old data
+        String oldProcess = processBundle.getProcessData();
+
+        String oldBundleID = InstanceUtil
+            .getLatestBundleID(cluster_1,
+                Util.readEntityName(oldProcess), EntityType.PROCESS);
+
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster_1, oldBundleID,
+            EntityType.PROCESS);
+
+        //update
+        processBundle.setProcessProperty("someProp", "someVal");
+        String updateTime = TimeUtil.addMinsToTime(endTime, 60);
+
+        logger.info("Original Feed : " + Util.prettyPrintXml(oldProcess));
+        logger.info("Updated Feed :" + Util.prettyPrintXml(processBundle.getProcessData()));
+        logger.info("Update Time : " + updateTime);
+
+
+        ServiceResponse r = prism.getProcessHelper().update(oldProcess,
+            processBundle.getProcessData(), updateTime, null);
+        AssertUtil.assertSucceeded(r);
+
+        //verify new bundle creation with instances matching
+        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID, oldNominalTimes,
+            oldProcess, true, false);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 1);
+
+        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID, oldNominalTimes,
+            oldProcess, true, true);
+    }
+
+    @Test(groups = {"multiCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+    public void updateTimeAfterEndTime_Feed()
+        throws JAXBException, IOException, OozieClientException,
+        URISyntaxException, AuthenticationException {
+    /*
+    Submit and schedule a feed whose end time is 60 mins in the future,
+    then update it with an effective time 60 mins after that end time.
+     */
+        String startTime = TimeUtil.getTimeWrtSystemTime(-15);
+        String endTime = TimeUtil.getTimeWrtSystemTime(60);
+
+        String feed = processBundle.getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(100000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime, endTime),
+            XmlUtil.createRtention("days(100000)", ActionType.DELETE),
+            Util.readEntityName(processBundle.getClusters().get(0)), ClusterType.SOURCE,
+            null, baseTestDir + "/replication" + dateTemplate);
+
+
+        ServiceResponse r = prism.getClusterHelper().submitEntity(Util.URLS.SUBMIT_URL,
+            processBundle.getClusters().get(0));
+        AssertUtil.assertSucceeded(r);
+        r = prism.getFeedHelper().submitAndSchedule(
+            Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+        AssertUtil.assertSucceeded(r);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 0);
+        //save old data
+
+        String oldBundleID = InstanceUtil
+            .getLatestBundleID(cluster_1,
+                Util.readEntityName(feed), EntityType.FEED);
+
+        String updateTime = TimeUtil.addMinsToTime(endTime, 60);
+        String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal");
+
+        logger.info("Original Feed : " + Util.prettyPrintXml(feed));
+        logger.info("Updated Feed :" + Util.prettyPrintXml(updatedFeed));
+        logger.info("Update Time : " + updateTime);
+
+        r = prism.getFeedHelper().update(feed, updatedFeed, updateTime);
+        AssertUtil.assertSucceeded(r);
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 1);
+
+        //verify new bundle creation
+        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID, null,
+            feed, true, false);
+    }
+
+    @Test(groups = {"multiCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+    public void updateTimeBeforeStartTime_Process() throws JAXBException, IOException,
+        URISyntaxException, OozieClientException, AuthenticationException {
+
+    /*
+      Submit and schedule a process with start time 10 mins from now. Update
+      it, moving the start time 4 mins earlier, with update time 2 mins from now.
+     */
+        String startTime = TimeUtil.getTimeWrtSystemTime(10);
+        String endTime = TimeUtil.getTimeWrtSystemTime(20);
+        processBundle.setProcessValidity(startTime, endTime);
+        processBundle.submitFeedsScheduleProcess(prism);
+        //save old data
+        String oldProcess = processBundle.getProcessData();
+        String oldBundleID = InstanceUtil
+            .getLatestBundleID(cluster_1,
+                Util.readEntityName(oldProcess), EntityType.PROCESS);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster_1, oldBundleID,
+            EntityType.PROCESS);
+
+        processBundle.setProcessValidity(TimeUtil.addMinsToTime(startTime, -4),
+            endTime);
+        String updateTime = TimeUtil.getTimeWrtSystemTime(2);
+        ServiceResponse r = prism.getProcessHelper().update(oldProcess,
+            processBundle.getProcessData(), updateTime, null);
+        AssertUtil.assertSucceeded(r);
+        TimeUtil.sleepSeconds(10);
+        //verify new bundle creation
+        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID, oldNominalTimes,
+            oldProcess, true, false);
+
+    }
+
+    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000,
+        enabled = true)
+    public void updateDiffClusterDiffValidity_Process()
+        throws JAXBException, IOException, URISyntaxException, OozieClientException,
+        AuthenticationException {
+
+        //set start end process time for 3 clusters
+        String startTime_cluster1 = TimeUtil.getTimeWrtSystemTime(-40);
+        String endTime_cluster1 = TimeUtil.getTimeWrtSystemTime(3);
+        String startTime_cluster2 = TimeUtil.getTimeWrtSystemTime(120);
+        String endTime_cluster2 = TimeUtil.getTimeWrtSystemTime(240);
+        String startTime_cluster3 = TimeUtil.getTimeWrtSystemTime(-30);
+        String endTime_cluster3 = TimeUtil.getTimeWrtSystemTime(180);
+
+
+        //create multi cluster bundle
+        processBundle.setProcessValidity(startTime_cluster1,
+            endTime_cluster1);
+        processBundle.addClusterToBundle(bundles[1].getClusters().get(0),
+            ClusterType.SOURCE, startTime_cluster2, endTime_cluster2);
+        processBundle.addClusterToBundle(bundles[2].getClusters().get(0),
+            ClusterType.SOURCE, startTime_cluster3, endTime_cluster3);
+
+        //submit and schedule
+        processBundle.submitFeedsScheduleProcess(prism);
+
+        //wait for coord to be in running state
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster_3, processBundle.getProcessData(), 0);
+
+        //save old info
+        String oldBundleID_cluster1 = InstanceUtil
+            .getLatestBundleID(cluster_1,
+                Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
+        List<String> nominalTimes_cluster1 =
+            OozieUtil.getActionsNominalTime(cluster_1, oldBundleID_cluster1,
+                EntityType.PROCESS);
+        String oldBundleID_cluster2 = InstanceUtil
+            .getLatestBundleID(cluster_2,
+                Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
+        String oldBundleID_cluster3 = InstanceUtil
+            .getLatestBundleID(cluster_3,
+                Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
+        List<String> nominalTimes_cluster3 = OozieUtil.getActionsNominalTime
+            (cluster_3, oldBundleID_cluster3,
+                EntityType.PROCESS);
+
+
+        //update process
+        String updateTime = TimeUtil.addMinsToTime(endTime_cluster1, 3);
+        processBundle.setProcessProperty("someProp", "someVal");
+        ServiceResponse r = prism.getProcessHelper().update(processBundle.getProcessData(),
+            processBundle.getProcessData(), updateTime, null);
+        AssertUtil.assertSucceeded(r);
+
+        //check for new bundle to be created
+        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID_cluster1, nominalTimes_cluster1,
+            processBundle.getProcessData(), true, false);
+        OozieUtil.verifyNewBundleCreation(cluster_3, oldBundleID_cluster3,
+            nominalTimes_cluster3,
+            processBundle.getProcessData(), true, false);
+        OozieUtil.verifyNewBundleCreation(cluster_2, oldBundleID_cluster2,
+            nominalTimes_cluster3,
+            processBundle.getProcessData(), true, false);
+
+        //wait till new coord are running on Cluster1
+        InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 1);
+        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID_cluster1, nominalTimes_cluster1,
+            processBundle.getProcessData(), true, true);
+
+        //verify
+        String coordStartTime_cluster3 = OozieUtil.getCoordStartTime(cluster_3,
+            processBundle.getProcessData(), 1);
+        String coordStartTime_cluster2 = OozieUtil.getCoordStartTime(cluster_2,
+            processBundle.getProcessData(), 1);
+
+        Assert.assertTrue(TimeUtil.oozieDateToDate(coordStartTime_cluster3)
+                .isAfter(TimeUtil.oozieDateToDate(updateTime))
+            || TimeUtil.oozieDateToDate(coordStartTime_cluster3)
+                .isEqual(TimeUtil.oozieDateToDate(updateTime)),
+            "new coord start time on cluster_3 is not correct");
+
+        Assert.assertFalse(TimeUtil.oozieDateToDate(coordStartTime_cluster2)
+            .isEqual(TimeUtil.oozieDateToDate(updateTime)),
+            "new coord start time on cluster_2 is not correct");
+
+        TimeUtil.sleepTill(updateTime);
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster_3, processBundle.getProcessData(), 1);
+
+        //verify that no instance are missing
+        OozieUtil.verifyNewBundleCreation(cluster_3, oldBundleID_cluster3,
+            nominalTimes_cluster3,
+            processBundle.getProcessData(), true, true);
+    }
+
+    private String submitAndScheduleFeed(Bundle b)
+        throws JAXBException, IOException, URISyntaxException, AuthenticationException {
+        String feed = b.getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+        feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity
+                ("2012-10-01T12:10Z", "2099-10-01T12:10Z"),
+            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
+            Util.readEntityName(b.getClusters().get(0)), ClusterType.SOURCE, "",
+            "/someTestPath" + dateTemplate);
+        ServiceResponse r = prism.getClusterHelper().submitEntity(
+            Util.URLS.SUBMIT_URL, b.getClusters().get(0));
+        AssertUtil.assertSucceeded(r);
+        r = prism.getFeedHelper().submitAndSchedule(
+            Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+        AssertUtil.assertSucceeded(r);
+
+        return feed;
+    }
+
+
+    private String getMultiClusterFeed(String startTimeCluster_source,
+                                       String startTimeCluster_target)
+        throws IOException, URISyntaxException, AuthenticationException {
+        String testDataDir = baseTestDir + "/replication";
+
+        //create desired feed
+        String feed = bundles[0].getDataSets().get(0);
+
+        //cluster_1 is target, cluster_2 is source and cluster_3 is neutral
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(100000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTimeCluster_source, "2099-10-01T12:10Z"),
+            XmlUtil.createRtention("days(100000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), null, null);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTimeCluster_target, "2099-10-01T12:25Z"),
+            XmlUtil.createRtention("days(100000)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+            null,
+            testDataDir + dateTemplate);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTimeCluster_source, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(100000)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            null, testDataDir + dateTemplate);
+
+        //submit clusters
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        //create test data on cluster_2
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startTimeCluster_source,
+            TimeUtil.getTimeWrtSystemTime(60), 1);
+        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE,
+            testDataDir + "/", dataDates);
+        return feed;
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown(Method method) {
+        logger.info("tearDown " + method.getName());
+        processBundle.deleteBundle(prism);
+        bundles[0].deleteBundle(prism);
+    }
+}

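Every scenario in UpdateAtSpecificTimeTest follows the same skeleton: record the latest Oozie bundle id and its nominal times, push an updated definition with an effective update time, then verify whether (and when) a new bundle materializes. A condensed, hypothetical sketch of that pattern, reusing only helper signatures that appear in the diff above and assuming it lives inside the test class so prism and the imports are available:

    // Hypothetical condensation of the save/update/verify pattern above.
    private void updateProcessAndVerify(ColoHelper colo, Bundle bundle, String updateTime)
        throws Exception {
        String oldProcess = bundle.getProcessData();
        String oldBundleId = InstanceUtil.getLatestBundleID(
            colo, Util.readEntityName(oldProcess), EntityType.PROCESS);
        List<String> oldNominalTimes =
            OozieUtil.getActionsNominalTime(colo, oldBundleId, EntityType.PROCESS);

        // Any definition change forces a new bundle; adding a property is the cheapest.
        bundle.setProcessProperty("someProp", "someVal");
        AssertUtil.assertSucceeded(prism.getProcessHelper()
            .update(oldProcess, bundle.getProcessData(), updateTime, null));

        // The new bundle should appear at once, but its instances only have to
        // line up with the old ones after updateTime passes -- hence the
        // two-phase verifyNewBundleCreation(..., true, false) followed by
        // (..., true, true) used throughout the tests.
        OozieUtil.verifyNewBundleCreation(colo, oldBundleId, oldNominalTimes,
            oldProcess, true, false);
        TimeUtil.sleepTill(updateTime);
        OozieUtil.verifyNewBundleCreation(colo, oldBundleId, oldNominalTimes,
            oldProcess, true, true);
    }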
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/LineageGraphTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/LineageGraphTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/LineageGraphTest.java
new file mode 100644
index 0000000..c62935e
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/LineageGraphTest.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.ui;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.helpers.LineageHelper;
+import org.apache.falcon.regression.core.response.lineage.Direction;
+import org.apache.falcon.regression.core.response.lineage.Edge;
+import org.apache.falcon.regression.core.response.lineage.Vertex;
+import org.apache.falcon.regression.core.response.lineage.VerticesResult;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseUITestClass;
+import org.apache.falcon.regression.ui.pages.ProcessPage;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.openqa.selenium.Point;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = "lineage-ui")
+public class LineageGraphTest extends BaseUITestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private String baseTestDir = baseHDFSDir + "/LineageGraphTest";
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private static final Logger logger = Logger.getLogger(LineageGraphTest.class);
+    String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    String feedInputPath = baseTestDir + "/input" + datePattern;
+    private FileSystem clusterFS = serverFS.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
+    private String processName = null;
+    private String inputFeedName = null;
+    private String outputFeedName = null;
+    int inputEnd = 4;
+    private List<Vertex> piVertices;
+    LineageHelper lineageHelper = new LineageHelper(prism);
+
+    /**
+     * Adjusts the bundle and schedules it, provides the process with data,
+     * and waits till some instances succeed.
+     */
+    @BeforeClass
+    public void setUp()
+        throws IOException, JAXBException, URISyntaxException, AuthenticationException,
+        OozieClientException {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        logger.info("Start time: " + startTime + "\tEnd time: " + endTime);
+
+        /**prepare process definition*/
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setProcessInput("now(0,0)", String.format("now(0,%d)", inputEnd - 1));
+
+        /**provide necessary data for first 3 instances to run*/
+        logger.info("Creating necessary data...");
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+            TimeUtil.addMinsToTime(startTime, -2), endTime, 0);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+        logger.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitBundle(prism);
+
+        processName = bundles[0].getProcessName();
+        inputFeedName = bundles[0].getInputFeedNameFromBundle();
+        outputFeedName = bundles[0].getOutputFeedNameFromBundle();
+        /**schedule process, wait for instances to succeed*/
+        prism.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL, bundles[0].getProcessData());
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+        /**get process instances*/
+        Vertex processVertex = lineageHelper.getVerticesByName(processName).getResults().get(0);
+        piVertices = lineageHelper.getVerticesByDirection(processVertex.get_id(),
+            Direction.inComingVertices).filterByType(Vertex.VERTEX_TYPE.PROCESS_INSTANCE);
+        openBrowser();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDown() throws IOException {
+        closeBrowser();
+        removeBundles();
+    }
+
+    /**
+     * Tests that the number of vertices on the graph matches the expected
+     * number of instances and that their descriptions are correct.
+     */
+    @Test
+    public void testGraphVertices() {
+
+        ProcessPage processPage = new ProcessPage(DRIVER, cluster, processName);
+        processPage.navigateTo();
+        for (Vertex piVertex : piVertices) {
+            String nominalTime = piVertex.getNominalTime();
+            /* get expected feed instances */
+            /* input feed instances */
+            List<Vertex> inpInstancesAPI = lineageHelper.getVerticesByDirection(piVertex.get_id(),
+                Direction.inComingVertices).getResults();
+            /* output feed instance */
+            List<Vertex> outInstancesAPI = lineageHelper.getVerticesByDirection(piVertex.get_id(),
+                Direction.outgoingVertices).filterByType(Vertex.VERTEX_TYPE.FEED_INSTANCE);
+            /* open lineage for particular process instance */
+            processPage.openLineage(nominalTime);
+            /* verify if number of vertices and their content is correct */
+            HashMap<String, List<String>> map = processPage.getAllVertices();
+            Assert.assertTrue(map.containsKey(processName) && map.containsKey(inputFeedName)
+                && map.containsKey(outputFeedName));
+            /* process validation */
+            List<String> processInstancesUI = map.get(processName);
+            Assert.assertEquals(processInstancesUI.size(), 1);
+            Assert.assertEquals(processInstancesUI.get(0), nominalTime);
+            /* input feed validations */
+            List<String> inpInstancesUI = map.get(inputFeedName);
+            logger.info("InputFeed instances on lineage UI : " + inpInstancesUI);
+            logger.info("InputFeed instances from API : " + inpInstancesAPI);
+            Assert.assertEquals(inpInstancesUI.size(), inpInstancesAPI.size());
+            for (Vertex inpInstanceAPI : inpInstancesAPI) {
+                Assert.assertTrue(inpInstancesUI.contains(inpInstanceAPI.getNominalTime()));
+            }
+            /* output feed validation */
+            List<String> outInstancesUI = map.get(outputFeedName);
+            logger.info("Expected outputFeed instances : " + outInstancesUI);
+            logger.info("Actual instance : " + outInstancesAPI);
+            Assert.assertEquals(outInstancesUI.size(), outInstancesAPI.size());
+            for (Vertex outInstanceAPI : outInstancesAPI) {
+                Assert.assertTrue(outInstancesUI.contains(outInstanceAPI.getNominalTime()));
+            }
+            processPage.refresh();
+        }
+    }
+
+    /**
+     * Clicks on each vertex and checks the content of the info panel.
+     */
+    @Test
+    public void testVerticesInfo()
+        throws JAXBException, URISyntaxException, AuthenticationException, IOException {
+        String clusterName = Util.readEntityName(bundles[0].getClusters().get(0));
+        ProcessPage processPage = new ProcessPage(DRIVER, cluster, processName);
+        processPage.navigateTo();
+        for (Vertex piVertex : piVertices) {
+            String nominalTime = piVertex.getNominalTime();
+            // open lineage for the particular process instance
+            processPage.openLineage(nominalTime);
+            HashMap<String, List<String>> map = processPage.getAllVertices();
+            // click on each vertex and check the info panel at the bottom
+            for (Map.Entry<String, List<String>> entry : map.entrySet()) {
+                String entityName = entry.getKey();
+                List<String> entityInstances = entry.getValue();
+                for (String entityInstance : entityInstances) {
+                    processPage.clickOnVertex(entityName, entityInstance);
+                    HashMap<String, String> info = processPage.getPanelInfo();
+                    if (entityName.equals(processName)) {
+                        String message = "Lineage info-panel reflects invalid %s for process %s.";
+                        String workflow = processName + "-workflow";
+                        Assert.assertEquals(info.get("User workflow"), workflow,
+                            String.format(message, "workflow", processName));
+                        Assert.assertEquals(info.get("User workflow engine"), "oozie",
+                            String.format(message, "engine", processName));
+                        Assert.assertEquals(info.get("Runs on"), clusterName,
+                            String.format(message, "cluster", processName));
+                    }
+                    Assert.assertEquals(info.get("Owned by"), System.getProperty("user.name"),
+                        "Entity should be owned by the current system user.");
+                }
+            }
+            processPage.refresh();
+        }
+    }
+
+    /**
+     * Tests available titles and descriptions of different lineage sections.
+     */
+    @Test
+    public void testTitlesAndDescriptions() {
+        HashMap<String, String> expectedDescriptions = new HashMap<String, String>();
+        expectedDescriptions.put("lineage-legend-process-inst", "Process instance");
+        expectedDescriptions.put("lineage-legend-process-inst lineage-legend-terminal",
+            "Process instance (terminal)");
+        expectedDescriptions.put("lineage-legend-feed-inst", "Feed instance");
+        expectedDescriptions.put("lineage-legend-feed-inst lineage-legend-terminal",
+            "Feed instance (terminal)");
+        ProcessPage processPage = new ProcessPage(DRIVER, prism, processName);
+        processPage.navigateTo();
+        for (Vertex piVertex : piVertices) {
+            String nominalTime = piVertex.getNominalTime();
+            processPage.openLineage(nominalTime);
+            /* check the main lineage title */
+            Assert.assertEquals(processPage.getLineageTitle(), "Lineage information");
+            /* check legends title */
+            Assert.assertEquals(processPage.getLegendsTitle(), "Legends");
+            /* check that all legends are present and match the expected ones */
+            HashMap<String, String> legends = processPage.getLegends();
+            for (Map.Entry<String, String> entry : legends.entrySet()) {
+                String key = entry.getKey();
+                String value = entry.getValue();
+                Assert.assertEquals(value, expectedDescriptions.get(key));
+            }
+            processPage.refresh();
+        }
+    }
+
+    /**
+     * Tests whether vertices are terminals or not.
+     */
+    @Test
+    public void testTerminals() {
+        ProcessPage processPage = new ProcessPage(DRIVER, prism, processName);
+        processPage.navigateTo();
+        lineageHelper = new LineageHelper(prism);
+        VerticesResult processResult = lineageHelper.getVerticesByName(processName);
+        Vertex processVertex = processResult.getResults().get(0);
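+        // process instance vertices are attached to the process vertex as its incoming vertices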
+        List<Vertex> piVertices =
+            lineageHelper.getVerticesByDirection(processVertex.get_id(),
+                Direction.inComingVertices).filterByType(Vertex.VERTEX_TYPE.PROCESS_INSTANCE);
+        for (Vertex piVertex : piVertices) {
+            String nominalTime = piVertex.getNominalTime();
+            processPage.openLineage(nominalTime);
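+            // input and output feed instances are expected to be terminals; the process instance is not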
+            List<Vertex> inVertices = lineageHelper.getVerticesByDirection(piVertex.get_id(),
+                Direction.inComingVertices).getResults();
+            List<Vertex> outVertices = lineageHelper.getVerticesByDirection(piVertex.get_id(),
+                Direction.outgoingVertices).filterByType(Vertex.VERTEX_TYPE.FEED_INSTANCE);
+            for (Vertex inVertex : inVertices) {
+                Assert.assertTrue(processPage.isTerminal(inVertex.getName()),
+                    String.format("Input feed instance vertex %s should be terminal",
+                        inVertex.getName()));
+            }
+            for (Vertex outVertex : outVertices) {
+                Assert.assertTrue(processPage.isTerminal(outVertex.getName()),
+                    String.format("Output feed instance vertex %s should be terminal",
+                        outVertex.getName()));
+            }
+            Assert.assertFalse(processPage.isTerminal(piVertex.getName()),
+                String.format("Process instance vertex %s should be non-terminal",
+                    piVertex.getName()));
+            processPage.refresh();
+        }
+    }
+
+    /**
+     * Computes the expected edges and their endpoints and compares them with the
+     * endpoints of the edges retrieved from the lineage graph.
+     */
+    @Test
+    public void testEdges() {
+        ProcessPage processPage = new ProcessPage(DRIVER, prism, processName);
+        processPage.navigateTo();
+        for (Vertex piVertex : piVertices) {
+            String nominalTime = piVertex.getNominalTime();
+            processPage.openLineage(nominalTime);
+            // get the expected edges from the lineage API
+            List<Edge> expectedEdgesAPI = new ArrayList<Edge>();
+            List<Edge> incEdges = lineageHelper.getEdgesByDirection(piVertex.get_id(),
+                Direction.inComingEdges).getResults();
+            List<Edge> outcEdges = lineageHelper.getEdgesByDirection(piVertex.get_id(),
+                Direction.outGoingEdges).filterByType(Edge.LEBEL_TYPE.OUTPUT);
+            expectedEdgesAPI.addAll(incEdges);
+            expectedEdgesAPI.addAll(outcEdges);
+            // check the number of edges and their endpoints
+            List<Point[]> edgesFromGraph = processPage.getEdgesFromGraph();
+            Assert.assertEquals(edgesFromGraph.size(), expectedEdgesAPI.size(),
+                "Number of edges on the UI differs from the lineage API.");
+            // check that every expected edge matches an edge drawn on the UI
+            boolean isEdgePresent = false;
+            int vertexRadius = processPage.getCircleRadius();
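+            // an edge endpoint counts as attached to a vertex if it lies within the
+            // vertex circle plus a 5px deviation (see isPointNearTheVertex below)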
+            for (Edge expEdgeAPI : expectedEdgesAPI) {
+                Vertex startVertexAPI = lineageHelper.getVertexById(expEdgeAPI.get_outV())
+                    .getResults();
+                Point startPointAPI = processPage.getVertexEndpoint(startVertexAPI.getName());
+                Vertex endVertexAPI = lineageHelper.getVertexById(expEdgeAPI.get_inV())
+                    .getResults();
+                Point endPointAPI = processPage.getVertexEndpoint(endVertexAPI.getName());
+                for (Point[] actualEndpoints : edgesFromGraph) {
+                    Point startPointUI = actualEndpoints[0];
+                    Point endPointUI = actualEndpoints[1];
+                    isEdgePresent =
+                        isPointNearTheVertex(startPointAPI, vertexRadius, startPointUI, 5)
+                            && isPointNearTheVertex(endPointAPI, vertexRadius, endPointUI, 5);
+                    if (isEdgePresent) break;
+                }
+                Assert.assertTrue(isEdgePresent,
+                    String.format("Edge %s-->%s isn't present on the lineage graph or is "
+                        + "painted incorrectly.", startVertexAPI.getName(), endVertexAPI.getName()));
+            }
+            processPage.refresh();
+        }
+    }
+
+    /**
+     * Opens and closes the lineage info panel and checks its content.
+     */
+    @Test
+    public void testLineageOpenClose() {
+        ProcessPage processPage = new ProcessPage(DRIVER, prism, processName);
+        processPage.navigateTo();
+        List<String> previous = new ArrayList<String>();
+        for (Vertex piVertex : piVertices) {
+            String nominalTime = piVertex.getNominalTime();
+            processPage.openLineage(nominalTime);
+            List<String> vertices = processPage.getAllVerticesNames();
+            Assert.assertNotEquals(previous, vertices, "Graph of instance " + nominalTime
+                + " is identical to the previous one.");
+            previous = vertices;
+            processPage.closeLineage();
+        }
+    }
+
+    /**
+     * Evaluates whether an endpoint lies within the permissible region around a vertex.
+     *
+     * @param center    coordinates of the vertex center
+     * @param radius    radius of the vertex
+     * @param point     coordinates of the edge endpoint
+     * @param deviation permissible deviation
+     * @return true if the endpoint is within radius + deviation of the center
+     */
+    private boolean isPointNearTheVertex(Point center, int radius, Point point, int deviation) {
+        double distance = Math.sqrt(
+            Math.pow(point.getX() - center.getX(), 2) +
+                Math.pow(point.getY() - center.getY(), 2));
+        return distance <= radius + deviation;
+    }
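+
+    /* Worked example (hypothetical numbers): with a vertex center at (100, 100),
+     * a radius of 20 and the deviation of 5 used in testEdges(), an endpoint at
+     * (115, 115) gives distance sqrt(15^2 + 15^2) ~= 21.2 <= 25 and is treated as
+     * attached to the vertex, while an endpoint at (130, 130), distance ~42.4, is not. */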
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
new file mode 100644
index 0000000..356920c
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.ui;
+
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.CleanupUtil;
+import org.apache.falcon.regression.core.util.Generator;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.lineage.LineageApiTest;
+import org.apache.falcon.regression.testHelper.BaseUITestClass;
+import org.apache.falcon.regression.ui.pages.EntitiesPage;
+import org.apache.falcon.regression.ui.pages.ProcessPage;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.asserts.SoftAssert;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+@Test(groups = "lineage-ui")
+public class ProcessUITest extends BaseUITestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private String baseTestDir = baseHDFSDir + "/TestProcessUI";
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private static final Logger logger = Logger.getLogger(ProcessUITest.class);
+    private final String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private final String feedInputPath = baseTestDir + "/input";
+    private final String feedOutputPath = baseTestDir + "/output";
+    private FileSystem clusterFS = serverFS.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
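+    // TestNG SoftAssert records failed assertions instead of failing fast;
+    // assertAll() at the end of the test reports them all together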
+    private SoftAssert softAssert = new SoftAssert();
+
+    @BeforeMethod
+    public void setUp()
+        throws IOException, JAXBException, NoSuchMethodException, IllegalAccessException,
+        InvocationTargetException, URISyntaxException, AuthenticationException {
+        CleanupUtil.cleanAllEntities(prism);
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        openBrowser();
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        logger.info("Start time: " + startTime + "\tEnd time: " + endTime);
+
+        //prepare process definition
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setInputFeedDataPath(feedInputPath + datePattern);
+        Process process = bundles[0].getProcessObject();
+        Inputs inputs = new Inputs();
+        Input input = new Input();
+        input.setFeed(Util.readEntityName(bundles[0].getInputFeedFromBundle()));
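+        // the EL range now(0,0)..now(0,4) should cover the instance's nominal time
+        // plus the next four minutes, i.e. five minutely partitions of the input feed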
+        input.setStart("now(0,0)");
+        input.setEnd("now(0,4)");
+        input.setName("inputData");
+        inputs.getInputs().add(input);
+        process.setInputs(inputs);
+
+        bundles[0].setProcessData(process.toString());
+
+        //provide necessary data for first 3 instances to run
+        logger.info("Creating necessary data...");
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+            TimeUtil.addMinsToTime(startTime, -2), endTime, 0);
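+        // the dates span from two minutes before the start time up to the end time
+        // so that the scheduled instances find their input partitions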
+
+        // use 5 <= x < 10 input feeds
+        final int numInputFeeds = 5 + new Random().nextInt(5);
+        // use 5 <= x < 10 output feeds
+        final int numOutputFeeds = 5 + new Random().nextInt(5);
+
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+
+        prefix = prefix.substring(0, prefix.length() - 1);
+        for (int k = 1; k <= numInputFeeds; k++) {
+            HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
+                prefix + "_00" + k + "/", dataDates);
+        }
+
+        logger.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        final FeedMerlin inputMerlin = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        final FeedMerlin outputMerlin = new FeedMerlin(bundles[0].getOutputFeedFromBundle());
+
+        FeedMerlin[] inputFeeds = LineageApiTest.generateFeeds(numInputFeeds, inputMerlin,
+                Generator.getNameGenerator("infeed", inputMerlin.getName()),
+                Generator.getHadoopPathGenerator(feedInputPath, datePattern));
+        int j = 0;
+        for (FeedMerlin feed : inputFeeds) {
+            bundles[0].addInputFeedToBundle("inputFeed" + j, feed.toString(), j);
+            j++;
+        }
+
+        FeedMerlin[] outputFeeds = LineageApiTest.generateFeeds(numOutputFeeds, outputMerlin,
+                Generator.getNameGenerator("outfeed", outputMerlin.getName()),
+                Generator.getHadoopPathGenerator(feedOutputPath, datePattern));
+        j = 0;
+        for (FeedMerlin feed : outputFeeds) {
+            bundles[0].addOutputFeedToBundle("outputFeed" + j, feed.toString(), j);
+            j++;
+        }
+
+        AssertUtil.assertSucceeded(bundles[0].submitBundle(prism));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown(Method method) throws IOException {
+        closeBrowser();
+        removeBundles();
+    }
+
+    /**
+     * Checks that the UI shows the expected statuses of the submitted process
+     * (SUBMITTED and RUNNING), that instance icons match the statuses of the
+     * corresponding Oozie actions, and that lineage links are available only
+     * for SUCCEEDED instances.
+     */
+    @Test
+    public void testProcessUI()
+        throws URISyntaxException, IOException, AuthenticationException, JAXBException,
+        OozieClientException {
+
+        //check Process statuses via UI
+        EntitiesPage page = new EntitiesPage(DRIVER, cluster, EntityType.PROCESS);
+        page.navigateTo();
+
+        softAssert.assertEquals(page.getEntityStatus(bundles[0].getProcessName()),
+                EntitiesPage.EntityStatus.SUBMITTED, "Process status should be SUBMITTED");
+        prism.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL, bundles[0].getProcessData());
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
+                CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+
+        softAssert.assertEquals(page.getEntityStatus(bundles[0].getProcessName()),
+                EntitiesPage.EntityStatus.RUNNING, "Process status should be RUNNING");
+
+        ProcessPage processPage = new ProcessPage(DRIVER, cluster, bundles[0].getProcessName());
+        processPage.navigateTo();
+
+        String bundleID = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(),
+                EntityType.PROCESS);
+        Map<Date, CoordinatorAction.Status> actions = OozieUtil.getActionsNominalTimeAndStatus(
+                prism, bundleID, EntityType.PROCESS);
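+        // map of nominal time -> oozie action status for every instance of the latest bundle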
+        checkActions(actions, processPage);
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
+                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        processPage.refresh();
+        actions = OozieUtil.getActionsNominalTimeAndStatus(prism, bundleID, EntityType.PROCESS);
+        checkActions(actions, processPage);
+
+        softAssert.assertAll();
+    }
+
+    private void checkActions(Map<Date, CoordinatorAction.Status> actions, ProcessPage page) {
+        for (Map.Entry<Date, CoordinatorAction.Status> action : actions.entrySet()) {
+            String oozieDate = TimeUtil.dateToOozieDate(action.getKey());
+            String status = page.getInstanceStatus(oozieDate);
+            //check that the instance icon matches the status of the oozie action
+            softAssert.assertNotNull(status, oozieDate + " instance is not present on UI");
+            softAssert.assertEquals(status, action.getValue().toString(), "Status of instance '"
+                    + oozieDate + "' differs between Oozie and the UI");
+
+            //check that the Lineage link is available only for SUCCEEDED instances
+            boolean isPresent = page.isLineageLinkPresent(oozieDate);
+            if (action.getValue() == CoordinatorAction.Status.SUCCEEDED) {
+                softAssert.assertTrue(isPresent,
+                        "Lineage button should be present for instance: " + oozieDate);
+            } else {
+                softAssert.assertFalse(isPresent,
+                        "Lineage button should not be present for instance: " + oozieDate);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/testng.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/testng.xml b/falcon-regression/merlin/src/test/java/testng.xml
new file mode 100644
index 0000000..0e442c0
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/testng.xml
@@ -0,0 +1,31 @@
+<!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+
+<suite name="Suite1" verbose="1">
+    <test name="com.inmobi.qa.airavatqa">
+        <classes>
+            <class name="com.inmobi.qa.airavatqa.FeedValidations">
+                <methods>
+                    <include name="test"/>
+                </methods>
+            </class>
+        </classes>
+    </test>
+</suite>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/resources/AvailabilityBundle/depends.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/AvailabilityBundle/depends.txt b/falcon-regression/merlin/src/test/resources/AvailabilityBundle/depends.txt
new file mode 100644
index 0000000..26a8896
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/AvailabilityBundle/depends.txt
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/cluster-0.1.xml b/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/cluster-0.1.xml
new file mode 100644
index 0000000..3546de6
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/cluster-0.1.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<cluster colo="gs" description="" name="corp" xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<interfaces>
+		<interface type="readonly" endpoint="hftp://gs1001.grid.corp.inmobi.com:50070"
+			version="0.20.2" />
+		<interface type="write" endpoint="hdfs://gs1001.grid.corp.inmobi.com:54310"
+			version="0.20.2" />
+		<interface type="execute" endpoint="hdfs://gs1001.grid.corp.inmobi.com:54311" version="0.20.2" />
+		<interface type="workflow" endpoint="http://gs1001.grid.corp.inmobi.com:11000/oozie/"
+			version="3.1" />
+		<interface type="messaging" endpoint="tcp://gs1001.grid.corp.inmobi.com:61616?daemon=true"
+			version="5.1.6" />
+	</interfaces>
+	<locations>
+		<location name="staging" path="/projects/ivory/staging" />
+		<location name="temp" path="/tmp" />
+		<location name="working" path="/projects/ivory/working" />
+	</locations>
+	<properties>
+		<property name="field1" value="value1" />
+		<property name="field2" value="value2" />
+	</properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/feed-template1.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/feed-template1.xml b/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/feed-template1.xml
new file mode 100644
index 0000000..2a1d9f7
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/feed-template1.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="raaw-logs16" xmlns="uri:falcon:feed:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+<!--   <partitions>
+        <partition name="fraud" />
+        <partition name="good" />
+    </partitions>
+
+    <groups>online,bi</groups>-->
+
+    <frequency>minutes(20)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)" />
+
+    <clusters>
+        <cluster name="corp" type="source">
+            <validity start="2009-02-01T00:00Z" end="2099-05-01T00:00Z" />
+            <retention limit="months(9000)" action="delete" /> <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/samarth/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}" />
+        <location type="stats" path="/projects/ivory/clicksStats" />
+        <location type="meta" path="/projects/ivory/clicksMetaData" />
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755" />
+    <schema location="/schema/clicks" provider="protobuf" />
+    <availabilityFlag>required_File.txt</availabilityFlag>
+    <properties>
+        <property name="field1" value="value1" />
+        <property name="field2" value="value2" />
+    </properties>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/feed-template2.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/feed-template2.xml b/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/feed-template2.xml
new file mode 100644
index 0000000..89ad45e
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/AvailabilityBundle/valid/bundle1/feed-template2.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="agregated-logs16" xmlns="uri:falcon:feed:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+<!--   <partitions>
+        <partition name="fraud" />
+        <partition name="good" />
+    </partitions>
+
+    <groups>online,bi</groups>-->
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)" />
+
+    <clusters>
+        <cluster name="corp" type="source">
+            <validity start="2009-02-01T01:00Z" end="2099-05-01T00:00Z" />
+            <retention limit="hours(6)" action="delete" /> <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/examples/output-data/aggregator/aggregatedLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}" />
+        <location type="stats" path="/projects/ivory/clicksStats" />
+        <location type="meta" path="/projects/ivory/clicksMetaData" />
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755" />
+    <schema location="/schema/clicks" provider="protobuf" />
+
+    <properties>
+        <property name="field1" value="value1" />
+        <property name="field2" value="value2" />
+    </properties>
+</feed>

