falcon-commits mailing list archives

From pall...@apache.org
Subject [32/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom
Date Tue, 01 Mar 2016 08:26:18 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
deleted file mode 100644
index 450b251..0000000
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ /dev/null
@@ -1,1080 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.parser.EntityParserFactory;
-import org.apache.falcon.entity.parser.FeedEntityParser;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Properties;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.Argument;
-import org.apache.falcon.entity.v0.feed.Arguments;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Clusters;
-import org.apache.falcon.entity.v0.feed.Extract;
-import org.apache.falcon.entity.v0.feed.ExtractMethod;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
-import org.apache.falcon.entity.v0.feed.FieldsType;
-import org.apache.falcon.entity.v0.feed.Import;
-import org.apache.falcon.entity.v0.feed.Lifecycle;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.entity.v0.feed.MergeType;
-import org.apache.falcon.entity.v0.feed.RetentionStage;
-import org.apache.falcon.entity.v0.feed.Datasource;
-import org.apache.falcon.entity.v0.feed.Validity;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.resource.SchedulableEntityInstance;
-import org.apache.falcon.service.LifecyclePolicyMap;
-import org.apache.falcon.util.DateUtil;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TimeZone;
-
-/**
- * Test for feed helper methods.
- */
-public class FeedHelperTest extends AbstractTestBase {
-    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-    private ConfigurationStore store;
-
-    @BeforeClass
-    public void init() throws Exception {
-        initConfigStore();
-        LifecyclePolicyMap.get().init();
-    }
-
-    @BeforeMethod
-    public void setUp() throws Exception {
-        cleanupStore();
-        store = getStore();
-    }
-
-    @Test
-    public void testPartitionExpression() {
-        Assert.assertEquals(FeedHelper.normalizePartitionExpression(" /a// ", "  /b// "), "a/b");
-        Assert.assertEquals(FeedHelper.normalizePartitionExpression(null, "  /b// "), "b");
-        Assert.assertEquals(FeedHelper.normalizePartitionExpression(null, null), "");
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testInstanceBeforeStart() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("now(0,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-        FeedHelper.getProducerInstance(feed, getDate("2011-02-27 10:00 UTC"), cluster);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testInstanceEqualsEnd() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("now(0,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-        FeedHelper.getProducerInstance(feed, getDate("2016-02-28 10:00 UTC"), cluster);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testInstanceOutOfSync() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("now(0,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-        FeedHelper.getProducerInstance(feed, getDate("2016-02-28 09:04 UTC"), cluster);
-    }
-
-    @Test
-    public void testInvalidProducerInstance() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("now(0,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-        Assert.assertNull(FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:40 UTC"), cluster));
-    }
-
-    @Test
-    public void testGetProducerOutOfValidity() throws FalconException, ParseException {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("now(0,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
-        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:45 UTC"),
-                cluster);
-        Assert.assertNull(result);
-    }
-
-    @Test
-    public void testGetConsumersOutOfValidity() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("now(0, -20)");
-        inFeed.setEnd("now(0, 0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2016-02-28 09:00 UTC"),
-                cluster);
-        Assert.assertTrue(result.isEmpty());
-    }
-
-    @Test
-    public void testGetFeedValidityStartAndNextInstance() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Date date = FeedHelper.getFeedValidityStart(feed, cluster.getName());
-        Assert.assertEquals(DateUtil.getDateFormatFromTime(date.getTime()), "2011-02-28T10:00Z");
-        Date nextDate = FeedHelper.getNextFeedInstanceDate(date, feed);
-        Assert.assertEquals(DateUtil.getDateFormatFromTime(nextDate.getTime()), "2011-02-28T10:05Z");
-    }
-
-    @Test
-    public void testGetConsumersFirstInstance() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("now(0, -20)");
-        inFeed.setEnd("now(0, 0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 10:15 UTC"),
-                cluster);
-        Set<SchedulableEntityInstance> expected = new HashSet<>();
-        SchedulableEntityInstance consumer = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                getDate("2012-02-28 10:37 UTC"), EntityType.PROCESS);
-        consumer.setTags(SchedulableEntityInstance.INPUT);
-        expected.add(consumer);
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetConsumersLastInstance() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:20 UTC", "2016-02-28 10:00 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("now(0, -20)");
-        inFeed.setEnd("now(0, 0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 10:15 UTC"),
-                cluster);
-        Set<SchedulableEntityInstance> expected = new HashSet<>();
-        String[] consumers = { "2012-02-28 10:20 UTC", "2012-02-28 10:30 UTC", };
-        for (String d : consumers) {
-            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                    getDate(d), EntityType.PROCESS);
-            i.setTags(SchedulableEntityInstance.INPUT);
-            expected.add(i);
-        }
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetPolicies() throws Exception {
-        FeedEntityParser parser = (FeedEntityParser) EntityParserFactory
-                .getParser(EntityType.FEED);
-        Feed feed = parser.parse(this.getClass().getResourceAsStream(FEED3_XML));
-        List<String> policies = FeedHelper.getPolicies(feed, "testCluster");
-        Assert.assertEquals(policies.size(), 1);
-        Assert.assertEquals(policies.get(0), "AgeBasedDelete");
-    }
-
-    @Test
-    public void testFeedWithNoDependencies() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2016-02-28 09:00 UTC"),
-                cluster);
-        Assert.assertTrue(result.isEmpty());
-        SchedulableEntityInstance res = FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:45 UTC"),
-                cluster);
-        Assert.assertNull(res);
-    }
-
-    @Test
-    public void testEvaluateExpression() throws Exception {
-        Cluster cluster = new Cluster();
-        cluster.setName("name");
-        cluster.setColo("colo");
-        cluster.setProperties(new Properties());
-        Property prop = new Property();
-        prop.setName("pname");
-        prop.setValue("pvalue");
-        cluster.getProperties().getProperties().add(prop);
-
-        Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "${cluster.colo}/*/US"), "colo/*/US");
-        Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "${cluster.name}/*/${cluster.pname}"),
-                "name/*/pvalue");
-        Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "IN"), "IN");
-    }
-
-    @DataProvider(name = "fsPathsforDate")
-    public Object[][] createPathsForGetDate() {
-        final TimeZone utc = TimeZone.getTimeZone("UTC");
-        final TimeZone pacificTime = TimeZone.getTimeZone("America/Los_Angeles");
-        final TimeZone ist = TimeZone.getTimeZone("IST");
-
-        return new Object[][] {
-            {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015/01/01/00/30", utc, "2015-01-01T00:30Z"},
-            {"/data/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}", "/data/2015-01-01-01-00", utc, "2015-01-01T01:00Z"},
-            {"/data/${YEAR}/${MONTH}/${DAY}", "/data/2015/01/01", utc, "2015-01-01T00:00Z"},
-            {"/data/${YEAR}/${MONTH}/${DAY}/data", "/data/2015/01/01/data", utc, "2015-01-01T00:00Z"},
-            {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015-01-01/00/30", utc, null},
-            {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data", "/data/2015-01-01/00/30", utc, null},
-            {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/data", "/d/2015/05/25/00/data/{p1}/p2", utc, "2015-05-25T00:00Z"},
-            {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data", "/data/2015/05/25/00/00/{p1}/p2", utc, null},
-            {"/d/${YEAR}/${MONTH}/M", "/d/2015/11/M", utc, "2015-11-01T00:00Z"},
-            {"/d/${YEAR}/${MONTH}/${DAY}/M", "/d/2015/11/02/M", utc, "2015-11-02T00:00Z"},
-            {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/M", "/d/2015/11/01/04/M", utc, "2015-11-01T04:00Z"},
-            {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/M", "/d/2015/11/01/04/15/M", utc, "2015-11-01T04:15Z"},
-            {"/d/${YEAR}/${MONTH}/M", "/d/2015/11/M", pacificTime, "2015-11-01T07:00Z"},
-            {"/d/${YEAR}/${MONTH}/${DAY}/M", "/d/2015/11/02/M", pacificTime, "2015-11-02T08:00Z"},
-            {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/M", "/d/2015/11/01/04/M", pacificTime, "2015-11-01T12:00Z"},
-            {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/M", "/d/2015/11/01/04/15/M", ist, "2015-10-31T22:45Z"},
-        };
-    }
-
-    @Test(dataProvider = "fsPathsforDate")
-    public void testGetDateFromPath(String template, String path, TimeZone tz, String expectedDate) throws Exception {
-        Date date = FeedHelper.getDate(template, new Path(path), tz);
-        Assert.assertEquals(SchemaHelper.formatDateUTC(date), expectedDate);
-    }
-
-    @Test
-    public void testGetLocations() {
-        Cluster cluster = new Cluster();
-        cluster.setName("name");
-        Feed feed = new Feed();
-        Location location1 = new Location();
-        location1.setType(LocationType.META);
-        Locations locations = new Locations();
-        locations.getLocations().add(location1);
-
-        Location location2 = new Location();
-        location2.setType(LocationType.DATA);
-        locations.getLocations().add(location2);
-
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = new org.apache.falcon.entity.v0.feed.Cluster();
-        feedCluster.setName("name");
-
-        feed.setLocations(locations);
-        Clusters clusters = new Clusters();
-        feed.setClusters(clusters);
-        feed.getClusters().getClusters().add(feedCluster);
-
-        Assert.assertEquals(FeedHelper.getLocations(feedCluster, feed),
-                locations.getLocations());
-        Assert.assertEquals(FeedHelper.getLocation(feed, cluster, LocationType.DATA), location2);
-    }
-
-    @Test
-    public void testGetProducerProcessWithOffset() throws FalconException, ParseException {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Assert.assertNull(FeedHelper.getProducerProcess(feed));
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2016-02-28 10:37 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("now(0,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
-        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:35 UTC"),
-                cluster);
-        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                getDate("2013-02-28 10:37 UTC"), EntityType.PROCESS);
-        expected.setTags(SchedulableEntityInstance.OUTPUT);
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetProducerProcessForNow() throws FalconException, ParseException {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Assert.assertNull(FeedHelper.getProducerProcess(feed));
-
-        // create its producer process, submit it, and test its ProducerProcess
-        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("now(0,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
-        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"),
-                cluster);
-        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
-        expected.setTags(SchedulableEntityInstance.OUTPUT);
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetProducerWithNowNegativeOffset() throws FalconException, ParseException {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Assert.assertNull(FeedHelper.getProducerProcess(feed));
-
-        // create its producer process, submit it, and test its ProducerProcess
-        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("now(-4,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
-        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"),
-                cluster);
-        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
-        expected.setTags(SchedulableEntityInstance.OUTPUT);
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetProducerWithNowPositiveOffset() throws FalconException, ParseException {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Assert.assertNull(FeedHelper.getProducerProcess(feed));
-
-        // create its producer process, submit it, and test its ProducerProcess
-        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("now(4,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
-        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"),
-                cluster);
-        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
-        expected.setTags(SchedulableEntityInstance.OUTPUT);
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetProducerProcessInstance() throws FalconException, ParseException {
-        // create a feed, submit it, and test that ProducerProcess is null
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 00:00 UTC", "2016-02-28 10:00 UTC");
-
-        // create its producer process, submit it, and test its ProducerProcess
-        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Outputs outputs = new Outputs();
-        Output outFeed = new Output();
-        outFeed.setName("outputFeed");
-        outFeed.setFeed(feed.getName());
-        outFeed.setInstance("today(0,0)");
-        outputs.getOutputs().add(outFeed);
-        process.setOutputs(outputs);
-        store.publish(EntityType.PROCESS, process);
-        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
-        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 00:00 UTC"),
-                cluster);
-        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
-        expected.setTags(SchedulableEntityInstance.OUTPUT);
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetConsumerProcesses() throws FalconException, ParseException {
-        // create a feed, submit it, and test that ConsumerProcesses is an empty list
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-
-        // create a consumer Process, submit it, and assert that it is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("outputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("today(0,0)");
-        inFeed.setEnd("today(0,0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Set<Process> result = FeedHelper.getConsumerProcesses(feed);
-        Assert.assertEquals(result.size(), 1);
-        Assert.assertTrue(result.contains(process));
-    }
-
-    @Test
-    public void testGetConsumerProcessInstances() throws Exception {
-        // create a feed, submit it, and test that ConsumerProcesses is an empty list
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
-
-        // create a consumer Process, submit it, and assert that it is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("now(-4, 30)");
-        inFeed.setEnd("now(4, 30)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
-                getDate("2012-02-28 09:00 UTC"), cluster);
-        Assert.assertEquals(result.size(), 1);
-
-        Set<SchedulableEntityInstance> expected = new HashSet<>();
-        SchedulableEntityInstance ins = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                getDate("2012-02-28 10:00 UTC"), EntityType.PROCESS);
-        ins.setTags(SchedulableEntityInstance.INPUT);
-        expected.add(ins);
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetConsumerProcessInstancesWithNonUnitFrequency() throws Exception {
-        // create a feed, submit it, and test that ConsumerProcesses is an empty list
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
-
-        // create a consumer Process, submit it, and assert that it is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2016-02-28 10:00 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("now(0, -20)");
-        inFeed.setEnd("now(0,0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
-                getDate("2012-02-28 09:40 UTC"), cluster);
-
-        Set<SchedulableEntityInstance> expected = new HashSet<>();
-        String[] consumers = {"2012-02-28 09:47 UTC", "2012-02-28 09:57 UTC"};
-        for (String d : consumers) {
-            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                    getDate(d), EntityType.PROCESS);
-            i.setTags(SchedulableEntityInstance.INPUT);
-            expected.add(i);
-        }
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetConsumersOutOfValidityRange() throws Exception {
-        // create a feed, submit it, and test that ConsumerProcesses is an empty list
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2010-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
-
-        // create a consumer Process, submit it, and assert that it is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2016-02-28 10:00 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("now(0, -20)");
-        inFeed.setEnd("now(0,0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
-                getDate("2010-02-28 09:40 UTC"), cluster);
-        Assert.assertEquals(result.size(), 0);
-    }
-
-    @Test
-    public void testGetConsumersLargeOffsetShortValidity() throws Exception {
-        // create a feed, submit it, and test that ConsumerProcesses is an empty list
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "minutes(5)", "2010-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
-
-        // create a consumer Process, submit it, and assert that it is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2012-02-28 09:47 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("today(-2, 0)");
-        inFeed.setEnd("now(0,0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
-                getDate("2012-02-28 09:35 UTC"), cluster);
-        Set<SchedulableEntityInstance> expected = new HashSet<>();
-        SchedulableEntityInstance consumer = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                getDate("2012-02-28 09:37 UTC"), EntityType.PROCESS);
-        consumer.setTags(SchedulableEntityInstance.INPUT);
-        expected.add(consumer);
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetMultipleConsumerInstances() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC");
-        Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("now(-4, 30)");
-        inFeed.setEnd("now(4, 30)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
-                getDate("2012-02-28 09:00 UTC"), cluster);
-        Assert.assertEquals(result.size(), 9);
-        Set<SchedulableEntityInstance> expected = new HashSet<>();
-        String[] consumers = { "2012-02-28 05:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 07:00 UTC",
-            "2012-02-28 08:00 UTC", "2012-02-28 09:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 11:00 UTC",
-            "2012-02-28 12:00 UTC", "2012-02-28 13:00 UTC", };
-        for (String d : consumers) {
-            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                    getDate(d), EntityType.PROCESS);
-            i.setTags(SchedulableEntityInstance.INPUT);
-            expected.add(i);
-        }
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetConsumerWithVariableEnd() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC");
-
-        // create a consumer Process, submit it, and assert that it is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("today(0, 0)");
-        inFeed.setEnd("now(0, 0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
-                getDate("2012-02-28 00:00 UTC"), cluster);
-        Set<SchedulableEntityInstance> expected = new HashSet<>();
-        String[] consumers =  {"2012-02-28 11:00 UTC", "2012-02-28 16:00 UTC", "2012-02-28 18:00 UTC",
-            "2012-02-28 20:00 UTC", "2012-02-28 13:00 UTC", "2012-02-28 03:00 UTC", "2012-02-28 04:00 UTC",
-            "2012-02-28 06:00 UTC", "2012-02-28 05:00 UTC", "2012-02-28 17:00 UTC", "2012-02-28 00:00 UTC",
-            "2012-02-28 23:00 UTC", "2012-02-28 21:00 UTC", "2012-02-28 15:00 UTC", "2012-02-28 22:00 UTC",
-            "2012-02-28 14:00 UTC", "2012-02-28 08:00 UTC", "2012-02-28 12:00 UTC", "2012-02-28 02:00 UTC",
-            "2012-02-28 01:00 UTC", "2012-02-28 19:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 09:00 UTC",
-            "2012-02-28 07:00 UTC", };
-        for (String d : consumers) {
-            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                    getDate(d), EntityType.PROCESS);
-            i.setTags(SchedulableEntityInstance.INPUT);
-            expected.add(i);
-        }
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetConsumerWithVariableStart() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC");
-
-        // create a consumer Process, submit it, and assert that it is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("now(0, 0)");
-        inFeed.setEnd("today(24, 0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
-                getDate("2012-03-28 00:00 UTC"), cluster);
-        Set<SchedulableEntityInstance> expected = new HashSet<>();
-        String[] consumers =  {"2012-03-27 16:00 UTC", "2012-03-27 01:00 UTC", "2012-03-27 10:00 UTC",
-            "2012-03-27 03:00 UTC", "2012-03-27 08:00 UTC", "2012-03-27 07:00 UTC", "2012-03-27 19:00 UTC",
-            "2012-03-27 22:00 UTC", "2012-03-27 12:00 UTC", "2012-03-27 20:00 UTC", "2012-03-27 09:00 UTC",
-            "2012-03-27 04:00 UTC", "2012-03-27 14:00 UTC", "2012-03-27 05:00 UTC", "2012-03-27 23:00 UTC",
-            "2012-03-27 17:00 UTC", "2012-03-27 13:00 UTC", "2012-03-27 18:00 UTC", "2012-03-27 15:00 UTC",
-            "2012-03-28 00:00 UTC", "2012-03-27 02:00 UTC", "2012-03-27 11:00 UTC", "2012-03-27 21:00 UTC",
-            "2012-03-27 00:00 UTC", "2012-03-27 06:00 UTC", };
-        for (String d : consumers) {
-            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                    getDate(d), EntityType.PROCESS);
-            i.setTags(SchedulableEntityInstance.INPUT);
-            expected.add(i);
-        }
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testGetConsumerWithLatest() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC");
-        Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
-        Inputs inputs = new Inputs();
-        Input inFeed = new Input();
-        inFeed.setName("inputFeed");
-        inFeed.setFeed(feed.getName());
-        inFeed.setStart("today(0, 0)");
-        inFeed.setEnd("latest(0)");
-        inputs.getInputs().add(inFeed);
-        process.setInputs(inputs);
-        store.publish(EntityType.PROCESS, process);
-
-        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
-                getDate("2012-02-28 00:00 UTC"), cluster);
-        Set<SchedulableEntityInstance> expected = new HashSet<>();
-        String[] consumers =  {"2012-02-28 23:00 UTC", "2012-02-28 04:00 UTC", "2012-02-28 10:00 UTC",
-            "2012-02-28 07:00 UTC", "2012-02-28 17:00 UTC", "2012-02-28 13:00 UTC", "2012-02-28 05:00 UTC",
-            "2012-02-28 22:00 UTC", "2012-02-28 03:00 UTC", "2012-02-28 21:00 UTC", "2012-02-28 11:00 UTC",
-            "2012-02-28 20:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 01:00 UTC", "2012-02-28 14:00 UTC",
-            "2012-02-28 00:00 UTC", "2012-02-28 18:00 UTC", "2012-02-28 12:00 UTC", "2012-02-28 16:00 UTC",
-            "2012-02-28 09:00 UTC", "2012-02-28 15:00 UTC", "2012-02-28 19:00 UTC", "2012-02-28 08:00 UTC",
-            "2012-02-28 02:00 UTC", };
-        for (String d : consumers) {
-            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                    getDate(d), EntityType.PROCESS);
-            i.setTags(SchedulableEntityInstance.INPUT);
-            expected.add(i);
-        }
-        Assert.assertEquals(result, expected);
-    }
-
-    @Test
-    public void testIsLifeCycleEnabled() throws Exception {
-        Feed feed = new Feed();
-
-        // lifecycle is not defined
-        Clusters clusters = new Clusters();
-        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
-        cluster.setName("cluster1");
-        clusters.getClusters().add(cluster);
-        feed.setClusters(clusters);
-        Assert.assertFalse(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
-
-        // lifecycle is defined at global level
-        Lifecycle globalLifecycle = new Lifecycle();
-        RetentionStage retentionStage = new RetentionStage();
-        retentionStage.setFrequency(new Frequency("hours(2)"));
-        globalLifecycle.setRetentionStage(retentionStage);
-        feed.setLifecycle(globalLifecycle);
-        Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
-
-        // lifecycle is defined at both global and cluster level
-        Lifecycle clusterLifecycle = new Lifecycle();
-        retentionStage = new RetentionStage();
-        retentionStage.setFrequency(new Frequency("hours(4)"));
-        clusterLifecycle.setRetentionStage(retentionStage);
-        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
-        Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
-
-        // lifecycle is defined only at cluster level
-        feed.setLifecycle(null);
-        Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
-    }
-
-    @Test
-    public void testGetRetentionStage() throws Exception {
-        Feed feed = new Feed();
-        feed.setFrequency(new Frequency("days(1)"));
-
-        // retention stage frequency is not defined
-        Lifecycle globalLifecycle = new Lifecycle();
-        RetentionStage globalRetentionStage = new RetentionStage();
-        globalLifecycle.setRetentionStage(globalRetentionStage);
-        feed.setLifecycle(globalLifecycle);
-
-        Clusters clusters = new Clusters();
-        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
-        cluster.setName("cluster1");
-        clusters.getClusters().add(cluster);
-        feed.setClusters(clusters);
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                            new Frequency("days(1)"));
-
-        // lifecycle is defined only at global level
-        globalRetentionStage.setFrequency(new Frequency("hours(2)"));
-        globalLifecycle.setRetentionStage(globalRetentionStage);
-        feed.setLifecycle(globalLifecycle);
-        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                feed.getLifecycle().getRetentionStage().getFrequency());
-
-        // lifecycle is defined at both global and cluster level
-        Lifecycle clusterLifecycle = new Lifecycle();
-        RetentionStage clusterRetentionStage = new RetentionStage();
-        clusterRetentionStage.setFrequency(new Frequency("hours(4)"));
-        clusterLifecycle.setRetentionStage(clusterRetentionStage);
-        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
-        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                cluster.getLifecycle().getRetentionStage().getFrequency());
-
-        // lifecycle at both level - retention only at cluster level.
-        feed.getLifecycle().setRetentionStage(null);
-        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                cluster.getLifecycle().getRetentionStage().getFrequency());
-
-        // lifecycle at both level - retention only at global level.
-        feed.getLifecycle().setRetentionStage(globalRetentionStage);
-        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(null);
-        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                feed.getLifecycle().getRetentionStage().getFrequency());
-
-        // lifecycle is defined only at cluster level
-        feed.setLifecycle(null);
-        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(clusterRetentionStage);
-        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                cluster.getLifecycle().getRetentionStage().getFrequency());
-    }
-
-    @Test
-    public void testGetRetentionFrequency() throws Exception {
-        Feed feed = new Feed();
-        feed.setFrequency(new Frequency("days(10)"));
-
-        // no retention stage frequency defined - test both daily and monthly feeds
-        Lifecycle globalLifecycle = new Lifecycle();
-        RetentionStage globalRetentionStage = new RetentionStage();
-        globalLifecycle.setRetentionStage(globalRetentionStage);
-        feed.setLifecycle(globalLifecycle);
-
-        Clusters clusters = new Clusters();
-        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
-        cluster.setName("cluster1");
-        clusters.getClusters().add(cluster);
-        feed.setClusters(clusters);
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                            new Frequency("days(10)"));
-
-        feed.setFrequency(new Frequency("hours(1)"));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                            new Frequency("hours(6)"));
-
-        feed.setFrequency(new Frequency("minutes(10)"));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                            new Frequency("hours(6)"));
-
-        feed.setFrequency(new Frequency("hours(7)"));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                            new Frequency("hours(7)"));
-
-        feed.setFrequency(new Frequency("days(2)"));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                            new Frequency("days(2)"));
-
-        // lifecycle at both level - retention only at global level.
-        feed.setFrequency(new Frequency("hours(1)"));
-        globalRetentionStage.setFrequency(new Frequency("hours(2)"));
-        globalLifecycle.setRetentionStage(globalRetentionStage);
-        feed.setLifecycle(globalLifecycle);
-
-        Lifecycle clusterLifecycle = new Lifecycle();
-        RetentionStage clusterRetentionStage = new RetentionStage();
-        clusterLifecycle.setRetentionStage(clusterRetentionStage);
-        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                            new Frequency("hours(6)"));
-
-        // lifecycle at both level - retention only at cluster level.
-        feed.getLifecycle().getRetentionStage().setFrequency(null);
-        clusterRetentionStage.setFrequency(new Frequency("hours(4)"));
-        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
-                            new Frequency("hours(4)"));
-    }
-
-    @Test
-    public void testFeedImportSnapshot() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = importFeedSnapshot(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-        Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster);
-        Assert.assertNotNull(feed.getClusters().getClusters());
-        Assert.assertNotNull(feed.getClusters().getClusters().get(0));
-        Assert.assertNotNull(feed.getClusters().getClusters().get(0).getValidity());
-        Assert.assertNotNull(feed.getClusters().getClusters().get(0).getValidity().getStart());
-        Assert.assertNotNull(startInstResult);
-        Assert.assertNotNull(feedCluster.getValidity().getStart());
-        Assert.assertEquals(getDate("2012-02-07 00:00 UTC"), feedCluster.getValidity().getStart());
-        Assert.assertTrue(FeedHelper.isImportEnabled(feedCluster));
-        Assert.assertEquals(MergeType.SNAPSHOT, FeedHelper.getImportMergeType(feedCluster));
-        Assert.assertEquals(startInstResult, feedCluster.getValidity().getStart());
-    }
-
-    @Test
-    public void testFeedImportFields() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = importFeedSnapshot(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-        Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster);
-        List<String> fieldList = FeedHelper.getImportFieldList(feedCluster);
-        Assert.assertEquals(2, fieldList.size());
-        Assert.assertFalse(FeedHelper.isFieldExcludes(feedCluster.getImport().getSource()));
-    }
-
-    @Test
-    public void testFeedImportAppend() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = importFeedAppend(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-        Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster);
-        Assert.assertEquals(startInstResult, feed.getClusters().getClusters().get(0).getValidity().getStart());
-    }
-
-    @Test
-    public void testGetFeedClusterValidity() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
-        Validity validity = FeedHelper.getClusterValidity(feed, cluster.getName());
-        Assert.assertEquals(validity.getStart(), getDate("2012-02-07 00:00 UTC"));
-        Assert.assertEquals(validity.getEnd(), getDate("2020-02-25 00:00 UTC"));
-    }
-
-    @Test(expectedExceptions = FalconException.class)
-    public void testGetClusterValidityInvalidCluster() throws Exception {
-        Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
-        FeedHelper.getClusterValidity(feed, "abracadabra");
-    }
-
-    private Validity getFeedValidity(String start, String end) throws ParseException {
-        Validity validity = new Validity();
-        validity.setStart(getDate(start));
-        validity.setEnd(getDate(end));
-        return validity;
-    }
-
-    private org.apache.falcon.entity.v0.process.Validity getProcessValidity(String start, String end) throws
-            ParseException {
-
-        org.apache.falcon.entity.v0.process.Validity validity = new org.apache.falcon.entity.v0.process.Validity();
-        validity.setStart(getDate(start));
-        validity.setEnd(getDate(end));
-        return validity;
-    }
-
-    private Date getDate(String dateString) throws ParseException {
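-        // parses timestamps of the form "2012-02-28 10:00 UTC" (pattern "yyyy-MM-dd HH:mm Z")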
-        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
-        return format.parse(dateString);
-    }
-
-    private Cluster publishCluster() throws FalconException {
-        Cluster cluster = new Cluster();
-        cluster.setName("feedCluster");
-        cluster.setColo("colo");
-        store.publish(EntityType.CLUSTER, cluster);
-        return cluster;
-    }
-
-    private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
-        throws FalconException, ParseException {
-        return publishFeed(cluster, frequency, start, end, null);
-    }
-
-    private Feed publishFeed(Cluster cluster, String frequency, String start, String end, Import imp)
-        throws FalconException, ParseException {
-
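-        // build and publish a feed with a single SOURCE cluster, the given frequency and validity window,
-        // and an optional import definition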
-        Feed feed = new Feed();
-        feed.setName("feed");
-        Frequency f = new Frequency(frequency);
-        feed.setFrequency(f);
-        feed.setTimezone(UTC);
-        Clusters fClusters = new Clusters();
-        org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
-        fCluster.setType(ClusterType.SOURCE);
-        fCluster.setImport(imp);
-        fCluster.setName(cluster.getName());
-        fCluster.setValidity(getFeedValidity(start, end));
-        fClusters.getClusters().add(fCluster);
-        feed.setClusters(fClusters);
-        store.publish(EntityType.FEED, feed);
-
-        return feed;
-    }
-
-    private Process prepareProcess(Cluster cluster, String frequency, String start, String end) throws ParseException {
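-        // build, but do not publish, a process on the given cluster; tests attach inputs/outputs
-        // and publish it themselves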
-        Process process = new Process();
-        process.setName("process");
-        process.setTimezone(UTC);
-        org.apache.falcon.entity.v0.process.Clusters pClusters = new org.apache.falcon.entity.v0.process.Clusters();
-        org.apache.falcon.entity.v0.process.Cluster pCluster = new org.apache.falcon.entity.v0.process.Cluster();
-        pCluster.setName(cluster.getName());
-        org.apache.falcon.entity.v0.process.Validity validity = getProcessValidity(start, end);
-        pCluster.setValidity(validity);
-        pClusters.getClusters().add(pCluster);
-        process.setClusters(pClusters);
-        Frequency f = new Frequency(frequency);
-        process.setFrequency(f);
-        return process;
-    }
-
-    private Feed importFeedSnapshot(Cluster cluster, String frequency, String start, String end)
-        throws FalconException, ParseException {
-
-        Import imp = getAnImport(MergeType.SNAPSHOT);
-        Feed feed = publishFeed(cluster, frequency, start, end, imp);
-        return feed;
-    }
-
-    private Feed importFeedAppend(Cluster cluster, String frequency, String start, String end)
-        throws FalconException, ParseException {
-
-        Import imp = getAnImport(MergeType.APPEND);
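-        // attach the APPEND import when publishing the feed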
-        Feed feed = publishFeed(cluster, frequency, start, end, imp);
-        return feed;
-    }
-
-    private Import getAnImport(MergeType mergeType) {
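-        // assemble a full-extract database import: the given merge policy, two included
-        // fields ("id", "name"), and two import arguments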
-        Extract extract = new Extract();
-        extract.setType(ExtractMethod.FULL);
-        extract.setMergepolicy(mergeType);
-
-        FieldIncludeExclude fieldInclude = new FieldIncludeExclude();
-        fieldInclude.getFields().add("id");
-        fieldInclude.getFields().add("name");
-        FieldsType fields = new FieldsType();
-        fields.setIncludes(fieldInclude);
-
-        Datasource source = new Datasource();
-        source.setName("test-db");
-        source.setTableName("test-table");
-        source.setExtract(extract);
-        source.setFields(fields);
-
-        Argument a1 = new Argument();
-        a1.setName("--split_by");
-        a1.setValue("id");
-        Argument a2 = new Argument();
-        a2.setName("--num-mappers");
-        a2.setValue("2");
-        Arguments args = new Arguments();
-        List<Argument> argList = args.getArguments();
-        argList.add(a1);
-        argList.add(a2);
-
-        Import imp = new Import();
-        imp.setSource(source);
-        imp.setArguments(args);
-        return imp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
deleted file mode 100644
index 30edd94..0000000
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Clusters;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.entity.v0.feed.Validity;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.FalconTestUtil;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.TimeZone;
-
-/**
- * Test class for File System Storage.
- */
-public class FileSystemStorageTest {
-
-    private static final String USER = FalconTestUtil.TEST_USER_1;
-
-    @BeforeClass
-    public void setUp() {
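-        // Relative location paths resolve against this user's home directory (exercised below).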
-        CurrentUser.authenticate(USER);
-    }
-
-    @Test
-    public void testGetType() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar");
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations);
-        Assert.assertEquals(storage.getType(), Storage.TYPE.FILESYSTEM);
-    }
-
-    @Test
-    public void testCreateFromUriTemplate() throws Exception {
-        String feedBasePath = "DATA=hdfs://localhost:8020"
-                + "/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}"
-                + "#"
-                + "META=hdfs://localhost:8020"
-                + "/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}"
-                + "#"
-                + "STATS=hdfs://localhost:8020"
-                + "/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}";
-
-        FileSystemStorage storage = new FileSystemStorage(feedBasePath);
-        Assert.assertEquals(storage.getUriTemplate(), feedBasePath);
-
-        Assert.assertEquals("hdfs://localhost:8020", storage.getStorageUrl());
-        Assert.assertEquals("hdfs://localhost:8020/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
-            storage.getUriTemplate(LocationType.DATA));
-        Assert.assertEquals("hdfs://localhost:8020/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
-                storage.getUriTemplate(LocationType.STATS));
-        Assert.assertEquals("hdfs://localhost:8020/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
-                storage.getUriTemplate(LocationType.META));
-    }
-
-    @Test
-    public void testGetUriTemplateForData() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar");
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "jail://global:00/foo/bar");
-    }
-
-    @Test
-    public void testFSHomeDir() {
-        final Location location = new Location();
-        location.setPath("foo/bar"); // relative path
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
-        Assert.assertEquals(storage.getWorkingDir().toString(), "/user/" + USER);
-    }
-
-    @Test
-    public void testGetUriTemplateForDataWithRelativePath() throws Exception {
-        final Location location = new Location();
-        location.setPath("foo/bar"); // relative path
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA),
-                "hdfs://localhost:41020/user/" + USER + "/foo/bar");
-
-        storage = new FileSystemStorage("hdfs://localhost:41020/", locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA),
-                "hdfs://localhost:41020/user/" + USER + "/foo/bar");
-    }
-
-    @Test
-    public void testGetUriTemplateForDataWithAbsolutePath() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar"); // absolute path
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
-
-        storage = new FileSystemStorage("hdfs://localhost:41020/", locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
-    }
-
-    @Test
-    public void testGetUriTemplateForDataWithAbsoluteURL() throws Exception {
-        final String absoluteUrl = "s3://host:1000/foo/bar";
-        final Location location = new Location();
-        location.setPath(absoluteUrl); // absolute url
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), absoluteUrl);
-
-        storage = new FileSystemStorage("hdfs://localhost:41020/", locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), absoluteUrl);
-    }
-
-    @Test
-    public void testValidateACL() throws Exception {
-        final Location location = new Location();
-        Path path = new Path("/foo/bar");
-        location.setPath(path.toString());
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        String user = System.getProperty("user.name");
-        EmbeddedCluster cluster = EmbeddedCluster.newCluster(user);
-        FileSystem fs = cluster.getFileSystem();
-        fs.mkdirs(path);
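-        // The test user owns the freshly created directory, so an ACL with the same owner must validate.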
-
-        FileSystemStorage storage = new FileSystemStorage(
-                cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY), locations);
-        storage.validateACL(new TestACL(user, user, "0x755"));
-
-        //-ve case
-        try {
-            storage.validateACL(new TestACL("random", user, "0x755"));
-            Assert.fail("Validation should have failed");
-        } catch(FalconException e) {
-            //expected exception
-        }
-
-        //Timed path
-        location.setPath("/foo/bar/${YEAR}/${MONTH}/${DAY}");
-        storage.validateACL(new TestACL(user, user, "rrr"));
-
-        //-ve case
-        try {
-            storage.validateACL(new TestACL("random", user, "0x755"));
-            Assert.fail("Validation should have failed");
-        } catch(FalconException e) {
-            //expected exception
-        }
-    }
-
-    @DataProvider(name = "locationTestWithRelativePathDataProvider")
-    private Object[][] createLocationTestDataWithRelativePath() {
-        return new Object[][] {
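-            // {storage URL, location path, expected resolved URI template}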
-            {"hdfs://h:0", "localDC/rc/billing/ua2", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"hdfs://h:0/", "localDC/rc/billing/ua2", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"hdfs://h:0", "localDC/rc/billing/ua2/", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"hdfs://h:0/", "localDC/rc/billing/ua2/", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"hdfs://h:0", "localDC/rc/billing/ua2//", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"hdfs://h:0/", "localDC/rc/billing/ua2//", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"${nameNode}", "localDC/rc/billing/ua2", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"${nameNode}/", "localDC/rc/billing/ua2", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"${nameNode}", "localDC/rc/billing/ua2/", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"${nameNode}/", "localDC/rc/billing/ua2/", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"${nameNode}", "localDC/rc/billing/ua2//", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"${nameNode}/", "localDC/rc/billing/ua2//", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"${nameNode}/", "localDC/rc/billing/ua2//", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"},
-            {"${nameNode}", "s3://h:p/localDC/rc/billing/ua2//", "s3://h:p/localDC/rc/billing/ua2"},
-            {"${nameNode}/", "s3://h:p/localDC/rc/billing/ua2//", "s3://h:p/localDC/rc/billing/ua2"},
-            {"hdfs://h:0", "s3://h:p/localDC/rc/billing/ua2//", "s3://h:p/localDC/rc/billing/ua2"},
-            {"hdfs://h:0/", "s3://h:p/localDC/rc/billing/ua2//", "s3://h:p/localDC/rc/billing/ua2"},
-        };
-    }
-
-    @Test (dataProvider = "locationTestWithRelativePathDataProvider")
-    public void testGetUriTemplateWithRelativePath(String storageUrl, String path,
-                                                   String expected) throws Exception {
-        final Location location = new Location();
-        location.setPath(path);
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage(storageUrl, locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), expected);
-    }
-
-    @Test
-    public void testGetUriTemplate() throws Exception {
-        final Location dataLocation = new Location();
-        dataLocation.setPath("/data/foo/bar");
-        dataLocation.setType(LocationType.DATA);
-
-        final Location metaLocation = new Location();
-        metaLocation.setPath("/meta/foo/bar");
-        metaLocation.setType(LocationType.META);
-
-        final Location statsLocation = new Location();
-        statsLocation.setPath("/stats/foo/bar");
-        statsLocation.setType(LocationType.STATS);
-
-        final Location tmpLocation = new Location();
-        tmpLocation.setPath("/tmp/foo/bar");
-        tmpLocation.setType(LocationType.TMP);
-
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(dataLocation);
-        locations.add(metaLocation);
-        locations.add(statsLocation);
-        locations.add(tmpLocation);
-
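-        // Expected template: '<TYPE>=<uri>' segments in DATA, META, STATS, TMP order, '#'-separated.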
-        StringBuilder expected = new StringBuilder();
-        expected.append(LocationType.DATA)
-                .append(FileSystemStorage.LOCATION_TYPE_SEP)
-                .append("jail://global:00/data/foo/bar")
-                .append(FileSystemStorage.FEED_PATH_SEP)
-                .append(LocationType.META)
-                .append(FileSystemStorage.LOCATION_TYPE_SEP)
-                .append("jail://global:00/meta/foo/bar")
-                .append(FileSystemStorage.FEED_PATH_SEP)
-                .append(LocationType.STATS)
-                .append(FileSystemStorage.LOCATION_TYPE_SEP)
-                .append("jail://global:00/stats/foo/bar")
-                .append(FileSystemStorage.FEED_PATH_SEP)
-                .append(LocationType.TMP)
-                .append(FileSystemStorage.LOCATION_TYPE_SEP)
-                .append("jail://global:00/tmp/foo/bar");
-
-        FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
-        Assert.assertEquals(storage.getUriTemplate(), expected.toString());
-    }
-
-    @Test
-    public void testGetUriTemplateWithoutStorageURL() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar");
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "${nameNode}/foo/bar");
-    }
-
-    @DataProvider(name = "locationTestDataProvider")
-    private Object[][] createLocationTestData() {
-        return new Object[][] {
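-            // {storage URL, path with trailing-slash variants, expected path (appended to the storage URL)}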
-            {"jail://global:00", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
-            {"jail://global:00", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
-            {"jail://global:00", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
-            {"${nameNode}", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
-            {"${nameNode}", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
-            {"${nameNode}", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
-        };
-    }
-
-    @Test (dataProvider = "locationTestDataProvider")
-    public void testGetUriTemplateWithLocationType(String storageUrl, String path,
-                                                   String expected) throws Exception {
-        final Location location = new Location();
-        location.setPath(path);
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage(storageUrl, locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), storageUrl + expected);
-    }
-
-    @Test
-    public void testIsIdentical() throws Exception {
-        final String storageUrl = "jail://global:00";
-        final Location location1 = new Location();
-        location1.setPath("/foo/bar");
-        location1.setType(LocationType.DATA);
-        List<Location> locations1 = new ArrayList<Location>();
-        locations1.add(location1);
-        FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
-
-        final Location location2 = new Location();
-        location2.setPath("/foo/bar");
-        location2.setType(LocationType.DATA);
-        List<Location> locations2 = new ArrayList<Location>();
-        locations2.add(location2);
-        FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
-
-        Assert.assertTrue(storage1.isIdentical(storage2));
-    }
-
-    @Test
-    public void testIsIdenticalNegative() throws Exception {
-        final String storageUrl = "jail://global:00";
-        final Location location1 = new Location();
-        location1.setPath("/foo/baz");
-        location1.setType(LocationType.DATA);
-        List<Location> locations1 = new ArrayList<Location>();
-        locations1.add(location1);
-        FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
-
-        final Location location2 = new Location();
-        location2.setPath("/foo/bar");
-        location2.setType(LocationType.DATA);
-        List<Location> locations2 = new ArrayList<Location>();
-        locations2.add(location2);
-        FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
-
-        Assert.assertFalse(storage1.isIdentical(storage2));
-    }
-
-    private class TestACL extends AccessControlList {
-
-        /**
-         * owner is the Owner of this entity.
-         */
-        private String owner;
-
-        /**
-         * group is the group that has read access; not used at this time.
-         */
-        private String group;
-
-        /**
-         * permission is not enforced at this time.
-         */
-        private String permission;
-
-        TestACL(String owner, String group, String permission) {
-            this.owner = owner;
-            this.group = group;
-            this.permission = permission;
-        }
-
-        @Override
-        public String getOwner() {
-            return owner;
-        }
-
-        @Override
-        public String getGroup() {
-            return group;
-        }
-
-        @Override
-        public String getPermission() {
-            return permission;
-        }
-    }
-
-    @DataProvider(name = "testListingDataProvider")
-    private Object[][] createTestListingData() {
-        final long millis = 24L * 3600 * 1000;
-        final long now = System.currentTimeMillis();
-        TimeZone utc = TimeZone.getTimeZone("UTC");
-        return new Object[][] {
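-            // {availability flag, frequency, timezone, listing window start, listing window end}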
-            {null, Frequency.fromString("hours(2)"), utc, new Date(now - 60 * millis), new Date(now - 56 * millis)},
-            {null, Frequency.fromString("days(1)"), utc, new Date(now - 20 * millis), new Date(now + 6 * millis)},
-            {null, Frequency.fromString("months(1)"), utc, new Date(now - 85 * millis), new Date(now - 10 * millis)},
-        };
-    }
-
-    @Test (dataProvider = "testListingDataProvider")
-    public void testListing(String availabilityFlag, Frequency frequency, TimeZone timeZone,
-                            Date start, Date end) throws Exception {
-        EmbeddedCluster cluster = EmbeddedCluster.newCluster("TestFeedListing", false);
-        FileSystem fs = cluster.getFileSystem();
-        ConfigurationStore.get().publish(EntityType.CLUSTER, cluster.getCluster());
-        try {
-            Feed feed = getFeed(availabilityFlag, frequency, timeZone);
-            List<FeedInstanceStatus> expected = prepareData(fs, feed, start, end);
-            FileSystemStorage fileSystemStorage = new FileSystemStorage(cluster.getFileSystem().
-                    getUri().toString(), feed.getLocations());
-            List<FeedInstanceStatus> actual = fileSystemStorage.
-                    getListing(feed, "TestFeedListing", LocationType.DATA, start, end);
-            Assert.assertEquals(actual, expected, "Feed instance Listings doesn't match");
-        } finally {
-            ConfigurationStore.get().remove(EntityType.CLUSTER, cluster.getCluster().getName());
-        }
-    }
-
-    @SuppressWarnings("MagicConstant")
-    private List<FeedInstanceStatus> prepareData(FileSystem fs, Feed feed,
-                                                 Date start, Date end) throws Exception {
-        fs.delete(new Path("/TestFeedListing"), true);
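-        // Start from a clean slate, then materialize a random mix of AVAILABLE, PARTIAL,
-        // EMPTY and MISSING instances so the listing under test has every status to reconcile.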
-        Random random = new Random();
-        List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
-        String basePath = feed.getLocations().getLocations().get(0).getPath();
-        Frequency frequency = feed.getFrequency();
-        TimeZone tz = feed.getTimezone();
-        Date dataStart = EntityUtil.getNextStartTime(feed.getClusters().getClusters().get(0).getValidity().getStart(),
-                feed.getFrequency(), tz, new Date(start.getTime()));
-        Date dataEnd = new Date(end.getTime());
-        while (dataStart.before(dataEnd)) {
-            Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz);
-            String path = ExpressionHelper.substitute(basePath, properties);
-            FeedInstanceStatus instance = new FeedInstanceStatus(path);
-            instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING);
-            instance.setSize(-1);
-            instance.setCreationTime(0);
-            Date date = FeedHelper.getDate(basePath, new Path(path), tz);
-            instance.setInstance(SchemaHelper.formatDateUTC(date));
-            Calendar cal = Calendar.getInstance();
-            cal.setTime(dataStart);
-            cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt());
-            dataStart.setTime(cal.getTimeInMillis());
-            if (random.nextBoolean()) {
-                OutputStream out = fs.create(new Path(path, "file"));
-                out.write("Hello World\n".getBytes());
-                out.close();
-                instance.setSize(12);
-                if (feed.getAvailabilityFlag() == null || random.nextBoolean()) {
-                    //No availability flag, or the coin flip allows the flag file: mark as available
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
-                    if (feed.getAvailabilityFlag() != null) {
-                        fs.create(new Path(path, feed.getAvailabilityFlag())).close();
-                    }
-                } else if (feed.getAvailabilityFlag() != null) {
-                    //Data exists but the availability flag file was not created: mark as partial
-                    fs.mkdirs(new Path(path));
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
-                }
-            } else {
-                if (feed.getAvailabilityFlag() == null && random.nextBoolean()) {
-                    //No availability flag and the coin flip creates the dir: mark as empty
-                    fs.mkdirs(new Path(path));
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
-                    instance.setSize(0);
-                } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) {
-                    //If availability is present and ok to create dir, mark as partial
-                    fs.mkdirs(new Path(path));
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
-                } else if (feed.getAvailabilityFlag() != null) {
-                    //Availability flag configured: create only the flag file and mark as empty
-                    fs.create(new Path(path, feed.getAvailabilityFlag())).close();
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
-                    instance.setSize(0);
-                }
-            }
-            try {
-                FileStatus fileStatus = fs.getFileStatus(new Path(path));
-                instance.setCreationTime(fileStatus.getModificationTime());
-            } catch (IOException e) {
-                //ignore
-            }
-            instances.add(instance);
-        }
-        return instances;
-    }
-
-    private Feed getFeed(String availabilityFlag, Frequency frequency, TimeZone timeZone) {
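-        // Minimal feed: one date-patterned DATA location on cluster 'TestFeedListing',
-        // valid from 1000 days in the past to 1000 days in the future.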
-        Feed feed = new Feed();
-        feed.setAvailabilityFlag(availabilityFlag);
-        feed.setFrequency(frequency);
-        feed.setTimezone(timeZone);
-        feed.setLocations(new Locations());
-        Location dataLocation = new Location();
-        feed.getLocations().getLocations().add(dataLocation);
-        dataLocation.setPath("/TestFeedListing/data/${YEAR}/${MONTH}/${DAY}"
-                + (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? "/${HOUR}" : "") + "/MORE");
-        dataLocation.setType(LocationType.DATA);
-        feed.setClusters(new Clusters());
-        Cluster cluster = new Cluster();
-        cluster.setName("TestFeedListing");
-        feed.getClusters().getClusters().add(cluster);
-        Validity validity = new Validity();
-        cluster.setValidity(validity);
-        validity.setStart(new Date(System.currentTimeMillis() - (1000L * 24 * 3600000)));
-        validity.setEnd(new Date(System.currentTimeMillis() + (1000L * 24 * 3600000)));
-        return feed;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java b/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
deleted file mode 100644
index c37cebd..0000000
--- a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfaces;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
-import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.util.Properties;
-
-/**
- * Tests for HiveUtil.
- */
-public class HiveUtilTest {
-
-    @Test
-    public void testGetHiveCredentialsWithoutKerberos() {
-        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
-        Cluster cluster = new Cluster();
-        String metaStoreUrl = "thrift://localhost:19083";
-
-        // set registry interface
-        Interfaces interfaces = new Interfaces();
-        Interface registry = new Interface();
-        registry.setEndpoint(metaStoreUrl);
-        registry.setType(Interfacetype.REGISTRY);
-        registry.setVersion("0.1");
-        interfaces.getInterfaces().add(registry);
-        cluster.setInterfaces(interfaces);
-
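-        // Without Kerberos only the basic credentials are expected; the hcat node URI is the
-        // thrift endpoint with its scheme swapped. (METASTROE_URI is the constant's actual,
-        // misspelled name.)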
-        Properties expected = new Properties();
-        expected.put(HiveUtil.METASTORE_UGI, "true");
-        expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
-        expected.put(HiveUtil.METASTROE_URI, metaStoreUrl);
-        expected.put(HiveUtil.METASTOREURIS, metaStoreUrl);
-
-        Properties actual = HiveUtil.getHiveCredentials(cluster);
-        Assert.assertEquals(actual, expected);
-    }
-
-    @Test
-    public void testGetHiveCredentialsWithKerberos() {
-        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, KerberosAuthenticationHandler.TYPE);
-        Cluster cluster = new Cluster();
-        String metaStoreUrl = "thrift://localhost:19083";
-        String principal = "kerberosPrincipal";
-
-        // set registry interface
-        Interfaces interfaces = new Interfaces();
-        Interface registry = new Interface();
-        registry.setEndpoint(metaStoreUrl);
-        registry.setType(Interfacetype.REGISTRY);
-        registry.setVersion("0.1");
-        interfaces.getInterfaces().add(registry);
-        cluster.setInterfaces(interfaces);
-
-        // set security properties
-        org.apache.falcon.entity.v0.cluster.Properties props = new org.apache.falcon.entity.v0.cluster.Properties();
-        Property principal2 = new Property();
-        principal2.setName(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
-        principal2.setValue(principal);
-        props.getProperties().add(principal2);
-        cluster.setProperties(props);
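-        // With Kerberos, SASL and principal settings are expected on top of the basic credentials.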
-        Properties expected = new Properties();
-        expected.put(SecurityUtil.METASTORE_USE_THRIFT_SASL, "true");
-        expected.put(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL, principal);
-        expected.put(SecurityUtil.METASTORE_PRINCIPAL, principal);
-        expected.put(HiveUtil.METASTORE_UGI, "true");
-        expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
-        expected.put(HiveUtil.METASTROE_URI, metaStoreUrl);
-        expected.put(HiveUtil.METASTOREURIS, metaStoreUrl);
-
-        Properties actual = HiveUtil.getHiveCredentials(cluster);
-        Assert.assertEquals(actual, expected);
-    }
-
-}
-

