falcon-commits mailing list archives

From samar...@apache.org
Subject [13/27] adding falcon-regression
Date Mon, 04 Aug 2014 10:04:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
new file mode 100644
index 0000000..1d5fb1d
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hcat;
+
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.RetentionUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HCatUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hive.hcatalog.api.HCatPartition;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.oozie.client.OozieClient;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.annotations.DataProvider;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HCatRetentionTest extends BaseTestClass {
+
+    private static final Logger logger = Logger.getLogger(HCatRetentionTest.class);
+
+    private Bundle bundle;
+    public static HCatClient cli;
+    final String testDir = "/HCatRetentionTest/";
+    final String baseTestHDFSDir = baseHDFSDir + testDir;
+    final String dBName = "default";
+    final ColoHelper cluster = servers.get(0);
+    final FileSystem clusterFS = serverFS.get(0);
+    final OozieClient clusterOC = serverOC.get(0);
+    String tableName;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        HadoopUtil.recreateDir(clusterFS, baseTestHDFSDir);
+        cli = cluster.getClusterHelper().getHCatClient();
+        bundle = new Bundle(BundleUtil.readHCat2Bundle(), cluster);
+        bundle.generateUniqueBundle();
+        bundle.submitClusters(prism);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws HCatException {
+        bundle.deleteBundle(prism);
+        cli.dropTable(dBName, tableName, true);
+    }
+
+    @Test(enabled = true, dataProvider = "loopBelow", timeOut = 900000, groups = "embedded")
+    public void testHCatRetention(int retentionPeriod, RetentionUnit retentionUnit,
+                                  FeedType feedType) throws Exception {
+
+        /* HCatalog lowercases table names when the table is created, so the
+           table name used in the feed must already be lowercase to match. */
+        tableName = String.format("testhcatretention_%s_%d", retentionUnit.getValue(),
+            retentionPeriod);
+        createPartitionedTable(cli, dBName, tableName, baseTestHDFSDir, feedType);
+        FeedMerlin feedElement = new FeedMerlin(bundle.getInputFeedFromBundle());
+        feedElement.setTableValue(dBName, tableName, feedType.getHcatPathValue());
+        feedElement
+            .setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")");
+        if (retentionPeriod <= 0) {
+            AssertUtil.assertFailed(prism.getFeedHelper()
+                .submitEntity(URLS.SUBMIT_URL, bundle.getInputFeedFromBundle()));
+        } else {
+            final DateTime dataStartTime = new DateTime(
+                feedElement.getClusters().getClusters().get(0).getValidity().getStart(),
+                DateTimeZone.UTC).withSecondOfMinute(0);
+            final DateTime dataEndTime = new DateTime(
+                feedElement.getClusters().getClusters().get(0).getValidity().getEnd(),
+                DateTimeZone.UTC).withSecondOfMinute(0);
+            final List<DateTime> dataDates =
+                TimeUtil.getDatesOnEitherSide(dataStartTime, dataEndTime, feedType);
+            final List<String> dataDateStrings = TimeUtil.convertDatesToString(dataDates,
+                    feedType.getFormatter());
+            AssertUtil.checkForListSizes(dataDates, dataDateStrings);
+            final List<String> dataFolders = HadoopUtil.flattenAndPutDataInFolder(clusterFS,
+                OSUtil.OOZIE_EXAMPLE_INPUT_LATE_INPUT, baseTestHDFSDir, dataDateStrings);
+            addPartitionsToExternalTable(cli, dBName, tableName, feedType, dataDates, dataFolders);
+            List<String> initialData =
+                getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir, feedType);
+            List<HCatPartition> initialPtnList = cli.getPartitions(dBName, tableName);
+            AssertUtil.checkForListSizes(initialData, initialPtnList);
+
+            AssertUtil.assertSucceeded(prism.getFeedHelper()
+                .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedElement.toString()));
+            final String bundleId = OozieUtil.getBundles(clusterOC, feedElement.getName(),
+                EntityType.FEED).get(0);
+            OozieUtil.waitForRetentionWorkflowToSucceed(bundleId, clusterOC);
+            AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(URLS.SUSPEND_URL,
+                feedElement.toString()));
+
+            List<String> expectedOutput = getExpectedOutput(retentionPeriod, retentionUnit,
+                feedType, new DateTime(DateTimeZone.UTC), initialData);
+            List<String> finalData = getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir,
+                feedType);
+            List<HCatPartition> finalPtnList = cli.getPartitions(dBName, tableName);
+
+            logger.info("checking expectedOutput and finalPtnList");
+            AssertUtil.checkForListSizes(expectedOutput, finalPtnList);
+            logger.info("checking expectedOutput and finalData");
+            AssertUtil.checkForListSizes(expectedOutput, finalData);
+            logger.info("finalData = " + finalData);
+            logger.info("expectedOutput = " + expectedOutput);
+            Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]),
+                    expectedOutput.toArray(new String[expectedOutput.size()])),
+                "expectedOutput and finalData don't match");
+        }
+    }
+
+    private static List<String> getHadoopDataFromDir(FileSystem fs, String hadoopPath,
+                                                     String dir, FeedType feedType)
+        throws IOException {
+        List<String> finalResult = new ArrayList<String>();
+        final int dirDepth = feedType.getDirDepth();
+
+        List<Path> results = HadoopUtil.getAllDirsRecursivelyHDFS(fs,
+            new Path(hadoopPath), dirDepth);
+
+        for (Path result : results) {
+            int pathDepth = result.toString().split(dir)[1].split("/").length - 1;
+            if (pathDepth == dirDepth) {
+                finalResult.add(result.toString().split(dir)[1]);
+            }
+        }
+
+        return finalResult;
+    }
+
+    /**
+     * Get the expected output after retention is applied
+     *
+     * @param retentionPeriod retention period
+     * @param retentionUnit   retention unit
+     * @param feedType        feed type
+     * @param endDateUTC      end date of retention
+     * @param inputData       input data on which retention was applied
+     * @return expected output of the retention
+     */
+    private static List<String> getExpectedOutput(int retentionPeriod,
+                                                  RetentionUnit retentionUnit,
+                                                  FeedType feedType,
+                                                  DateTime endDateUTC,
+                                                  List<String> inputData) {
+        List<String> finalData = new ArrayList<String>();
+
+        //convert the end date to the same format
+        final String endLimit =
+            feedType.getFormatter().print(retentionUnit.minusTime(endDateUTC, retentionPeriod));
+        //now to actually check!
+        for (String testDate : inputData) {
+            if (testDate.compareTo(endLimit) >= 0) {
+                finalData.add(testDate);
+            }
+        }
+        return finalData;
+    }
+
+    private static void createPartitionedTable(HCatClient client, String dbName, String tableName,
+                                               String tableLoc, FeedType dataType)
+        throws HCatException {
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        ArrayList<HCatFieldSchema> ptnCols = new ArrayList<HCatFieldSchema>();
+
+        //client.dropDatabase("sample_db", true, HCatClient.DropDBMode.CASCADE);
+
+        cols.add(HCatUtil.getStringSchema("id", "id comment"));
+        cols.add(HCatUtil.getStringSchema("value", "value comment"));
+
+        switch (dataType) { // intentional fall-through: a finer feed type needs all coarser partition columns
+            case MINUTELY:
+                ptnCols.add(
+                    HCatUtil.getStringSchema("minute", "min prt"));
+            case HOURLY:
+                ptnCols.add(
+                    HCatUtil.getStringSchema("hour", "hour prt"));
+            case DAILY:
+                ptnCols.add(HCatUtil.getStringSchema("day", "day prt"));
+            case MONTHLY:
+                ptnCols.add(
+                    HCatUtil.getStringSchema("month", "month prt"));
+            case YEARLY:
+                ptnCols.add(
+                    HCatUtil.getStringSchema("year", "year prt"));
+            default:
+                break;
+        }
+        HCatCreateTableDesc tableDesc = HCatCreateTableDesc
+            .create(dbName, tableName, cols)
+            .fileFormat("rcfile")
+            .ifNotExists(true)
+            .partCols(ptnCols)
+            .isTableExternal(true)
+            .location(tableLoc)
+            .build();
+        client.dropTable(dbName, tableName, true);
+        client.createTable(tableDesc);
+    }
+
+    private static void addPartitionsToExternalTable(HCatClient client, String dbName,
+                                                     String tableName, FeedType feedType,
+                                                     List<DateTime> dataDates,
+                                                     List<String> dataFolders)
+        throws HCatException {
+        //Adding specific partitions that map to an external location
+        Map<String, String> ptn = new HashMap<String, String>();
+        for (int i = 0; i < dataDates.size(); ++i) {
+            final String dataFolder = dataFolders.get(i);
+            final DateTime dataDate = dataDates.get(i);
+            switch (feedType) { // intentional fall-through: a finer feed type sets all coarser partition keys
+                case MINUTELY:
+                    ptn.put("minute", "" + dataDate.getMinuteOfHour());
+                case HOURLY:
+                    ptn.put("hour", "" + dataDate.getHourOfDay());
+                case DAILY:
+                    ptn.put("day", "" + dataDate.getDayOfMonth());
+                case MONTHLY:
+                    ptn.put("month", "" + dataDate.getMonthOfYear());
+                case YEARLY:
+                    ptn.put("year", "" + dataDate.getYear());
+                    break;
+                default:
+                    Assert.fail("Unexpected feedType = " + feedType);
+            }
+            //Each HCat partition maps to a directory, not to a file
+            HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(dbName,
+                tableName, dataFolder, ptn).build();
+            client.addPartition(addPtn);
+            ptn.clear();
+        }
+    }
+
+    @DataProvider(name = "loopBelow")
+    public Object[][] getTestData(Method m) {
+        RetentionUnit[] retentionUnits = new RetentionUnit[]{RetentionUnit.HOURS, RetentionUnit.DAYS,
+            RetentionUnit.MONTHS}; // "minutes" and "years" are not covered here
+        int[] periods = new int[]{7, 824, 43}; // a negative value like -4 should be covered
+        // in validation scenarios.
+        FeedType[] dataTypes =
+            new FeedType[]{
+                //disabled since falcon currently supports only a single hcat partition
+                //FeedType.DAILY, FeedType.MINUTELY, FeedType.HOURLY, FeedType.MONTHLY,
+                FeedType.YEARLY};
+        Object[][] testData = new Object[retentionUnits.length * periods.length * dataTypes.length][3];
+
+        int i = 0;
+
+        for (RetentionUnit retentionUnit : retentionUnits) {
+            for (int period : periods) {
+                for (FeedType dataType : dataTypes) {
+                    testData[i][0] = period;
+                    testData[i][1] = retentionUnit;
+                    testData[i][2] = dataType;
+                    i++;
+                }
+            }
+        }
+        return testData;
+    }
+
+}
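
For readers tracing the retention arithmetic, below is a minimal standalone sketch (not part of the patch above) of the cutoff logic that getExpectedOutput() applies: format "now minus the retention period" with the feed's formatter and keep every instance whose formatted date compares greater-or-equal. The "yyyy/MM/dd" pattern and the 7-day period are illustrative stand-ins for FeedType.getFormatter() and a DAYS(7) retention value.

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RetentionCutoffSketch {
    public static void main(String[] args) {
        // stand-in for FeedType.getFormatter(); the real test uses the feed type's own pattern
        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy/MM/dd");
        DateTime nowUtc = new DateTime(DateTimeZone.UTC);
        // stand-in for retentionUnit.minusTime(endDateUTC, retentionPeriod) with a DAYS(7) retention
        String endLimit = fmt.print(nowUtc.minusDays(7));
        List<String> input = Arrays.asList(
            fmt.print(nowUtc.minusDays(10)),  // older than the limit: expected to be deleted
            fmt.print(nowUtc.minusDays(3)),   // within the limit: expected to survive
            fmt.print(nowUtc));               // current instance: expected to survive
        List<String> expected = new ArrayList<String>();
        for (String instance : input) {
            // lexicographic comparison is safe because the date format sorts chronologically
            if (instance.compareTo(endLimit) >= 0) {
                expected.add(instance);
            }
        }
        System.out.println("endLimit = " + endLimit + ", expected survivors = " + expected);
    }
}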

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
new file mode 100644
index 0000000..c65526a
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.lineage;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.helpers.LineageHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.lineage.Direction;
+import org.apache.falcon.regression.core.response.lineage.Vertex;
+import org.apache.falcon.regression.core.response.lineage.VerticesResult;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.GraphAssert;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+@Test(groups = "lineage-rest")
+public class LineageApiProcessInstanceTest extends BaseTestClass {
+    private static final Logger logger = Logger.getLogger(LineageApiProcessInstanceTest.class);
+
+    ColoHelper cluster = servers.get(0);
+    FileSystem clusterFS = serverFS.get(0);
+    LineageHelper lineageHelper;
+    String baseTestHDFSDir = baseHDFSDir + "/LineageApiInstanceTest";
+    String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+    String feedInputPrefix = baseTestHDFSDir + "/input";
+    String feedInputPath = feedInputPrefix + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    String feedOutputPath =
+        baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    String processName;
+    String inputFeedName;
+    String outputFeedName;
+    final String dataStartDate = "2010-01-02T09:00Z";
+    final String processStartDate = "2010-01-02T09:50Z";
+    final String endDate = "2010-01-02T10:00Z";
+
+
+    @BeforeClass(alwaysRun = true)
+    public void init() {
+        lineageHelper = new LineageHelper(prism);
+    }
+
+    @BeforeMethod(alwaysRun = true, firstTimeOnly = true)
+    public void setup(Method method) throws Exception {
+        HadoopUtil.deleteDirIfExists(baseTestHDFSDir, clusterFS);
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+        bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster);
+        bundles[0].generateUniqueBundle();
+
+        bundles[0].setInputFeedDataPath(feedInputPath);
+
+        // data set creation
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(dataStartDate, endDate, 5);
+        logger.info("dataDates = " + dataDates);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, feedInputPrefix,
+            dataDates);
+
+        // running process
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[0].setProcessValidity(processStartDate, endDate);
+        bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        processName = bundles[0].getProcessName();
+        inputFeedName = bundles[0].getInputFeedNameFromBundle();
+        outputFeedName = bundles[0].getOutputFeedNameFromBundle();
+        Job.Status status = null;
+        for (int i = 0; i < 20; i++) {
+            status = InstanceUtil.getDefaultCoordinatorStatus(cluster,
+                Util.getProcessName(bundles[0].getProcessData()), 0);
+            if (status == Job.Status.SUCCEEDED || status == Job.Status.KILLED)
+                break;
+            TimeUtil.sleepSeconds(30);
+        }
+        Assert.assertNotNull(status);
+        Assert.assertEquals(status, Job.Status.SUCCEEDED,
+            "The job did not succeeded even in long time");
+    }
+
+    @AfterMethod(alwaysRun = true, lastTimeOnly = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Test navigation from the process vertex to its instance vertices
+     * @throws Exception
+     */
+    @Test
+    public void processToProcessInstanceNodes() throws Exception {
+        final VerticesResult processResult = lineageHelper.getVerticesByName(processName);
+        GraphAssert.assertVertexSanity(processResult);
+        Vertex processVertex = processResult.getResults().get(0);
+        final VerticesResult processIncoming =
+            lineageHelper.getVerticesByDirection(processVertex.get_id(), Direction.inComingVertices);
+        GraphAssert.assertVertexSanity(processIncoming);
+        final List<Vertex> processInstanceVertices =
+            processIncoming.filterByType(Vertex.VERTEX_TYPE.PROCESS_INSTANCE);
+        logger.info("process instances = " + processInstanceVertices);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(processName, "?start=" + processStartDate +
+                "&end=" + endDate);
+        Assert.assertEquals(processInstanceVertices.size(), result.getInstances().length,
+            "Number of process instances should be same weather it is retrieved from lineage api " +
+                "or falcon rest api");
+    }
+
+    /**
+     * Test navigation from the process instance vertex to its input and output feed instances
+     * @throws Exception
+     */
+    @Test
+    public void processInstanceToFeedInstanceNodes() throws Exception {
+        final VerticesResult processResult = lineageHelper.getVerticesByName(processName);
+        GraphAssert.assertVertexSanity(processResult);
+        Vertex processVertex = processResult.getResults().get(0);
+        final VerticesResult processIncoming =
+            lineageHelper.getVerticesByDirection(processVertex.get_id(), Direction.inComingVertices);
+        GraphAssert.assertVertexSanity(processIncoming);
+        // fetching process instance vertex
+        final List<Vertex> piVertices =
+            processIncoming.filterByType(Vertex.VERTEX_TYPE.PROCESS_INSTANCE);
+        logger.info("process instance vertex = " + piVertices);
+
+        // fetching process instances info
+        InstancesResult piResult = prism.getProcessHelper()
+            .getProcessInstanceStatus(processName, "?start=" + processStartDate +
+                "&end=" + endDate);
+        Assert.assertEquals(piVertices.size(), piResult.getInstances().length,
+            "Number of process instances should be same weather it is retrieved from lineage api " +
+                "or falcon rest api");
+        final List<String> allowedPITimes = new ArrayList<String>();
+        for (InstancesResult.Instance processInstance : piResult.getInstances()) {
+            allowedPITimes.add(processInstance.getInstance());
+        }
+
+        for (Vertex piVertex : piVertices) {
+            Assert.assertTrue(piVertex.getName().startsWith(processName),
+                "Process instance names should start with process name: " + piVertex.getName());
+            String processInstanceTime = piVertex.getName().substring(processName.length() + 1);
+            logger.info("processInstanceTime = " + processInstanceTime);
+            Assert.assertTrue(allowedPITimes.remove(processInstanceTime),
+                "Unexpected processInstanceTime: " + processInstanceTime +
+                    "it should have been be in the list " + allowedPITimes);
+
+            VerticesResult piIncoming = lineageHelper.getVerticesByDirection(piVertex.get_id(),
+                Direction.inComingVertices);
+            GraphAssert.assertVertexSanity(piIncoming);
+            //process input start is "now(0,-20)" and end is "now(0,0)"
+            //frequency of the feed is 5 min so there are 5 instances
+            Assert.assertEquals(piIncoming.getTotalSize(), 5);
+            final List<String> allowedInpFeedInstDates =
+                TimeUtil.getMinuteDatesOnEitherSide(
+                    TimeUtil.oozieDateToDate(processInstanceTime).plusMinutes(-20),
+                    TimeUtil.oozieDateToDate(processInstanceTime), 5,
+                    OozieUtil.getOozieDateTimeFormatter());
+            // checking input feed instances
+            for(Vertex inFeedInst : piIncoming.filterByType(Vertex.VERTEX_TYPE.FEED_INSTANCE)) {
+                final String inFeedInstName = inFeedInst.getName();
+                Assert.assertTrue(inFeedInstName.startsWith(inputFeedName),
+                    "input feed instances should start with input feed name: " + inFeedInstName);
+                final String inFeedInstanceTime = inFeedInstName.substring(
+                    inputFeedName.length() + 1);
+                logger.info("inFeedInstanceTime = " + inFeedInstanceTime);
+                Assert.assertTrue(allowedInpFeedInstDates.remove(inFeedInstanceTime),
+                    "Unexpected inFeedInstanceTime: " + inFeedInstanceTime  + " it should have " +
+                        "been present in: " + allowedInpFeedInstDates);
+            }
+
+            VerticesResult piOutgoing = lineageHelper.getVerticesByDirection(
+                piVertex.get_id(), Direction.outgoingVertices);
+            GraphAssert.assertVertexSanity(piOutgoing);
+            Assert.assertEquals(piOutgoing.filterByType(Vertex.VERTEX_TYPE.FEED_INSTANCE).size(),
+                1, "Expected only one output feed instance.");
+            // checking output feed instances
+            final Vertex outFeedInst = piOutgoing.filterByType(Vertex.VERTEX_TYPE.FEED_INSTANCE).get(0);
+            final String outFeedInstName = outFeedInst.getName();
+            Assert.assertTrue(outFeedInstName.startsWith(outputFeedName),
+                "Expecting outFeedInstName: " + outFeedInstName +
+                    " to start with outputFeedName: " + outputFeedName);
+            final String outFeedInstanceTime = outFeedInstName.substring(outputFeedName.length() +
+                1);
+            Assert.assertEquals(outFeedInstanceTime, processInstanceTime,
+                "Expecting output feed instance time and process instance time to be same");
+        }
+
+    }
+
+}
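
The two tests above rely on the lineage vertex name embedding the nominal instance time after the entity name. The standalone sketch below (not part of the patch) shows that derivation and how the five expected input feed instances per process instance follow from the input window now(0,-20)..now(0,0) on a 5-minute feed. The "/" separator and the date pattern are assumptions inferred from the substring(processName.length() + 1) call and the oozie-style times used in the test.

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.ArrayList;
import java.util.List;

public class LineageInstanceNameSketch {
    public static void main(String[] args) {
        // hypothetical names; the "/" separator is inferred from substring(processName.length() + 1)
        String processName = "agg-process";
        String vertexName = "agg-process/2010-01-02T09:50Z";
        String processInstanceTime = vertexName.substring(processName.length() + 1);

        // the process input spans now(0,-20) to now(0,0) on a 5-minute feed,
        // so each process instance should see 5 input feed instances: -20, -15, -10, -5, 0 minutes
        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm'Z'")
            .withZone(DateTimeZone.UTC);
        DateTime piTime = fmt.parseDateTime(processInstanceTime);
        List<String> expectedInputInstances = new ArrayList<String>();
        for (int minutes = -20; minutes <= 0; minutes += 5) {
            expectedInputInstances.add(fmt.print(piTime.plusMinutes(minutes)));
        }
        System.out.println(processInstanceTime + " -> " + expectedInputInstances);
    }
}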

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiTest.java
new file mode 100644
index 0000000..f7f95da
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiTest.java
@@ -0,0 +1,656 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.lineage;
+
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.falcon.regression.Entities.ClusterMerlin;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.helpers.LineageHelper;
+import org.apache.falcon.regression.core.response.lineage.Direction;
+import org.apache.falcon.regression.core.response.lineage.Edge;
+import org.apache.falcon.regression.core.response.lineage.EdgesResult;
+import org.apache.falcon.regression.core.response.lineage.Vertex;
+import org.apache.falcon.regression.core.response.lineage.VertexIdsResult;
+import org.apache.falcon.regression.core.response.lineage.VertexResult;
+import org.apache.falcon.regression.core.response.lineage.VerticesResult;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.CleanupUtil;
+import org.apache.falcon.regression.core.util.Generator;
+import org.apache.falcon.regression.core.util.GraphAssert;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.http.HttpResponse;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+@Test(groups = "lineage-rest")
+public class LineageApiTest extends BaseTestClass {
+    private static final String datePattern = "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private static final Logger logger = Logger.getLogger(LineageApiTest.class);
+    private static final String testName = "LineageApiTest";
+    private static final String testTag =
+        Edge.LEBEL_TYPE.TESTNAME.toString().toLowerCase() + "=" + testName;
+    private static final String VERTEX_NOT_FOUND_REGEX = ".*Vertex.*%d.*not.*found.*\n?";
+    private static final String inValidArgumentStr = "Invalid argument";
+    LineageHelper lineageHelper;
+    final ColoHelper cluster = servers.get(0);
+    final String baseTestHDFSDir = baseHDFSDir + "/LineageApiTest";
+    final String feedInputPath = baseTestHDFSDir + "/input";
+    final String feedOutputPath = baseTestHDFSDir + "/output";
+    // use 5 <= x < 10 input feeds
+    final int numInputFeeds = 5 + new Random().nextInt(5);
+    // use 5 <= x < 10 output feeds
+    final int numOutputFeeds = 5 + new Random().nextInt(5);
+    ClusterMerlin clusterMerlin;
+    FeedMerlin[] inputFeeds;
+    FeedMerlin[] outputFeeds;
+
+    @BeforeClass(alwaysRun = true)
+    public void init() {
+        lineageHelper = new LineageHelper(prism);
+    }
+
+    @BeforeMethod(alwaysRun = true, firstTimeOnly = true)
+    public void setUp() throws Exception {
+        CleanupUtil.cleanAllEntities(prism);
+        Bundle bundle = BundleUtil.readELBundle();
+        bundle.generateUniqueBundle();
+        bundles[0] = new Bundle(bundle, cluster);
+        final List<String> clusterStrings = bundles[0].getClusters();
+        Assert.assertEquals(clusterStrings.size(), 1, "Expecting only 1 clusterMerlin.");
+        clusterMerlin = new ClusterMerlin(clusterStrings.get(0));
+        clusterMerlin.setTags(testTag);
+        AssertUtil.assertSucceeded(
+            prism.getClusterHelper().submitEntity(Util.URLS.SUBMIT_URL, clusterMerlin.toString()));
+        logger.info("numInputFeeds = " + numInputFeeds);
+        logger.info("numOutputFeeds = " + numOutputFeeds);
+        final FeedMerlin inputMerlin = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        inputMerlin.setTags(testTag);
+        inputFeeds = generateFeeds(numInputFeeds, inputMerlin,
+            Generator.getNameGenerator("infeed", inputMerlin.getName()),
+            Generator.getHadoopPathGenerator(feedInputPath, datePattern));
+        for (FeedMerlin feed : inputFeeds) {
+            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL,
+                feed.toString()));
+        }
+
+        FeedMerlin outputMerlin = new FeedMerlin(bundles[0].getOutputFeedFromBundle());
+        outputMerlin.setTags(testTag);
+        outputFeeds = generateFeeds(numOutputFeeds, outputMerlin,
+            Generator.getNameGenerator("outfeed", outputMerlin.getName()),
+            Generator.getHadoopPathGenerator(feedOutputPath, datePattern));
+        for (FeedMerlin feed : outputFeeds) {
+            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL,
+                feed.toString()));
+        }
+    }
+
+    public static FeedMerlin[] generateFeeds(final int numInputFeeds,
+                                             final FeedMerlin originalFeedMerlin,
+                                             final Generator nameGenerator,
+                                             final Generator pathGenerator) {
+        FeedMerlin[] inputFeeds = new FeedMerlin[numInputFeeds];
+        //submit all input feeds
+        for(int count = 0; count < numInputFeeds; ++count) {
+            final FeedMerlin feed = new FeedMerlin(originalFeedMerlin.toString());
+            feed.setName(nameGenerator.generate());
+            feed.setLocation(LocationType.DATA, pathGenerator.generate());
+            inputFeeds[count] = feed;
+        }
+        return inputFeeds;
+    }
+
+    @AfterMethod(alwaysRun = true, lastTimeOnly = true)
+    public void tearDown() {
+        for (FeedMerlin inputFeed : inputFeeds) {
+            CleanupUtil.deleteQuietly(prism.getFeedHelper(), inputFeed.toString());
+        }
+        for (FeedMerlin outputFeed : outputFeeds) {
+            CleanupUtil.deleteQuietly(prism.getFeedHelper(), outputFeed.toString());
+        }
+        removeBundles();
+    }
+
+    /**
+     * Get all vertices from falcon and check that they are sane
+     * @throws Exception
+     */
+    @Test
+    public void testAllVertices() throws Exception {
+        final VerticesResult verticesResult = lineageHelper.getAllVertices();
+        logger.info(verticesResult);
+        GraphAssert.assertVertexSanity(verticesResult);
+        GraphAssert.assertUserVertexPresence(verticesResult);
+        GraphAssert.assertVerticesPresenceMinOccur(verticesResult, Vertex.VERTEX_TYPE.COLO, 1);
+        GraphAssert.assertVerticesPresenceMinOccur(verticesResult, Vertex.VERTEX_TYPE.TAGS, 1);
+        GraphAssert.assertVerticesPresenceMinOccur(verticesResult, Vertex.VERTEX_TYPE.CLUSTER_ENTITY, 1);
+        GraphAssert.assertVerticesPresenceMinOccur(verticesResult,
+            Vertex.VERTEX_TYPE.FEED_ENTITY, numInputFeeds + numOutputFeeds);
+    }
+
+    /**
+     * Get a vertex by id and check results
+     * @throws Exception
+     */
+    @Test
+    public void testVertexId() throws Exception {
+        final VerticesResult userResult =
+            lineageHelper.getVerticesByName(MerlinConstants.CURRENT_USER_NAME);
+        GraphAssert.assertVertexSanity(userResult);
+        final int vertexId = userResult.getResults().get(0).get_id();
+        final VertexResult userVertex =
+            lineageHelper.getVertexById(vertexId);
+        Assert.assertEquals(userResult.getResults().get(0), userVertex.getResults(),
+            "Same vertex should have been returned.");
+    }
+
+    /**
+     * Negative test - get a vertex without specifying id, we should not get internal server error
+     * @throws Exception
+     */
+    @Test
+    public void testVertexNoId() throws Exception {
+        HttpResponse response = lineageHelper.runGetRequest(
+            lineageHelper.getUrl(LineageHelper.URL.VERTICES, ""));
+        String responseString = lineageHelper.getResponseString(response);
+        logger.info("response: " + response);
+        logger.info("responseString: " + responseString);
+        Assert.assertNotEquals(response.getStatusLine().getStatusCode(),
+            HttpStatus.SC_INTERNAL_SERVER_ERROR,
+            "We should not get internal server error");
+    }
+
+    /**
+     * Negative test - get a vertex specifying an invalid id, we should get an http not-found error
+     * @throws Exception
+     */
+    @Test
+    public void testVertexInvalidId() throws Exception {
+        final VerticesResult allVerticesResult =
+            lineageHelper.getAllVertices();
+        GraphAssert.assertVertexSanity(allVerticesResult);
+        int invalidVertexId = -1;
+        for (Vertex vertex : allVerticesResult.getResults()) {
+            if(invalidVertexId <= vertex.get_id()) {
+                invalidVertexId = vertex.get_id() + 1;
+            }
+        }
+
+        HttpResponse response = lineageHelper.runGetRequest(
+            lineageHelper.getUrl(LineageHelper.URL.VERTICES, "" + invalidVertexId));
+        String responseString = lineageHelper.getResponseString(response);
+        logger.info("response: " + response);
+        logger.info("responseString: " + responseString);
+        Assert.assertTrue(
+            responseString.matches(String.format(VERTEX_NOT_FOUND_REGEX, invalidVertexId)),
+            "Unexpected responseString: " + responseString);
+        Assert.assertEquals(response.getStatusLine().getStatusCode(),
+            HttpStatus.SC_NOT_FOUND,
+            "We should get http not found error");
+    }
+
+    /**
+     * Get properties of one type of vertex and check those properties
+     * @param vertexType type of the vertex that we want to check
+     */
+    private void checkVertexOneProperty(Vertex.VERTEX_TYPE vertexType) {
+        final VerticesResult coloResult = lineageHelper.getVerticesByType(vertexType);
+        GraphAssert.assertVertexSanity(coloResult);
+        for (Vertex coloVertex : coloResult.getResults()) {
+            final int coloVertexId = coloVertex.get_id();
+            final VertexResult coloProperties = lineageHelper.getVertexProperties(coloVertexId);
+            Assert.assertNotNull(coloProperties.getResults().getName(),
+                "name should not be null");
+            Assert.assertEquals(coloProperties.getResults().getType(), vertexType);
+            Assert.assertNotNull(coloProperties.getResults().getTimestamp(),
+                "timestamp should not be null");
+        }
+    }
+
+    /**
+     * Test vertex properties for different types of vertices
+     * @throws Exception
+     */
+    @Test
+    public void testVertexProperties() throws Exception {
+        //testing properties of a user vertex
+        checkVertexOneProperty(Vertex.VERTEX_TYPE.USER);
+
+        //testing properties of colo vertices
+        checkVertexOneProperty(Vertex.VERTEX_TYPE.COLO);
+
+        //testing properties of group vertices
+        checkVertexOneProperty(Vertex.VERTEX_TYPE.GROUPS);
+
+        //testing properties of feed entity vertices
+        //checkVertexOneProperty(Vertex.VERTEX_TYPE.FEED_ENTITY);
+    }
+
+    /**
+     * Test vertex properties supplying a blank id, expecting http not found error
+     * @throws Exception
+     */
+    @Test
+    public void testVertexPropertiesNoId() throws Exception {
+        //testing properties of a user vertex
+        HttpResponse response = lineageHelper.runGetRequest(lineageHelper
+            .getUrl(LineageHelper.URL.VERTICES_PROPERTIES, lineageHelper.getUrlPath("")));
+        String responseString = lineageHelper.getResponseString(response);
+        logger.info("response: " + response);
+        logger.info("responseString: " + responseString);
+        Assert.assertEquals(response.getStatusLine().getStatusCode(),
+            HttpStatus.SC_NOT_FOUND, "We should get http not found error");
+    }
+
+    /**
+     * Test vertex properties supplying an invalid id, expecting http not found error
+     * @throws Exception
+     */
+    @Test
+    public void testVertexPropertiesInvalidId() throws Exception {
+        final VerticesResult allVerticesResult =
+            lineageHelper.getAllVertices();
+        GraphAssert.assertVertexSanity(allVerticesResult);
+
+        int invalidVertexId = -1;
+        for (Vertex vertex : allVerticesResult.getResults()) {
+            if(invalidVertexId <= vertex.get_id()) {
+                invalidVertexId = vertex.get_id() + 1;
+            }
+        }
+
+        HttpResponse response = lineageHelper.runGetRequest(
+            lineageHelper.getUrl(LineageHelper.URL.VERTICES_PROPERTIES, "" + invalidVertexId));
+        String responseString = lineageHelper.getResponseString(response);
+        logger.info("response: " + response);
+        logger.info("responseString: " + responseString);
+        Assert.assertTrue(
+            responseString.matches(String.format(VERTEX_NOT_FOUND_REGEX, invalidVertexId)),
+            "Unexpected responseString: " + responseString);
+        Assert.assertEquals(response.getStatusLine().getStatusCode(),
+            HttpStatus.SC_NOT_FOUND,
+            "We should get http not found error");
+    }
+
+    /**
+     * Test filtering vertices by name
+     * @throws Exception
+     */
+    @Test
+    public void testVerticesFilterByName() throws Exception {
+        final String clusterName = clusterMerlin.getName();
+        final VerticesResult clusterVertices = lineageHelper.getVerticesByName(clusterName);
+        GraphAssert.assertVertexSanity(clusterVertices);
+        GraphAssert.assertVerticesPresenceMinOccur(clusterVertices,
+            Vertex.VERTEX_TYPE.CLUSTER_ENTITY, 1);
+        GraphAssert.assertVertexPresence(clusterVertices, clusterName);
+        for(int i = 0; i < numInputFeeds; ++i) {
+            final String feedName = inputFeeds[i].getName();
+            final VerticesResult feedVertices = lineageHelper.getVerticesByName(feedName);
+            GraphAssert.assertVertexSanity(feedVertices);
+            GraphAssert.assertVerticesPresenceMinOccur(feedVertices,
+                Vertex.VERTEX_TYPE.FEED_ENTITY, 1);
+            GraphAssert.assertVertexPresence(feedVertices, feedName);
+        }
+        for(int i = 0; i < numOutputFeeds; ++i) {
+            final String feedName = outputFeeds[i].getName();
+            final VerticesResult feedVertices = lineageHelper.getVerticesByName(feedName);
+            GraphAssert.assertVertexSanity(feedVertices);
+            GraphAssert.assertVerticesPresenceMinOccur(feedVertices,
+                Vertex.VERTEX_TYPE.FEED_ENTITY, 1);
+            GraphAssert.assertVertexPresence(feedVertices, feedName);
+        }
+
+    }
+
+    /**
+     * Test filtering vertices by type
+     * @throws Exception
+     */
+    @Test
+    public void testVerticesFilterByType() throws Exception {
+        final VerticesResult clusterVertices =
+            lineageHelper.getVerticesByType(Vertex.VERTEX_TYPE.CLUSTER_ENTITY);
+        GraphAssert.assertVertexSanity(clusterVertices);
+        GraphAssert.assertVerticesPresenceMinOccur(clusterVertices,
+            Vertex.VERTEX_TYPE.CLUSTER_ENTITY, 1);
+        GraphAssert.assertVertexPresence(clusterVertices, clusterMerlin.getName());
+        final VerticesResult feedVertices =
+            lineageHelper.getVerticesByType(Vertex.VERTEX_TYPE.FEED_ENTITY);
+        GraphAssert.assertVertexSanity(feedVertices);
+        GraphAssert.assertVerticesPresenceMinOccur(feedVertices,
+            Vertex.VERTEX_TYPE.FEED_ENTITY, 1);
+        for (FeedMerlin oneFeed : inputFeeds) {
+            GraphAssert.assertVertexPresence(feedVertices, oneFeed.getName());
+        }
+        for (FeedMerlin oneFeed : outputFeeds) {
+            GraphAssert.assertVertexPresence(feedVertices, oneFeed.getName());
+        }
+    }
+
+    /**
+     * Test filtering vertices when no output is produced
+     * @throws Exception
+     */
+    @Test
+    public void testVerticesFilterNoOutput() throws Exception {
+        final String nonExistingName = "this-is-a-non-existing-name";
+        final VerticesResult clusterVertices = lineageHelper.getVerticesByName(nonExistingName);
+        GraphAssert.assertVertexSanity(clusterVertices);
+        Assert.assertEquals(clusterVertices.getTotalSize(), 0,
+            "Result should not contain any vertex");
+    }
+
+    @Test
+    public void testVerticesFilterBlankValue() throws Exception {
+        Map<String, String> params = new TreeMap<String, String>();
+        params.put("key", Vertex.FilterKey.name.toString());
+        params.put("value", "");
+        HttpResponse response = lineageHelper
+            .runGetRequest(lineageHelper.getUrl(LineageHelper.URL.VERTICES, params));
+        String responseString = lineageHelper.getResponseString(response);
+        logger.info(responseString);
+        Assert.assertEquals(response.getStatusLine().getStatusCode(),
+            HttpStatus.SC_BAD_REQUEST,
+            "The get request was a bad request");
+        Assert.assertTrue(responseString.contains(inValidArgumentStr),
+            "Result should contain string Invalid argument");
+    }
+
+    @Test
+    public void testVerticesFilterBlankKey() throws Exception {
+        Map<String, String> params = new TreeMap<String, String>();
+        params.put("key", "");
+        params.put("value", "someValue");
+        HttpResponse response = lineageHelper.runGetRequest(
+            lineageHelper.getUrl(LineageHelper.URL.VERTICES, params));
+        String responseString = lineageHelper.getResponseString(response);
+        logger.info(responseString);
+        Assert.assertEquals(response.getStatusLine().getStatusCode(),
+            HttpStatus.SC_BAD_REQUEST,
+            "The get request was a bad request");
+        Assert.assertTrue(responseString.contains(inValidArgumentStr),
+            "Result should contain string Invalid argument");
+    }
+
+    @Test
+    public void testVertexDirectionFetchEdges() throws Exception {
+        final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
+
+        final EdgesResult bothEdges =
+            lineageHelper.getEdgesByDirection(clusterVertexId, Direction.bothEdges);
+        GraphAssert.assertEdgeSanity(bothEdges);
+        Assert.assertEquals(bothEdges.filterByType(Edge.LEBEL_TYPE.STORED_IN).size(),
+            inputFeeds.length + outputFeeds.length,
+            "There should be edge between the cluster and inputFeeds, outputFeeds");
+        Assert.assertEquals(bothEdges.filterByType(Edge.LEBEL_TYPE.CLUSTER_COLO).size(),
+            1, "There should be an edge from the cluster to colo");
+        Assert.assertEquals(bothEdges.getTotalSize(), inputFeeds.length + outputFeeds.length + 2,
+            "There should be edge from the cluster to inputFeeds & outputFeeds," +
+                " one between cluster and colo, one between cluster and classification");
+
+        final EdgesResult inComingEdges =
+            lineageHelper.getEdgesByDirection(clusterVertexId, Direction.inComingEdges);
+        GraphAssert.assertEdgeSanity(inComingEdges);
+        Assert.assertEquals(inComingEdges.getTotalSize(), inputFeeds.length + outputFeeds.length,
+            "There should be edge from the cluster to inputFeeds & outputFeeds");
+        Assert.assertEquals(inComingEdges.filterByType(Edge.LEBEL_TYPE.STORED_IN).size(),
+            inputFeeds.length + outputFeeds.length,
+            "There should be edge from the cluster to inputFeeds & outputFeeds");
+
+
+        final EdgesResult outGoingEdges =
+            lineageHelper.getEdgesByDirection(clusterVertexId, Direction.outGoingEdges);
+        GraphAssert.assertEdgeSanity(outGoingEdges);
+        Assert.assertEquals(outGoingEdges.filterByType(Edge.LEBEL_TYPE.CLUSTER_COLO).size(),
+            1, "There should be an edge from the cluster to colo");
+        Assert.assertEquals(outGoingEdges.filterByType(Edge.LEBEL_TYPE.TESTNAME).size(),
+            1, "There should be an edge from the cluster to classification");
+        Assert.assertEquals(outGoingEdges.getTotalSize(), 2,
+            "There should be an edge from the cluster to colo");
+    }
+
+    @Test
+    public void testVertexCountsFetchVertices() throws Exception {
+        final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
+
+        final VerticesResult bothVertices =
+            lineageHelper.getVerticesByDirection(clusterVertexId, Direction.bothVertices);
+        GraphAssert.assertVertexSanity(bothVertices);
+        Assert.assertEquals(bothVertices.filterByType(Vertex.VERTEX_TYPE.FEED_ENTITY).size(),
+            inputFeeds.length + outputFeeds.length,
+            "There should be edge from the cluster to inputFeeds & outputFeeds");
+        Assert.assertEquals(bothVertices.filterByType(Vertex.VERTEX_TYPE.COLO).size(), 1,
+            "The should be one edge between cluster and colo");
+        Assert.assertEquals(bothVertices.getTotalSize(),
+            inputFeeds.length + outputFeeds.length + 2,
+            "There should be edge from the cluster to inputFeeds & outputFeeds," +
+                " one between cluster and colo, one between cluster and classification");
+
+        final VerticesResult inComingVertices =
+            lineageHelper.getVerticesByDirection(clusterVertexId, Direction.inComingVertices);
+        GraphAssert.assertVertexSanity(inComingVertices);
+        Assert.assertEquals(inComingVertices.filterByType(Vertex.VERTEX_TYPE.FEED_ENTITY).size(),
+            inputFeeds.length + outputFeeds.length,
+            "There should be edge from the cluster to inputFeeds & outputFeeds");
+        Assert.assertEquals(inComingVertices.getTotalSize(),
+            inputFeeds.length + outputFeeds.length,
+            "There should be edge from the cluster to inputFeeds & outputFeeds and one " +
+                "between cluster and colo");
+
+        final VerticesResult outgoingVertices =
+            lineageHelper.getVerticesByDirection(clusterVertexId, Direction.outgoingVertices);
+        GraphAssert.assertVertexSanity(outgoingVertices);
+        Assert.assertEquals(outgoingVertices.filterByType(Vertex.VERTEX_TYPE.COLO).size(), 1,
+            "The should be one edge between cluster and colo");
+        Assert.assertEquals(outgoingVertices.filterByName(testName).size(),
+            1, "There should be an edge from the cluster to classification");
+        Assert.assertEquals(outgoingVertices.getTotalSize(), 2,
+            "There should be an edge from the cluster to colo");
+    }
+
+    @Test
+    public void testVertexDirectionFetchCounts() throws Exception {
+        final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
+
+        final VerticesResult bothCount =
+            lineageHelper.getVerticesByDirection(clusterVertexId, Direction.bothCount);
+        Assert.assertEquals(bothCount.getTotalSize(),
+            inputFeeds.length + outputFeeds.length + 2,
+            "There should be edge from the cluster to inputFeeds & outputFeeds," +
+                " one between cluster and colo, one between cluster and classification");
+
+        final VerticesResult inCount =
+            lineageHelper.getVerticesByDirection(clusterVertexId, Direction.inCount);
+        Assert.assertEquals(inCount.getTotalSize(),
+            inputFeeds.length + outputFeeds.length,
+            "There should be edge from the cluster to inputFeeds & outputFeeds and one " +
+                "between cluster and colo");
+
+        final VerticesResult outCount =
+            lineageHelper.getVerticesByDirection(clusterVertexId, Direction.outCount);
+        Assert.assertEquals(outCount.getTotalSize(), 2,
+            "There should be an edge from the cluster to colo");
+    }
+
+    @Test
+    public void testVertexDirectionFetchVertexIds() throws Exception {
+        final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
+
+        final VertexIdsResult bothVerticesIds =
+            lineageHelper.getVertexIdsByDirection(clusterVertexId, Direction.bothVerticesIds);
+        for (Integer vertexId : bothVerticesIds.getResults()) {
+            Assert.assertTrue(vertexId > 0, "Vertex id should be valid.");
+        }
+        Assert.assertEquals(bothVerticesIds.getTotalSize(),
+            inputFeeds.length + outputFeeds.length + 2,
+            "There should be edge from the cluster to inputFeeds & outputFeeds," +
+                " one between cluster and colo, one between cluster and classification");
+
+        final VertexIdsResult incomingVerticesIds =
+            lineageHelper.getVertexIdsByDirection(clusterVertexId, Direction.incomingVerticesIds);
+        for (Integer vertexId : incomingVerticesIds.getResults()) {
+            Assert.assertTrue(vertexId > 0, "Vertex id should be valid.");
+        }
+        Assert.assertEquals(incomingVerticesIds.getTotalSize(),
+            inputFeeds.length + outputFeeds.length,
+            "There should be edge from the cluster to inputFeeds & outputFeeds and one " +
+                "between cluster and colo");
+
+        final VertexIdsResult outgoingVerticesIds =
+            lineageHelper.getVertexIdsByDirection(clusterVertexId, Direction.outgoingVerticesIds);
+        for (Integer vertexId : outgoingVerticesIds.getResults()) {
+            Assert.assertTrue(vertexId > 0, "Vertex id should be valid.");
+        }
+        Assert.assertEquals(outgoingVerticesIds.getTotalSize(), 2,
+            "There should be an edge from the cluster to colo and one from cluster to " +
+                "classification");
+    }
+
+    @Test
+    public void testVertexBadDirection() throws Exception {
+        final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
+
+        HttpResponse response = lineageHelper
+            .runGetRequest(lineageHelper.getUrl(LineageHelper.URL.VERTICES,
+                lineageHelper.getUrlPath(clusterVertexId, "badDirection")));
+        final String responseString = lineageHelper.getResponseString(response);
+        logger.info("response: " + response);
+        logger.info("responseString: " + responseString);
+        Assert.assertEquals(response.getStatusLine().getStatusCode(),
+            HttpStatus.SC_BAD_REQUEST,
+            "We should not get internal server error");
+    }
+
+    @Test
+    public void testAllEdges() throws Exception {
+        final EdgesResult edgesResult = lineageHelper.getAllEdges();
+        logger.info(edgesResult);
+        Assert.assertTrue(edgesResult.getTotalSize() > 0, "Total number of edges should be" +
+            " greater that zero but is: " + edgesResult.getTotalSize());
+        GraphAssert.assertEdgeSanity(edgesResult);
+        GraphAssert.assertEdgePresenceMinOccur(edgesResult, Edge.LEBEL_TYPE.CLUSTER_COLO, 1);
+        GraphAssert.assertEdgePresenceMinOccur(edgesResult, Edge.LEBEL_TYPE.STORED_IN,
+            numInputFeeds + numOutputFeeds);
+        GraphAssert.assertEdgePresenceMinOccur(edgesResult, Edge.LEBEL_TYPE.OWNED_BY,
+            1 + numInputFeeds + numOutputFeeds);
+    }
+
+    @Test
+    public void testEdge() throws Exception {
+        final int clusterVertexId = lineageHelper.getVertex(clusterMerlin.getName()).get_id();
+        final EdgesResult outGoingEdges =
+            lineageHelper.getEdgesByDirection(clusterVertexId, Direction.outGoingEdges);
+        GraphAssert.assertEdgeSanity(outGoingEdges);
+        Assert.assertEquals(outGoingEdges.filterByType(Edge.LEBEL_TYPE.CLUSTER_COLO).size(),
+            1, "There should be an edge from the cluster to colo");
+
+        final String clusterColoEdgeId =
+            outGoingEdges.filterByType(Edge.LEBEL_TYPE.CLUSTER_COLO).get(0).get_id();
+        final Edge clusterColoEdge =
+            lineageHelper.getEdgeById(clusterColoEdgeId).getResults();
+        GraphAssert.assertEdgeSanity(clusterColoEdge);
+    }
+
+    @Test
+    public void testEdgeBlankId() throws Exception {
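+        // requesting an edge with a blank id should return 404 Not Found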
+        final HttpResponse httpResponse = lineageHelper.runGetRequest(
+            lineageHelper.getUrl(LineageHelper.URL.EDGES, lineageHelper.getUrlPath("")));
+        logger.info(httpResponse.toString());
+        logger.info(lineageHelper.getResponseString(httpResponse));
+        Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(),
+            HttpStatus.SC_NOT_FOUND,
+            "Expecting not-found error.");
+    }
+
+    @Test
+    public void testEdgeInvalidId() throws Exception {
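+        // requesting an edge with a non-existent id should also return 404 Not Found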
+        final HttpResponse response = lineageHelper.runGetRequest(
+            lineageHelper.getUrl(LineageHelper.URL.EDGES, lineageHelper.getUrlPath("invalid-id")));
+        logger.info(response.toString());
+        logger.info(lineageHelper.getResponseString(response));
+        Assert.assertEquals(response.getStatusLine().getStatusCode(),
+            HttpStatus.SC_NOT_FOUND,
+            "Expecting not-found error.");
+    }
+
+    @Test
+    public void testColoToClusterNode() throws Exception {
+        final VerticesResult verticesResult = lineageHelper.getVerticesByType(Vertex.VERTEX_TYPE.COLO);
+        GraphAssert.assertVertexSanity(verticesResult);
+        Assert.assertTrue(verticesResult.getTotalSize() > 0, "Expected at least 1 colo node");
+        Assert.assertTrue(verticesResult.getTotalSize() <= 3, "Expected at most 3 colo nodes");
+        final List<Vertex> colo1Vertex = verticesResult.filterByName(clusterMerlin.getColo());
+        AssertUtil.checkForListSize(colo1Vertex, 1);
+        Vertex coloVertex = colo1Vertex.get(0);
+        logger.info("coloVertex: " + coloVertex);
+        final VerticesResult verticesByDirection =
+            lineageHelper.getVerticesByDirection(coloVertex.get_id(), Direction.inComingVertices);
+        AssertUtil.checkForListSize(
+            verticesByDirection.filterByName(clusterMerlin.getName()), 1);
+    }
+
+    @Test
+    public void testClusterNodeToFeedNode() throws Exception {
+        final VerticesResult clusterResult = lineageHelper.getVerticesByName(
+            clusterMerlin.getName());
+        GraphAssert.assertVertexSanity(clusterResult);
+        Vertex clusterVertex = clusterResult.getResults().get(0);
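+        // every input and output feed should appear exactly once among the cluster's incoming vertices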
+        final VerticesResult clusterIncoming =
+            lineageHelper.getVerticesByDirection(clusterVertex.get_id(), Direction.inComingVertices);
+        GraphAssert.assertVertexSanity(clusterIncoming);
+        for(FeedMerlin feed : inputFeeds) {
+            AssertUtil.checkForListSize(clusterIncoming.filterByName(feed.getName()), 1);
+        }
+        for(FeedMerlin feed : outputFeeds) {
+            AssertUtil.checkForListSize(clusterIncoming.filterByName(feed.getName()), 1);
+        }
+    }
+
+    @Test
+    public void testUserToEntityNode() throws Exception {
+        final VerticesResult userResult = lineageHelper.getVerticesByName(
+            MerlinConstants.CURRENT_USER_NAME);
+        GraphAssert.assertVertexSanity(userResult);
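+        // every input and output feed should appear exactly once among the user's incoming vertices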
+        Vertex userVertex = userResult.getResults().get(0);
+        final VerticesResult userIncoming =
+            lineageHelper.getVerticesByDirection(userVertex.get_id(), Direction.inComingVertices);
+        GraphAssert.assertVertexSanity(userIncoming);
+        for(FeedMerlin feed : inputFeeds) {
+            AssertUtil.checkForListSize(userIncoming.filterByName(feed.getName()), 1);
+        }
+        for(FeedMerlin feed : outputFeeds) {
+            AssertUtil.checkForListSize(userIncoming.filterByName(feed.getName()), 1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
new file mode 100644
index 0000000..f0d5a50
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+@Test(groups = "distributed")
+public class FeedDelayParallelTimeoutTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+
+    String baseTestDir = baseHDFSDir + "/FeedDelayParallelTimeoutTest";
+    String feedInputPath = baseTestDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/";
+    String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private static final Logger logger = Logger.getLogger(FeedDelayParallelTimeoutTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundle, cluster1);
+        bundles[1] = new Bundle(bundle, cluster2);
+
+        bundles[0].generateUniqueBundle();
+        bundles[1].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[1].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    @Test(enabled = true, timeOut = 12000000)
+    public void timeoutTest() throws Exception {
+        bundles[0].setInputFeedDataPath(feedInputPath);
+
+        Bundle.submitCluster(bundles[0], bundles[1]);
+        String feedOutput01 = bundles[0].getDataSets().get(0);
+        org.apache.falcon.entity.v0.Frequency delay =
+            new org.apache.falcon.entity.v0.Frequency(
+                "hours(5)");
+
+        feedOutput01 = InstanceUtil
+            .setFeedCluster(feedOutput01,
+                XmlUtil.createValidity("2010-10-01T12:00Z", "2099-01-01T00:00Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+                ClusterType.SOURCE, null);
+
+        // uncomment the setFeedCluster calls below when falcon is in sync with ivory
+
+        //	feedOutput01 = instanceUtil.setFeedCluster(feedOutput01,
+        // XmlUtil.createValidity("2013-04-21T00:00Z",
+        // "2099-10-01T12:10Z"),XmlUtil.createRtention("hours(15)",ActionType.DELETE),
+        // Util.readClusterName(bundles[1].getClusters().get(0)),ClusterType.SOURCE,"",delay,
+        // feedInputPath);
+        //	feedOutput01 = instanceUtil.setFeedCluster(feedOutput01,
+        // XmlUtil.createValidity("2013-04-21T00:00Z",
+        // "2099-10-01T12:25Z"),XmlUtil.createRtention("hours(15)",ActionType.DELETE),
+        // Util.readClusterName(bundles[0].getClusters().get(0)),ClusterType.TARGET,"",delay,
+        // feedOutputPath);
+
+        //feedOutput01 = instanceUtil.setFeedCluster(feedOutput01,
+        // XmlUtil.createValidity("2013-04-21T00:00Z",
+        // "2099-10-01T12:10Z"),XmlUtil.createRtention("hours(15)",ActionType.DELETE),
+        // Util.readClusterName(bundles[1].getClusters().get(0)),ClusterType.SOURCE,"",
+        // feedInputPath);
+        //feedOutput01 = instanceUtil.setFeedCluster(feedOutput01,
+        // XmlUtil.createValidity("2013-04-21T00:00Z",
+        // "2099-10-01T12:25Z"),XmlUtil.createRtention("hours(15)",ActionType.DELETE),
+        // Util.readClusterName(bundles[0].getClusters().get(0)),ClusterType.TARGET,"",
+        // feedOutputPath);
+
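+        // override the feed's timeout and parallel properties before submit and schedule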
+        feedOutput01 = Util.setFeedProperty(feedOutput01, "timeout", "minutes(35)");
+        feedOutput01 = Util.setFeedProperty(feedOutput01, "parallel", "3");
+
+        logger.info("feedOutput01: " + Util.prettyPrintXml(feedOutput01));
+        prism.getFeedHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedOutput01);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
new file mode 100644
index 0000000..e0571b5
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+
+
+@Test(groups = "embedded")
+public class FeedRetentionTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    FileSystem cluster1FS = serverFS.get(0);
+    FileSystem cluster2FS = serverFS.get(1);
+    String impressionrcWorkflowDir = baseHDFSDir + "/FeedRetentionTest/impressionrc/";
+    String impressionrcWorkflowLibPath = impressionrcWorkflowDir + "lib";
+    private static final Logger logger = Logger.getLogger(FeedRetentionTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        for (FileSystem fs : serverFS) {
+            fs.copyFromLocalFile(new Path(
+                OSUtil.getPath(OSUtil.RESOURCES, "workflows", "impression_rc_workflow.xml")),
+                new Path(impressionrcWorkflowDir + "workflow.xml"));
+            HadoopUtil.uploadDir(fs, impressionrcWorkflowLibPath, OSUtil.RESOURCES_OOZIE + "lib");
+        }
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        //getImpressionRC bundle
+        bundles[0] = BundleUtil.readImpressionRCBundle();
+        bundles[0].generateUniqueBundle();
+        bundles[0] = new Bundle(bundles[0], cluster1);
+        bundles[0].setProcessWorkflow(impressionrcWorkflowDir);
+
+        bundles[1] = BundleUtil.readImpressionRCBundle();
+        bundles[1].generateUniqueBundle();
+        bundles[1] = new Bundle(bundles[1], cluster2);
+        bundles[1].setProcessWorkflow(impressionrcWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Submit two clusters.
+     * Submit and schedule a feed on the above two clusters, each with a different location.
+     * Submit and schedule a process that uses the above feed as its output feed and runs on
+     * both clusters.
+     */
+    @Test(enabled = true)
+    public void testRetentionClickRC_2Colo() throws Exception {
+        String inputPath = baseHDFSDir + "/testInput/";
+        String inputData = inputPath + "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+        String outputPathTemplate = baseHDFSDir +
+            "/testOutput/op%d/ivoryRetention0%d/%s/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+
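+        // create minute-granularity date folders around the current time and upload sample data to both clusters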
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+            TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(10), 1);
+        HadoopUtil.flattenAndPutDataInFolder(cluster1FS, OSUtil.RESOURCES + "thriftRRMar0602.gz",
+            inputPath, dataDates);
+        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.RESOURCES + "thriftRRMar0602.gz",
+            inputPath, dataDates);
+
+        prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+
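+        // configure the FETL-RequestRC output feed with 5-minute retention on both source clusters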
+        String feedOutput01 = bundles[0].getFeed("FETL-RequestRC");
+
+        feedOutput01 = InstanceUtil.setFeedCluster(feedOutput01,
+            XmlUtil.createValidity("2010-10-01T12:00Z", "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feedOutput01 = InstanceUtil.setFeedCluster(feedOutput01,
+            XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:10Z"),
+            XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+            "${cluster.colo}",
+            String.format(outputPathTemplate, 1, 1, "data"),
+            String.format(outputPathTemplate, 1, 1, "stats"),
+            String.format(outputPathTemplate, 1, 1, "meta"),
+            String.format(outputPathTemplate, 1, 1, "tmp"));
+
+        feedOutput01 = InstanceUtil.setFeedCluster(feedOutput01,
+            XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:25Z"),
+            XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "${cluster.colo}",
+            String.format(outputPathTemplate, 1, 2, "data"),
+            String.format(outputPathTemplate, 1, 2, "stats"),
+            String.format(outputPathTemplate, 1, 2, "meta"),
+            String.format(outputPathTemplate, 1, 2, "tmp"));
+
+        //submit the new output feed
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOutput01));
+
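+        // repeat the same retention setup for the FETL-ImpressionRC output feed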
+        String feedOutput02 = bundles[0].getFeed("FETL-ImpressionRC");
+        feedOutput02 = InstanceUtil.setFeedCluster(feedOutput02,
+            XmlUtil.createValidity("2010-10-01T12:00Z", "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feedOutput02 = InstanceUtil.setFeedCluster(feedOutput02,
+            XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:10Z"),
+            XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+            "${cluster.colo}",
+            String.format(outputPathTemplate, 2, 1, "data"),
+            String.format(outputPathTemplate, 2, 1, "stats"),
+            String.format(outputPathTemplate, 2, 1, "meta"),
+            String.format(outputPathTemplate, 2, 1, "tmp"));
+
+        feedOutput02 = InstanceUtil.setFeedCluster(feedOutput02,
+            XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:25Z"),
+            XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "${cluster.colo}",
+            String.format(outputPathTemplate, 2, 2, "data"),
+            String.format(outputPathTemplate, 2, 2, "stats"),
+            String.format(outputPathTemplate, 2, 2, "meta"),
+            String.format(outputPathTemplate, 2, 2, "tmp"));
+
+        //submit the new output feed
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOutput02));
+
+        String feedInput = bundles[0].getFeed("FETL2-RRLog");
+        feedInput = InstanceUtil
+            .setFeedCluster(feedInput,
+                XmlUtil.createValidity("2010-10-01T12:00Z", "2099-01-01T00:00Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+                ClusterType.SOURCE, null);
+
+        feedInput = InstanceUtil.setFeedCluster(feedInput,
+            XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:10Z"),
+            XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+            "${cluster.colo}", inputData);
+
+        feedInput = InstanceUtil.setFeedCluster(feedInput,
+            XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:25Z"),
+            XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "${cluster.colo}", inputData);
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedInput));
+
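+        // schedule the process on both clusters with a short validity window around the current time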
+        String process = bundles[0].getProcessData();
+        process = InstanceUtil.setProcessCluster(process, null,
+            XmlUtil.createProcessValidity("2012-10-01T12:00Z", "2012-10-01T12:10Z"));
+
+        process = InstanceUtil.setProcessCluster(process,
+            Util.readEntityName(bundles[0].getClusters().get(0)),
+            XmlUtil.createProcessValidity(TimeUtil.getTimeWrtSystemTime(-2),
+                TimeUtil.getTimeWrtSystemTime(5)));
+        process = InstanceUtil.setProcessCluster(process,
+            Util.readEntityName(bundles[1].getClusters().get(0)),
+            XmlUtil.createProcessValidity(TimeUtil.getTimeWrtSystemTime(-2),
+                TimeUtil.getTimeWrtSystemTime(5)));
+
+        logger.info("process: " + Util.prettyPrintXml(process));
+
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process));
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOutput01));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOutput02));
+    }
+
+}

