falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [41/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom
Date Tue, 01 Mar 2016 08:26:27 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
deleted file mode 100644
index cca2d8b..0000000
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ /dev/null
@@ -1,1292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.LifeCycle;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.datasource.DatasourceType;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.ExtractMethod;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
-import org.apache.falcon.entity.v0.feed.Lifecycle;
-import org.apache.falcon.entity.v0.feed.Load;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.entity.v0.feed.MergeType;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.entity.v0.feed.RetentionStage;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.entity.v0.feed.Validity;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.lifecycle.FeedLifecycleStage;
-import org.apache.falcon.resource.APIResult;
-import org.apache.falcon.resource.EntityList;
-import org.apache.falcon.resource.FeedInstanceResult;
-import org.apache.falcon.resource.SchedulableEntityInstance;
-import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.DateUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-
-/**
- * Feed entity helper methods.
- */
-public final class FeedHelper {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FeedHelper.class);
-    // One millisecond, the smallest time step; not used in the visible portion of this file.
-    private static final int ONE_MS = 1;
-
-    // Compact timestamp format (to the minute); presumably used for instance-time rendering — not used in the visible portion.
-    public static final String FORMAT = "yyyyMMddHHmm";
-
-    // Utility class: no instances.
-    private FeedHelper() {}
-
-    /**
-     * Looks up the feed-level cluster definition with the given name.
-     *
-     * @param feed feed entity to search
-     * @param clusterName name of the cluster to find
-     * @return the matching cluster, or null when the feed declares no such cluster
-     */
-    public static Cluster getCluster(Feed feed, String clusterName) {
-        for (Cluster candidate : feed.getClusters().getClusters()) {
-            if (candidate.getName().equals(clusterName)) {
-                return candidate;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Builds the storage implementation declared directly on the feed:
-     * file-system locations take precedence over a catalog table.
-     *
-     * @throws FalconException when neither locations nor a table is defined
-     */
-    public static Storage createStorage(Feed feed) throws FalconException {
-        final Locations locations = feed.getLocations();
-        if (locations != null && !locations.getLocations().isEmpty()) {
-            return new FileSystemStorage(feed);
-        }
-        try {
-            if (feed.getTable() != null) {
-                return new CatalogStorage(feed);
-            }
-        } catch (URISyntaxException e) {
-            throw new FalconException(e);
-        }
-        throw new FalconException("Both catalog and locations are not defined.");
-    }
-
-    public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
-                                        Feed feed) throws FalconException {
-        return createStorage(getCluster(feed, clusterEntity.getName()), feed, clusterEntity);
-    }
-
-    /**
-     * Builds storage for the feed on the named cluster.
-     */
-    public static Storage createStorage(String clusterName, Feed feed)
-        throws FalconException {
-        Cluster feedCluster = getCluster(feed, clusterName);
-        return createStorage(feedCluster, feed);
-    }
-
-    /**
-     * Builds storage for the feed on the given feed-level cluster; the full
-     * cluster entity is resolved from the configuration store by name.
-     */
-    public static Storage createStorage(Cluster cluster, Feed feed)
-        throws FalconException {
-
-        final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
-                EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
-
-        return createStorage(cluster, feed, clusterEntity);
-    }
-
-    /**
-     * Builds the storage for a feed on a specific cluster. Locations declared
-     * (or overridden) at cluster level win over a catalog table.
-     *
-     * @throws FalconException when neither locations nor a table is defined
-     */
-    public static Storage createStorage(Cluster cluster, Feed feed,
-                                        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
-        throws FalconException {
-        final List<Location> resolvedLocations = getLocations(cluster, feed);
-        if (resolvedLocations != null) {
-            return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), resolvedLocations);
-        }
-        try {
-            final CatalogTable catalogTable = getTable(cluster, feed);
-            if (catalogTable != null) {
-                return new CatalogStorage(clusterEntity, catalogTable);
-            }
-        } catch (URISyntaxException e) {
-            throw new FalconException(e);
-        }
-        throw new FalconException("Both catalog and locations are not defined.");
-    }
-
-    /**
-     * Factory method to dole out a storage instance used for replication source.
-     * Same rules as createStorage, except file-system storage is rooted at the
-     * cluster's read-only (read-interface) URL.
-     *
-     * @param clusterEntity cluster entity
-     * @param feed feed entity
-     * @return an implementation of Storage
-     * @throws FalconException when neither locations nor a table is defined
-     */
-    public static Storage createReadOnlyStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
-                                                Feed feed) throws FalconException {
-        Cluster feedCluster = getCluster(feed, clusterEntity.getName());
-        final List<Location> resolvedLocations = getLocations(feedCluster, feed);
-        if (resolvedLocations != null) {
-            return new FileSystemStorage(ClusterHelper.getReadOnlyStorageUrl(clusterEntity), resolvedLocations);
-        }
-        try {
-            final CatalogTable catalogTable = getTable(feedCluster, feed);
-            if (catalogTable != null) {
-                return new CatalogStorage(clusterEntity, catalogTable);
-            }
-        } catch (URISyntaxException e) {
-            throw new FalconException(e);
-        }
-        throw new FalconException("Both catalog and locations are not defined.");
-    }
-
-    public static Storage createStorage(String type, String storageUriTemplate)
-        throws URISyntaxException {
-
-        Storage.TYPE storageType = Storage.TYPE.valueOf(type);
-        if (storageType == Storage.TYPE.FILESYSTEM) {
-            return new FileSystemStorage(storageUriTemplate);
-        } else if (storageType == Storage.TYPE.TABLE) {
-            return new CatalogStorage(storageUriTemplate);
-        }
-
-        throw new IllegalArgumentException("Bad type: " + type);
-    }
-
-    public static Storage createStorage(String type, String storageUriTemplate,
-                                        Configuration conf) throws URISyntaxException {
-        Storage.TYPE storageType = Storage.TYPE.valueOf(type);
-        if (storageType == Storage.TYPE.FILESYSTEM) {
-            return new FileSystemStorage(storageUriTemplate);
-        } else if (storageType == Storage.TYPE.TABLE) {
-            return new CatalogStorage(storageUriTemplate, conf);
-        }
-
-        throw new IllegalArgumentException("Bad type: " + type);
-    }
-
-    /**
-     * Determines the storage type declared on the feed itself: locations win
-     * over a catalog table.
-     *
-     * @throws FalconException when neither locations nor a table is defined
-     */
-    public static Storage.TYPE getStorageType(Feed feed) throws FalconException {
-        final Locations locations = feed.getLocations();
-        if (locations != null && !locations.getLocations().isEmpty()) {
-            return Storage.TYPE.FILESYSTEM;
-        }
-        if (feed.getTable() != null) {
-            return Storage.TYPE.TABLE;
-        }
-        throw new FalconException("Both catalog and locations are not defined.");
-    }
-
-    /**
-     * Determines the storage type for the feed on a cluster, honoring
-     * cluster-level location/table overrides.
-     *
-     * @throws FalconException when neither locations nor a table is defined
-     */
-    public static Storage.TYPE getStorageType(Feed feed,
-                                              Cluster cluster) throws FalconException {
-        if (getLocations(cluster, feed) != null) {
-            return Storage.TYPE.FILESYSTEM;
-        }
-        if (getTable(cluster, feed) != null) {
-            return Storage.TYPE.TABLE;
-        }
-        throw new FalconException("Both catalog and locations are not defined.");
-    }
-
-    /**
-     * Determines the storage type for the feed on the given cluster entity.
-     */
-    public static Storage.TYPE getStorageType(Feed feed,
-                                              org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
-        throws FalconException {
-        return getStorageType(feed, getCluster(feed, clusterEntity.getName()));
-    }
-
-    /**
-     * Returns the effective locations for the feed on a cluster: a non-empty
-     * cluster-level override wins, otherwise the feed-level locations apply.
-     *
-     * @return the applicable locations, or null when none are declared
-     */
-    public static List<Location> getLocations(Cluster cluster, Feed feed) {
-        final Locations overridden = cluster.getLocations();
-        if (overridden != null && !overridden.getLocations().isEmpty()) {
-            return overridden.getLocations();
-        }
-        final Locations declared = feed.getLocations();
-        return declared == null ? null : declared.getLocations();
-    }
-
-    /**
-     * Finds the location of the requested type for the feed on a cluster.
-     *
-     * @return the matching location, or null when none is declared
-     */
-    public static Location getLocation(Feed feed, org.apache.falcon.entity.v0.cluster.Cluster cluster,
-                                       LocationType type) {
-        List<Location> locations = getLocations(getCluster(feed, cluster.getName()), feed);
-        if (locations == null) {
-            return null;
-        }
-        for (Location candidate : locations) {
-            if (candidate.getType() == type) {
-                return candidate;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Returns the SLA for the feed on a cluster: a cluster-level SLA overrides
-     * the feed-level one.
-     *
-     * @return the applicable SLA, or null when neither level declares one
-     */
-    public static Sla getSLA(Cluster cluster, Feed feed) {
-        final Sla clusterSla = cluster.getSla();
-        if (clusterSla != null) {
-            return clusterSla;
-        }
-        // Fix: the former "feedSla == null ? null : feedSla" ternary was a no-op.
-        return feed.getSla();
-    }
-
-    /**
-     * Returns the SLA for the feed on the named cluster, or null when the feed
-     * is not configured for that cluster.
-     */
-    public static Sla getSLA(String clusterName, Feed feed) {
-        Cluster cluster = FeedHelper.getCluster(feed, clusterName);
-        if (cluster == null) {
-            return null;
-        }
-        return getSLA(cluster, feed);
-    }
-
-    protected static CatalogTable getTable(Cluster cluster, Feed feed) {
-        // check if table is overridden in cluster
-        if (cluster.getTable() != null) {
-            return cluster.getTable();
-        }
-
-        return feed.getTable();
-    }
-
-    /**
-     * Joins two partition expressions with "/", collapsing duplicate slashes
-     * and trimming leading/trailing ones. Null/blank parts are treated as empty.
-     */
-    public static String normalizePartitionExpression(String part1, String part2) {
-        String joined = StringUtils.stripToEmpty(part1) + "/" + StringUtils.stripToEmpty(part2);
-        String collapsed = joined.replaceAll("//+", "/");
-        // strip(str, chars) removes the char set from both ends in one call.
-        return StringUtils.strip(collapsed, "/");
-    }
-
-    /**
-     * Normalizes a single partition expression (see the two-argument overload).
-     */
-    public static String normalizePartitionExpression(String partition) {
-        return normalizePartitionExpression(partition, null);
-    }
-
-    /**
-     * Builds an EL-variable table for the cluster: exposes the key "cluster"
-     * mapping to colo, name, and every user-defined cluster property.
-     */
-    public static Properties getClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) {
-        Map<String, String> vars = new HashMap<>();
-        vars.put("colo", cluster.getColo());
-        vars.put("name", cluster.getName());
-        if (cluster.getProperties() != null) {
-            for (org.apache.falcon.entity.v0.cluster.Property property : cluster.getProperties().getProperties()) {
-                vars.put(property.getName(), property.getValue());
-            }
-        }
-        Properties properties = new Properties();
-        properties.put("cluster", vars);
-        return properties;
-    }
-
-    /**
-     * Evaluates an EL expression against the cluster's property table.
-     */
-    public static String evaluateClusterExp(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, String exp)
-        throws FalconException {
-        ExpressionHelper evaluator = ExpressionHelper.get();
-        evaluator.setPropertiesForVariable(getClusterProperties(clusterEntity));
-        return evaluator.evaluateFullExpression(exp, String.class);
-    }
-
-    /**
-     * Builds the staging path for a catalog feed: the staging dir followed by
-     * the dated-partition coordinator EL expression, the given suffix, and "data".
-     */
-    public static String getStagingPath(boolean isSource,
-                                        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
-                                        Feed feed, CatalogStorage storage, Tag tag, String suffix) {
-        String stagingDir = getStagingDir(isSource, clusterEntity, feed, storage, tag);
-        String datedPartitionKey = storage.getDatedPartitionKeys().get(0);
-        String datedPartition = datedPartitionKey + "=${coord:dataOutPartitionValue('output',"
-                + "'" + datedPartitionKey + "')}";
-        return stagingDir + "/" + datedPartition + "/" + suffix + "/" + "data";
-    }
-
-    /**
-     * Builds the staging directory for a catalog feed on a cluster: storage URI
-     * (the read interface for replication sources, the write interface
-     * otherwise) + log path + workflow name + database + table.
-     */
-    public static String getStagingDir(boolean isSource,
-                                       org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
-                                       Feed feed, CatalogStorage storage, Tag tag) {
-        String workflowName = EntityUtil.getWorkflowName(
-                tag, Arrays.asList(clusterEntity.getName()), feed).toString();
-
-        // log path is created at scheduling wf
-        final String storageUri;
-        if (isSource) {
-            storageUri = ClusterHelper.getReadOnlyStorageUrl(clusterEntity); // read interface
-        } else {
-            storageUri = ClusterHelper.getStorageUrl(clusterEntity);         // write interface
-        }
-        return storageUri
-                + EntityUtil.getLogPath(clusterEntity, feed) + "/"
-                + workflowName + "/"
-                + storage.getDatabase() + "/"
-                + storage.getTable();
-    }
-
-    /**
-     * Builds the user-workflow properties advertised for a lifecycle policy run:
-     * name ("<lifecycle>-policy"), engine ("falcon"), and the build version.
-     */
-    public static Properties getUserWorkflowProperties(LifeCycle lifeCycle) {
-        Properties props = new Properties();
-        props.put("userWorkflowName", lifeCycle.name().toLowerCase() + "-policy");
-        props.put("userWorkflowEngine", "falcon");
-
-        String version;
-        try {
-            version = BuildProperties.get().getProperty("build.version");
-        } catch (Exception e) {  // build properties are only available in prism/webapp; fall back
-            version = "0.6";
-        }
-        props.put("userWorkflowVersion", version);
-        return props;
-    }
-
-    /**
-     * Copies the feed's user-defined properties into a Properties object;
-     * empty when the feed declares none.
-     */
-    public static Properties getFeedProperties(Feed feed) {
-        Properties result = new Properties();
-        if (feed.getProperties() == null) {
-            return result;
-        }
-        for (org.apache.falcon.entity.v0.feed.Property property : feed.getProperties().getProperties()) {
-            result.put(property.getName(), property.getValue());
-        }
-        return result;
-    }
-
-    /**
-     * Returns the lifecycle for the feed on the named cluster; a cluster-level
-     * lifecycle overrides the feed-level one.
-     *
-     * @throws FalconException when the feed is not configured for the cluster
-     */
-    public static Lifecycle getLifecycle(Feed feed, String clusterName) throws FalconException {
-        Cluster cluster = getCluster(feed, clusterName);
-        if (cluster == null) {
-            throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
-        }
-        Lifecycle overridden = cluster.getLifecycle();
-        return overridden != null ? overridden : feed.getLifecycle();
-    }
-
-    /**
-     * Returns the retention stage for the feed on the named cluster when
-     * lifecycle is enabled; the cluster-level stage wins over the global one.
-     *
-     * @return the applicable retention stage, or null when lifecycle is off or
-     *         no stage is declared at either level
-     */
-    public static RetentionStage getRetentionStage(Feed feed, String clusterName) throws FalconException {
-        if (!isLifecycleEnabled(feed, clusterName)) {
-            return null;
-        }
-        Lifecycle clusterLifecycle = getCluster(feed, clusterName).getLifecycle();
-        if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) {
-            return clusterLifecycle.getRetentionStage();
-        }
-        Lifecycle globalLifecycle = feed.getLifecycle();
-        return globalLifecycle == null ? null : globalLifecycle.getRetentionStage();
-    }
-
-    /**
-     * Returns the validity start date for the feed on the named cluster.
-     *
-     * @throws FalconException when the feed is not configured for the cluster
-     */
-    public static Date getFeedValidityStart(Feed feed, String clusterName) throws FalconException {
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
-        if (feedCluster == null) {
-            // Fix: message previously rendered as "...<clusterName>found for feed..." (missing space).
-            throw new FalconException("No matching cluster " + clusterName
-                    + " found for feed " + feed.getName());
-        }
-        return feedCluster.getValidity().getStart();
-    }
-
-    /**
-     * Returns the feed instance immediately after the given aligned instance,
-     * i.e. alignedDate advanced by one feed frequency.
-     */
-    public static Date getNextFeedInstanceDate(Date alignedDate, Feed feed) {
-        Frequency frequency = feed.getFrequency();
-        Calendar next = Calendar.getInstance();
-        next.setTime(alignedDate);
-        next.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt());
-        return next.getTime();
-    }
-
-    /**
-     * Returns the names of the lifecycle policies applicable for the feed on
-     * the named cluster; empty when lifecycle is not enabled.
-     *
-     * @param feed the feed entity
-     * @param clusterName cluster to evaluate policies on
-     * @return list of policy names, empty list if lifecycle is disabled
-     * @throws FalconException when the feed is not configured for the cluster
-     */
-    public static List<String> getPolicies(Feed feed, String clusterName) throws FalconException {
-        if (getCluster(feed, clusterName) == null) {
-            throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
-        }
-        List<String> policies = new ArrayList<>();
-        if (isLifecycleEnabled(feed, clusterName)) {
-            String policy = getRetentionStage(feed, clusterName).getPolicy();
-            if (StringUtils.isBlank(policy)) {
-                policy = FeedLifecycleStage.RETENTION.getDefaultPolicyName();
-            }
-            policies.add(policy);
-        }
-        return policies;
-    }
-
-    /**
-     * Extracts the instance date from an actual data path, e.g. /path/2014/05/06
-     * maps to 2014-05-06T00:00Z. Only the first occurrence of each date-pattern
-     * variable in the template is considered.
-     *
-     * @param templatePath template path from the feed definition
-     * @param instancePath actual data path
-     * @param timeZone time zone to interpret the extracted date in
-     * @return the date encoded in the path, or null when the path does not
-     *         match the template
-     */
-    public static Date getDate(String templatePath, Path instancePath, TimeZone timeZone) {
-        String path = instancePath.toString();
-        Matcher matcher = FeedDataPath.PATTERN.matcher(templatePath);
-        Calendar cal = Calendar.getInstance(timeZone);
-        int lastEnd = 0;
-
-        Set<FeedDataPath.VARS> matchedVars = new HashSet<>();
-        while (matcher.find()) {
-            FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group());
-            String pad = templatePath.substring(lastEnd, matcher.start());
-            if (!path.startsWith(pad)) {
-                //Template and path do not match
-                return null;
-            }
-
-            // Fix: guard against paths shorter than the template expects; the
-            // substring below used to throw StringIndexOutOfBoundsException
-            // instead of reporting a non-matching path.
-            if (path.length() < pad.length() + pathVar.getValueSize()) {
-                return null;
-            }
-
-            int value;
-            try {
-                value = Integer.parseInt(path.substring(pad.length(), pad.length() + pathVar.getValueSize()));
-            } catch (NumberFormatException e) {
-                //Not a valid number for variable
-                return null;
-            }
-
-            pathVar.setCalendar(cal, value);
-            lastEnd = matcher.end();
-            path = path.substring(pad.length() + pathVar.getValueSize());
-            matchedVars.add(pathVar);
-        }
-
-        String remTemplatePath = templatePath.substring(lastEnd);
-        //Match the remaining constant at the end
-        //Handling case where feed instancePath has partitions
-        if (StringUtils.isNotEmpty(path) && StringUtils.isNotEmpty(remTemplatePath)
-                && !path.contains(remTemplatePath)) {
-            return null;
-        }
-
-        //Reset fields for date variables that did not appear in the template
-        for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
-            if (!matchedVars.contains(var)) {
-                switch (var.getCalendarField()) {
-                case Calendar.DAY_OF_MONTH:
-                    cal.set(var.getCalendarField(), 1);
-                    break;
-                default:
-                    cal.set(var.getCalendarField(), 0);
-                }
-            }
-        }
-        // Loop-invariant resets hoisted out of the loop above.
-        cal.set(Calendar.SECOND, 0);
-        cal.set(Calendar.MILLISECOND, 0);
-        return cal.getTime();
-    }
-
-    /**
-     * Returns the static prefix of a feed path, i.e. everything before the
-     * first date-pattern variable.
-     *
-     * @throws IOException when the path contains no date pattern
-     */
-    public static Path getFeedBasePath(String feedPath) throws IOException {
-        Matcher matcher = FeedDataPath.PATTERN.matcher(feedPath);
-        if (!matcher.find()) {
-            throw new IOException("Unable to resolve pattern for feedPath: " + feedPath);
-        }
-        return new Path(feedPath.substring(0, matcher.start()));
-    }
-
-    /**
-     * Validates that the cluster belongs to the feed and that instanceTime is a
-     * materializable instance: inside the validity window [start, end) and
-     * aligned with the feed's start time and frequency.
-     *
-     * @throws IllegalArgumentException when any of these checks fails
-     */
-    private static void validateFeedInstance(Feed feed, Date instanceTime,
-                                             org.apache.falcon.entity.v0.cluster.Cluster cluster) {
-
-        // validate the cluster
-        Cluster feedCluster = getCluster(feed, cluster.getName());
-        if (feedCluster == null) {
-            throw new IllegalArgumentException("Cluster :" + cluster.getName() + " is not a valid cluster for feed:"
-                    + feed.getName());
-        }
-
-        // validate that instanceTime is in validity range (start inclusive, end exclusive)
-        if (feedCluster.getValidity().getStart().after(instanceTime)
-                || !feedCluster.getValidity().getEnd().after(instanceTime)) {
-            throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not in validity range for"
-                    + " Feed: " + feed.getName() + " on cluster:" + cluster.getName());
-        }
-
-        // validate instanceTime on basis of startTime and frequency: the next
-        // aligned instance at-or-after instanceTime must be instanceTime itself
-        Date nextInstance = EntityUtil.getNextStartTime(feedCluster.getValidity().getStart(), feed.getFrequency(),
-                feed.getTimezone(), instanceTime);
-        if (!nextInstance.equals(instanceTime)) {
-            throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not a valid instance for the "
-                    + " feed: " + feed.getName() + " on cluster: " + cluster.getName()
-                    + " on the basis of startDate and frequency");
-        }
-    }
-
-    /**
-     * Given a feed instance, finds the generating (producer) process instance.
-     *
-     * If the feed has no producer (e.g. it is only replicated), returns null.
-     *
-     * @param feed output feed
-     * @param feedInstanceTime instance time of the feed
-     * @param cluster cluster the feed instance belongs to
-     * @return the process instance which produces the given feed instance, or
-     *         null when none falls in the producer's validity range
-     */
-    public static SchedulableEntityInstance getProducerInstance(Feed feed, Date feedInstanceTime,
-        org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
-
-        //validate the inputs
-        validateFeedInstance(feed, feedInstanceTime, cluster);
-        Process process = getProducerProcess(feed);
-        if (process != null) {
-            org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process,
-                    cluster.getName());
-            Date pStart = processCluster.getValidity().getStart();
-            Date pEnd = processCluster.getValidity().getEnd();
-            Frequency pFrequency = process.getFrequency();
-            TimeZone pTz = process.getTimezone();
-
-            try {
-                Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster);
-                boolean isValid = EntityUtil.isValidInstanceTime(pStart, pFrequency, pTz, processInstanceTime);
-                if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd) || !isValid){
-                    return null;
-                }
-
-                SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                        processInstanceTime, EntityType.PROCESS);
-                producer.setTags(SchedulableEntityInstance.OUTPUT);
-                return producer;
-            } catch (FalconException | IllegalArgumentException e) {
-                // Fix: the message had a malformed "}" placeholder (4 arguments
-                // vs 3 "{}" slots) and dropped the exception; log the cause too.
-                LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: {}"
-                        + " on cluster:{}", process.getName(), feed.getName(), feedInstanceTime, cluster.getName(), e);
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Finds the process that generates the given feed.
-     *
-     * @param feed output feed
-     * @return the producing process, or null when the feed has no producer
-     */
-    public static Process getProducerProcess(Feed feed) throws FalconException {
-        EntityList dependencies = EntityUtil.getEntityDependencies(feed);
-        for (EntityList.EntityElement element : dependencies.getElements()) {
-            if (element.tag.contains(EntityList.OUTPUT_TAG)) {
-                return EntityUtil.getEntity(EntityType.PROCESS, element.name);
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Finds the producer-process instance time that will generate the given
-     * feed instance: the process/feed time offset is measured at the producer's
-     * start date and then applied to feedInstanceTime.
-     *
-     * @param feed output feed
-     * @param feedInstanceTime instance time of the output feed
-     * @param producer producer process
-     * @param cluster cluster on which both entities are configured
-     * @return time of the producer instance which will produce the given feed instance
-     * @throws IllegalArgumentException when the computed instance falls outside
-     *         the producer's validity range on the given cluster
-     */
-    private static Date getProducerInstanceTime(Feed feed, Date feedInstanceTime, Process producer,
-                                       org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
-
-        String clusterName = cluster.getName();
-        Cluster feedCluster = getCluster(feed, clusterName);
-        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(producer, clusterName);
-        Date producerStartDate = processCluster.getValidity().getStart();
-
-        // read the process definition and find the relative time difference between process and output feed
-        // if output process instance time is now then output FeedInstance time is x
-        String outputInstance = null;
-        for (Output op : producer.getOutputs().getOutputs()) {
-            if (StringUtils.equals(feed.getName(), op.getFeed())) {
-                outputInstance = op.getInstance();
-            }
-        }
-        // NOTE(review): outputInstance stays null if the producer does not list
-        // this feed as an output — presumably guaranteed by the caller having
-        // found the producer via the feed's OUTPUT dependency; verify.
-
-        ExpressionHelper.setReferenceDate(producerStartDate);
-        ExpressionHelper evaluator = ExpressionHelper.get();
-        // producerInstance = feedInstanceTime + (difference between producer process and feed)
-        // the feedInstance before or equal to this time is the required one
-        Date relativeFeedInstance = evaluator.evaluate(outputInstance, Date.class);
-        Date feedInstanceActual = EntityUtil.getPreviousInstanceTime(feedCluster.getValidity().getStart(),
-                feed.getFrequency(), feed.getTimezone(), relativeFeedInstance);
-        Long producerInstanceTime = feedInstanceTime.getTime() + (producerStartDate.getTime()
-                - feedInstanceActual.getTime());
-        Date producerInstance = new Date(producerInstanceTime);
-
-        //validate that the producerInstance is in the validity range on the provided cluster
-        if (producerInstance.before(processCluster.getValidity().getStart())
-                || producerInstance.after(processCluster.getValidity().getEnd())) {
-            throw new IllegalArgumentException("Instance time provided: " + feedInstanceTime
-                    + " for feed " + feed.getName()
-                    + " is outside the range of instances produced by the producer process: " + producer.getName()
-                    + " in it's validity range on provided cluster: " + cluster.getName());
-        }
-        return producerInstance;
-    }
-
-
-    /**
-     * Finds all consumer-process instances that take the given feed instance as
-     * input on the given cluster.
-     *
-     * @return consumer instances tagged as INPUT; empty when there are none
-     */
-    public static Set<SchedulableEntityInstance> getConsumerInstances(Feed feed, Date feedInstanceTime,
-                  org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
-
-        // validate that the feed has this cluster & validate that the instanceTime is a valid instanceTime
-        validateFeedInstance(feed, feedInstanceTime, cluster);
-
-        Set<SchedulableEntityInstance> result = new HashSet<>();
-        for (Process consumer : getConsumerProcesses(feed)) {
-            for (Date instanceTime : getConsumerProcessInstanceTimes(feed, feedInstanceTime, consumer, cluster)) {
-                SchedulableEntityInstance instance = new SchedulableEntityInstance(consumer.getName(),
-                        cluster.getName(), instanceTime, EntityType.PROCESS);
-                instance.setTags(SchedulableEntityInstance.INPUT);
-                result.add(instance);
-            }
-        }
-        return result;
-    }
-
-
-    /**
-     * Returns the set of processes that use the given feed as input.
-     *
-     * @param feed input feed
-     * @return the consuming processes, empty set when there are none
-     */
-    public static Set<Process> getConsumerProcesses(Feed feed) throws FalconException {
-        Set<Process> consumers = new HashSet<>();
-        EntityList dependencies = EntityUtil.getEntityDependencies(feed);
-        for (EntityList.EntityElement element : dependencies.getElements()) {
-            if (element.tag.contains(EntityList.INPUT_TAG)) {
-                consumers.add(EntityUtil.getEntity(EntityType.PROCESS, element.name));
-            }
-        }
-        return consumers;
-    }
-
-    // return all instances of a process which will consume the given feed instance
-    private static Set<Date> getConsumerProcessInstanceTimes(Feed feed, Date feedInstancetime, Process consumer,
-              org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
-
-        Set<Date> result = new HashSet<>();
-        // find relevant cluster for the process
-        org.apache.falcon.entity.v0.process.Cluster processCluster =
-                ProcessHelper.getCluster(consumer, cluster.getName());
-        if (processCluster == null) {
-            throw new IllegalArgumentException("Cluster is not valid for process");
-        }
-        Date processStartDate = processCluster.getValidity().getStart();
-        Cluster feedCluster = getCluster(feed, cluster.getName());
-        Date feedStartDate = feedCluster.getValidity().getStart();
-
-        // find all corresponding Inputs as a process may refer same feed multiple times
-        List<Input> inputFeeds = new ArrayList<>();
-        if (consumer.getInputs() != null && consumer.getInputs().getInputs() != null) {
-            for (Input input : consumer.getInputs().getInputs()) {
-                if (StringUtils.equals(input.getFeed(), feed.getName())) {
-                    inputFeeds.add(input);
-                }
-            }
-        }
-
-        // for each input corresponding to given feed, find corresponding consumer instances
-        for (Input in : inputFeeds) {
-            /* Algorithm for finding a consumer instance for an input feed instance
-            Step 1. Find one instance which will consume the given feed instance.
-                    a. take process start date and find last input feed instance time. In this step take care of
-                        frequencies being out of sync.
-                    b. using the above find the time difference between the process instance and feed instance.
-                    c. Adding the above time difference to given feed instance for which we want to find the consumer
-                        instances we will get one consumer process instance.
-            Step 2. Keep checking for next instances of process till they consume the given feed Instance.
-            Step 3. Similarly check for all previous instances of process till they consume the given feed instance.
-            */
-
-            // Step 1.a & 1.b
-            ExpressionHelper.setReferenceDate(processStartDate);
-            ExpressionHelper evaluator = ExpressionHelper.get();
-            Date startRelative = evaluator.evaluate(in.getStart(), Date.class);
-            Date startTimeActual = EntityUtil.getPreviousInstanceTime(feedStartDate,
-                    feed.getFrequency(), feed.getTimezone(), startRelative);
-            Long offset = processStartDate.getTime() - startTimeActual.getTime();
-
-            // Step 1.c
-            Date processInstanceStartRelative = new Date(feedInstancetime.getTime() + offset);
-            Date processInstanceStartActual = EntityUtil.getPreviousInstanceTime(processStartDate,
-                    consumer.getFrequency(), consumer.getTimezone(), processInstanceStartRelative);
-
-
-            // Step 2.
-            Date currentInstance = processInstanceStartActual;
-            while (true) {
-                Date nextConsumerInstance = EntityUtil.getNextStartTime(processStartDate,
-                        consumer.getFrequency(), consumer.getTimezone(), currentInstance);
-
-                ExpressionHelper.setReferenceDate(nextConsumerInstance);
-                evaluator = ExpressionHelper.get();
-                Date inputStart = evaluator.evaluate(in.getStart(), Date.class);
-                Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(),
-                        feed.getTimezone(), inputStart).getTime();
-                Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
-                if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) {
-                    if (!nextConsumerInstance.before(processCluster.getValidity().getStart())
-                            && nextConsumerInstance.before(processCluster.getValidity().getEnd())) {
-                        result.add(nextConsumerInstance);
-                    }
-                } else {
-                    break;
-                }
-                currentInstance = new Date(nextConsumerInstance.getTime() + ONE_MS);
-            }
-
-            // Step 3.
-            currentInstance = processInstanceStartActual;
-            while (true) {
-                Date nextConsumerInstance = EntityUtil.getPreviousInstanceTime(processStartDate,
-                        consumer.getFrequency(), consumer.getTimezone(), currentInstance);
-
-                ExpressionHelper.setReferenceDate(nextConsumerInstance);
-                evaluator = ExpressionHelper.get();
-                Date inputStart = evaluator.evaluate(in.getStart(), Date.class);
-                Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(),
-                        feed.getTimezone(), inputStart).getTime();
-                Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
-                if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) {
-                    if (!nextConsumerInstance.before(processCluster.getValidity().getStart())
-                            && nextConsumerInstance.before(processCluster.getValidity().getEnd())) {
-                        result.add(nextConsumerInstance);
-                    }
-                } else {
-                    break;
-                }
-                currentInstance = new Date(nextConsumerInstance.getTime() - ONE_MS);
-            }
-        }
-        return result;
-    }
-
-    public static FeedInstanceResult getFeedInstanceListing(Entity entityObject,
-                                                            Date start, Date end) throws FalconException {
-        Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject);
-        FeedInstanceResult result = new FeedInstanceResult(APIResult.Status.SUCCEEDED, "Success");
-        for (String cluster : clusters) {
-            Feed feed = (Feed) entityObject;
-            Storage storage = createStorage(cluster, feed);
-            List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster, LocationType.DATA, start, end);
-            FeedInstanceResult.Instance[] instances = new FeedInstanceResult.Instance[feedListing.size()];
-            int index = 0;
-            for (FeedInstanceStatus feedStatus : feedListing) {
-                FeedInstanceResult.Instance instance = new
-                        FeedInstanceResult.Instance(cluster, feedStatus.getInstance(),
-                        feedStatus.getStatus().name());
-                instance.creationTime = feedStatus.getCreationTime();
-                instance.uri = feedStatus.getUri();
-                instance.size = feedStatus.getSize();
-                instance.sizeH = feedStatus.getSizeH();
-                instances[index++] = instance;
-            }
-            result.setInstances(instances);
-        }
-        return result;
-    }
-
-
-    /**
-     * Returns the data source type associated with the Feed's import policy.
-     *
-     * @param clusterEntity
-     * @param feed
-     * @return {@link org.apache.falcon.entity.v0.datasource.DatasourceType}
-     * @throws FalconException
-     */
-    public static DatasourceType getImportDatasourceType(
-            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
-            Feed feed) throws FalconException {
-        Cluster feedCluster = getCluster(feed, clusterEntity.getName());
-        if (isImportEnabled(feedCluster)) {
-            return DatasourceHelper.getDatasourceType(getImportDatasourceName(feedCluster));
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Return if Import policy is enabled in the Feed definition.
-     *
-     * @param feedCluster
-     * @return true if import policy is enabled else false
-     */
-
-    public static boolean isImportEnabled(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        if (feedCluster.getType() == ClusterType.SOURCE) {
-            return (feedCluster.getImport() != null);
-        }
-        return false;
-    }
-
-
-
-    /**
-     * Returns the data source name associated with the Feed's import policy.
-     *
-     * @param feedCluster
-     * @return DataSource name defined in the Datasource Entity
-     */
-    public static String getImportDatasourceName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        if (isImportEnabled(feedCluster)) {
-            return feedCluster.getImport().getSource().getName();
-        } else {
-            return null;
-        }
-    }
-
-
-
-    /**
-     * Returns Datasource table name.
-     *
-     * @param feedCluster
-     * @return Table or Topic name of the Datasource
-     */
-
-    public static String getImportDataSourceTableName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        if (isImportEnabled(feedCluster)) {
-            return feedCluster.getImport().getSource().getTableName();
-        } else {
-            return null;
-        }
-    }
-
-
-
-    /**
-     * Returns the extract method type.
-     *
-     * @param feedCluster
-     * @return {@link org.apache.falcon.entity.v0.feed.ExtractMethod}
-     */
-
-    public static ExtractMethod getImportExtractMethod(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        if (isImportEnabled(feedCluster)) {
-            return feedCluster.getImport().getSource().getExtract().getType();
-        } else {
-            return null;
-        }
-    }
-
-
-
-    /**
-     * Returns the merge type of the Feed import policy.
-     *
-     * @param feedCluster
-     * @return {@link org.apache.falcon.entity.v0.feed.MergeType}
-     */
-    public static MergeType getImportMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        if (isImportEnabled(feedCluster)) {
-            return feedCluster.getImport().getSource().getExtract().getMergepolicy();
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Returns the initial instance date for the import data set for coorinator.
-     *
-     * @param feedCluster
-     * @return Feed cluster validity start date or recent time
-     */
-    public static Date getImportInitalInstance(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        return feedCluster.getValidity().getStart();
-    }
-
-
-    /**
-     * Helper method to check if the merge type is snapshot.
-     *
-     * @param feedCluster
-     * @return true if the feed import policy merge type is snapshot
-     *
-     */
-    public static boolean isSnapshotMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        return MergeType.SNAPSHOT == getImportMergeType(feedCluster);
-    }
-
-    /**
-     * Returns extra arguments specified in the Feed import policy.
-     *
-     * @param feedCluster
-     * @return
-     * @throws FalconException
-     */
-    public static Map<String, String> getImportArguments(org.apache.falcon.entity.v0.feed.Cluster feedCluster)
-        throws FalconException {
-
-        Map<String, String> argsMap = new HashMap<String, String>();
-        if (feedCluster.getImport().getArguments() == null) {
-            return argsMap;
-        }
-
-        for(org.apache.falcon.entity.v0.feed.Argument p : feedCluster.getImport().getArguments().getArguments()) {
-            argsMap.put(p.getName().toLowerCase(), p.getValue());
-        }
-        return argsMap;
-    }
-
-
-
-
-    /**
-     * Returns Fields list specified in the Import Policy.
-     *
-     * @param feedCluster
-     * @return List of String
-     * @throws FalconException
-     */
-    public static List<String> getImportFieldList(org.apache.falcon.entity.v0.feed.Cluster feedCluster)
-        throws FalconException {
-        if (feedCluster.getImport().getSource().getFields() == null) {
-            return null;
-        }
-        org.apache.falcon.entity.v0.feed.FieldsType fieldType = feedCluster.getImport().getSource().getFields();
-        FieldIncludeExclude includeFileds = fieldType.getIncludes();
-        if (includeFileds == null) {
-            return null;
-        }
-        return includeFileds.getFields();
-    }
-
-
-    /**
-     * Returns true if exclude field lists are used. This is a TBD feature.
-     *
-     * @param ds Feed Datasource
-     * @return true of exclude field list is used or false.
-     * @throws FalconException
-     */
-
-    public static boolean isFieldExcludes(org.apache.falcon.entity.v0.feed.Datasource ds)
-        throws FalconException {
-        if (ds.getFields() != null) {
-            org.apache.falcon.entity.v0.feed.FieldsType fieldType = ds.getFields();
-            FieldIncludeExclude excludeFileds = fieldType.getExcludes();
-            if ((excludeFileds != null) && (excludeFileds.getFields().size() > 0)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public static FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, String clusterName,
-                                                                              Date instanceTime)
-        throws FalconException {
-        Storage storage = createStorage(clusterName, feed);
-        return storage.getInstanceAvailabilityStatus(feed, clusterName, LocationType.DATA, instanceTime);
-    }
-
-    public static boolean isLifecycleEnabled(Feed feed, String clusterName) {
-        Cluster cluster = getCluster(feed, clusterName);
-        return cluster != null && (feed.getLifecycle() != null || cluster.getLifecycle() != null);
-    }
-
-    public static Frequency getLifecycleRetentionFrequency(Feed feed, String clusterName) throws FalconException {
-        Frequency retentionFrequency = null;
-        RetentionStage retentionStage = getRetentionStage(feed, clusterName);
-        if (retentionStage != null) {
-            if (retentionStage.getFrequency() != null) {
-                retentionFrequency = retentionStage.getFrequency();
-            } else {
-                Frequency feedFrequency = feed.getFrequency();
-                Frequency defaultFrequency = new Frequency("hours(6)");
-                if (DateUtil.getFrequencyInMillis(feedFrequency) < DateUtil.getFrequencyInMillis(defaultFrequency)) {
-                    retentionFrequency = defaultFrequency;
-                } else {
-                    retentionFrequency = new Frequency(feedFrequency.toString());
-                }
-            }
-        }
-        return  retentionFrequency;
-    }
-
-    /**
-     * Returns the hadoop cluster queue name specified for the replication jobs to run in the Lifecycle
-     * section of the target cluster section of the feed entity.
-     *
-     * NOTE: Lifecycle for replication is not implemented. This will return the queueName property value.
-     *
-     * @param feed
-     * @param clusterName
-     * @return hadoop cluster queue name specified in the feed entity
-     * @throws FalconException
-     */
-
-    public static String getLifecycleReplicationQueue(Feed feed, String clusterName) throws FalconException {
-        return null;
-    }
-
-    /**
-     * Returns the hadoop cluster queue name specified for the retention jobs to run in the Lifecycle
-     * section of feed entity.
-     *
-     * @param feed
-     * @param clusterName
-     * @return hadoop cluster queue name specified in the feed entity
-     * @throws FalconException
-     */
-    public static String getLifecycleRetentionQueue(Feed feed, String clusterName) throws FalconException {
-        RetentionStage retentionStage = getRetentionStage(feed, clusterName);
-        if (retentionStage != null) {
-            return retentionStage.getQueue();
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Returns the data source type associated with the Feed's export policy.
-     *
-     * @param clusterEntity
-     * @param feed
-     * @return {@link org.apache.falcon.entity.v0.datasource.DatasourceType}
-     * @throws FalconException
-     */
-    public static DatasourceType getExportDatasourceType(
-            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
-            Feed feed) throws FalconException {
-        Cluster feedCluster = getCluster(feed, clusterEntity.getName());
-        if (isExportEnabled(feedCluster)) {
-            return DatasourceHelper.getDatasourceType(getExportDatasourceName(feedCluster));
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Return if Export policy is enabled in the Feed definition.
-     *
-     * @param feedCluster
-     * @return true if export policy is enabled else false
-     */
-
-    public static boolean isExportEnabled(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        return (feedCluster.getExport() != null);
-    }
-
-    /**
-     * Returns the data source name associated with the Feed's export policy.
-     *
-     * @param feedCluster
-     * @return DataSource name defined in the Datasource Entity
-     */
-    public static String getExportDatasourceName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        if (isExportEnabled(feedCluster)) {
-            return feedCluster.getExport().getTarget().getName();
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Returns Datasource table name.
-     *
-     * @param feedCluster
-     * @return Table or Topic name of the Datasource
-     */
-
-    public static String getExportDataSourceTableName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        if (isExportEnabled(feedCluster)) {
-            return feedCluster.getExport().getTarget().getTableName();
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Returns the export load type.
-     *
-     * @param feedCluster
-     * @return {@link org.apache.falcon.entity.v0.feed.Load}
-     */
-
-    public static Load getExportLoadMethod(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        if (isExportEnabled(feedCluster)) {
-            return feedCluster.getExport().getTarget().getLoad();
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Returns the initial instance date for the export data set for coorinator.
-     *
-     * @param feedCluster
-     * @return Feed cluster validity start date or recent time
-     */
-    public static Date getExportInitalInstance(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
-        return feedCluster.getValidity().getStart();
-    }
-
-    /**
-     * Returns extra arguments specified in the Feed export policy.
-     *
-     * @param feedCluster
-     * @return
-     * @throws FalconException
-     */
-    public static Map<String, String> getExportArguments(org.apache.falcon.entity.v0.feed.Cluster feedCluster)
-        throws FalconException {
-
-        Map<String, String> argsMap = new HashMap<String, String>();
-        if (feedCluster.getExport().getArguments() == null) {
-            return argsMap;
-        }
-
-        for(org.apache.falcon.entity.v0.feed.Argument p : feedCluster.getExport().getArguments().getArguments()) {
-            argsMap.put(p.getName().toLowerCase(), p.getValue());
-        }
-        return argsMap;
-    }
-
-    public static Validity getClusterValidity(Feed feed, String clusterName) throws FalconException {
-        Cluster cluster = getCluster(feed, clusterName);
-        if (cluster == null) {
-            throw new FalconException("Invalid cluster: " + clusterName + " for feed: " + feed.getName());
-        }
-        return cluster.getValidity();
-    }
-
-    public static Frequency getOldRetentionFrequency(Feed feed) {
-        Frequency feedFrequency = feed.getFrequency();
-        Frequency defaultFrequency = new Frequency("hours(24)");
-        if (DateUtil.getFrequencyInMillis(feedFrequency) < DateUtil.getFrequencyInMillis(defaultFrequency)) {
-            return new Frequency("hours(6)");
-        } else {
-            return defaultFrequency;
-        }
-    }
-
-    public static Frequency getRetentionFrequency(Feed feed, Cluster feedCluster) throws FalconException {
-        Frequency retentionFrequency;
-        retentionFrequency = getLifecycleRetentionFrequency(feed, feedCluster.getName());
-        if (retentionFrequency == null) {
-            retentionFrequency = getOldRetentionFrequency(feed);
-        }
-        return retentionFrequency;
-    }
-
-    public static int getRetentionLimitInSeconds(Feed feed, String clusterName) throws FalconException {
-        Frequency retentionLimit = new Frequency("minutes(0)");
-        RetentionStage retentionStage = getRetentionStage(feed, clusterName);
-        if (retentionStage != null) {
-            for (Property property : retentionStage.getProperties().getProperties()) {
-                if (property.getName().equalsIgnoreCase("retention.policy.agebaseddelete.limit")) {
-                    retentionLimit = new Frequency(property.getValue());
-                    break;
-                }
-            }
-        } else {
-            retentionLimit = getCluster(feed, clusterName).getRetention().getLimit();
-        }
-        Long freqInMillis = DateUtil.getFrequencyInMillis(retentionLimit);
-        return (int) (freqInMillis/1000);
-    }
-
-    /**
-     * Returns the replication job's queue name specified in the feed entity definition.
-     * First looks into the Lifecycle stage if exists. If null, looks into the queueName property specified
-     * in the Feed definition.
-     *
-     * @param feed
-     * @param feedCluster
-     * @return
-     * @throws FalconException
-     */
-    public static String getReplicationQueue(Feed feed, Cluster feedCluster) throws FalconException {
-        String queueName;
-        queueName = getLifecycleReplicationQueue(feed, feedCluster.getName());
-        if (StringUtils.isBlank(queueName)) {
-            queueName = getQueueFromProperties(feed);
-        }
-        return queueName;
-    }
-
-    /**
-     * Returns the retention job's queue name specified in the feed entity definition.
-     * First looks into the Lifecycle stage. If null, looks into the queueName property specified
-     * in the Feed definition.
-     *
-     * @param feed
-     * @param feedCluster
-     * @return
-     * @throws FalconException
-     */
-    public static String getRetentionQueue(Feed feed, Cluster feedCluster) throws FalconException {
-        String queueName = getLifecycleRetentionQueue(feed, feedCluster.getName());
-        if (StringUtils.isBlank(queueName)) {
-            queueName = getQueueFromProperties(feed);
-        }
-        return queueName;
-    }
-
-    /**
-     * Returns the queue name specified in the Feed entity definition from queueName property.
-     *
-     * @param feed
-     * @return queueName property value
-     */
-    public static String getQueueFromProperties(Feed feed) {
-        return getPropertyValue(feed, EntityUtil.MR_QUEUE_NAME);
-    }
-
-    /**
-     * Returns value of a feed property given property name.
-     * @param feed
-     * @param propName
-     * @return property value
-     */
-
-    public static String getPropertyValue(Feed feed, String propName) {
-        if (feed.getProperties() != null) {
-            for (Property prop : feed.getProperties().getProperties()) {
-                if ((prop != null) && (prop.getName().equals(propName))) {
-                    return prop.getValue();
-                }
-            }
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java b/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java
deleted file mode 100644
index 8b43671..0000000
--- a/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.entity;
-
-/**
- * Feed Instance Status is used to provide feed instance listing and corresponding status.
- *
- * This is used for exchanging information for getListing api
- */
-public class FeedInstanceStatus {
-
-    private String instance;
-
-    private final String uri;
-
-    private long creationTime;
-
-    private long size = -1;
-
-    private String sizeH;
-
-    private AvailabilityStatus status = AvailabilityStatus.MISSING;
-
-    /**
-     * Availability status of a feed instance.
-     *
-     * Missing if the feed partition is entirely missing,
-     * Available if present and the availability flag is also present
-     * Availability flag is configured in feed definition, but availability flag is missing in data path
-     * Empty if the empty
-     */
-    public enum AvailabilityStatus {MISSING, AVAILABLE, PARTIAL, EMPTY}
-
-    public FeedInstanceStatus(String uri) {
-        this.uri = uri;
-    }
-
-    public String getInstance() {
-        return instance;
-    }
-
-    public void setInstance(String instance) {
-        this.instance = instance;
-    }
-
-    public String getUri() {
-        return uri;
-    }
-
-    public long getCreationTime() {
-        return creationTime;
-    }
-
-    public void setCreationTime(long creationTime) {
-        this.creationTime = creationTime;
-    }
-
-    public long getSize() {
-        return size;
-    }
-
-    public String getSizeH(){
-        return sizeH;
-    }
-
-    public void setSize(long size) {
-        this.size = size;
-    }
-
-    public void setSizeH(String sizeH) {
-        this.sizeH = sizeH;
-    }
-
-
-    public AvailabilityStatus getStatus() {
-        return status;
-    }
-
-    public void setStatus(AvailabilityStatus status) {
-        this.status = status;
-    }
-
-    @Override
-    public String toString() {
-        return "FeedInstanceStatus{"
-                + "instance='" + instance + '\''
-                + ", uri='" + uri + '\''
-                + ", creationTime=" + creationTime
-                + ", size=" + size
-                + ", status='" + status + '\''
-                + '}';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        FeedInstanceStatus that = (FeedInstanceStatus) o;
-
-        if (creationTime != that.creationTime) {
-            return false;
-        }
-        if (size != that.size) {
-            return false;
-        }
-        if (!instance.equals(that.instance)) {
-            return false;
-        }
-        if (status != that.status) {
-            return false;
-        }
-        if (uri != null ? !uri.equals(that.uri) : that.uri != null) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = instance.hashCode();
-        result = 31 * result + (uri != null ? uri.hashCode() : 0);
-        result = 31 * result + (int) (creationTime ^ (creationTime >>> 32));
-        result = 31 * result + (int) (size ^ (size >>> 32));
-        result = 31 * result + (status != null ? status.hashCode() : 0);
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
deleted file mode 100644
index ece8b5d..0000000
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ /dev/null
@@ -1,509 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.retention.EvictedInstanceSerDe;
-import org.apache.falcon.retention.EvictionHelper;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.jsp.el.ELException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * A file system implementation of a feed storage.
- */
-public class FileSystemStorage extends Configured implements Storage {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FileSystemStorage.class);
-    private final StringBuffer instancePaths = new StringBuffer();
-    private final StringBuilder instanceDates = new StringBuilder();
-
-    public static final String FEED_PATH_SEP = "#";
-    public static final String LOCATION_TYPE_SEP = "=";
-
-    public static final String FILE_SYSTEM_URL = "${nameNode}";
-
-    private final String storageUrl;
-    private final List<Location> locations;
-
-    public FileSystemStorage(Feed feed) {
-        this(FILE_SYSTEM_URL, feed.getLocations());
-    }
-
-    protected FileSystemStorage(String storageUrl, Locations locations) {
-        this(storageUrl, locations.getLocations());
-    }
-
-    protected FileSystemStorage(String storageUrl, List<Location> locations) {
-        if (storageUrl == null || storageUrl.length() == 0) {
-            throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
-        }
-
-        if (locations == null || locations.size() == 0) {
-            throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
-        }
-
-        this.storageUrl = storageUrl;
-        this.locations = locations;
-    }
-
-    /**
-     * Create an instance from the URI Template that was generated using
-     * the getUriTemplate() method.
-     *
-     * @param uriTemplate the uri template from org.apache.falcon.entity.FileSystemStorage#getUriTemplate
-     * @throws URISyntaxException
-     */
-    protected FileSystemStorage(String uriTemplate) throws URISyntaxException {
-        if (uriTemplate == null || uriTemplate.length() == 0) {
-            throw new IllegalArgumentException("URI template cannot be null or empty");
-        }
-
-        String rawStorageUrl = null;
-        List<Location> rawLocations = new ArrayList<Location>();
-        String[] feedLocs = uriTemplate.split(FEED_PATH_SEP);
-        for (String rawPath : feedLocs) {
-            String[] typeAndPath = rawPath.split(LOCATION_TYPE_SEP);
-            final String processed = typeAndPath[1].replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
-                                                   .replaceAll("}", EXPR_CLOSE_NORMALIZED);
-            URI uri = new URI(processed);
-            if (rawStorageUrl == null) {
-                rawStorageUrl = uri.getScheme() + "://" + uri.getAuthority();
-            }
-
-            String path = uri.getPath();
-            final String finalPath = path.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
-                                         .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
-
-            Location location = new Location();
-            location.setPath(finalPath);
-            location.setType(LocationType.valueOf(typeAndPath[0]));
-            rawLocations.add(location);
-        }
-
-        this.storageUrl = rawStorageUrl;
-        this.locations = rawLocations;
-    }
-
-    @Override
-    public TYPE getType() {
-        return TYPE.FILESYSTEM;
-    }
-
-    public String getStorageUrl() {
-        return storageUrl;
-    }
-
-    public List<Location> getLocations() {
-        return locations;
-    }
-
-    @Override
-    public String getUriTemplate() {
-        String feedPathMask = getUriTemplate(LocationType.DATA);
-        String metaPathMask = getUriTemplate(LocationType.META);
-        String statsPathMask = getUriTemplate(LocationType.STATS);
-        String tmpPathMask = getUriTemplate(LocationType.TMP);
-
-        StringBuilder feedBasePaths = new StringBuilder();
-        feedBasePaths.append(LocationType.DATA.name())
-                     .append(LOCATION_TYPE_SEP)
-                     .append(feedPathMask);
-
-        if (metaPathMask != null) {
-            feedBasePaths.append(FEED_PATH_SEP)
-                         .append(LocationType.META.name())
-                         .append(LOCATION_TYPE_SEP)
-                         .append(metaPathMask);
-        }
-
-        if (statsPathMask != null) {
-            feedBasePaths.append(FEED_PATH_SEP)
-                         .append(LocationType.STATS.name())
-                         .append(LOCATION_TYPE_SEP)
-                         .append(statsPathMask);
-        }
-
-        if (tmpPathMask != null) {
-            feedBasePaths.append(FEED_PATH_SEP)
-                         .append(LocationType.TMP.name())
-                         .append(LOCATION_TYPE_SEP)
-                         .append(tmpPathMask);
-        }
-
-        return feedBasePaths.toString();
-    }
-
-    @Override
-    public String getUriTemplate(LocationType locationType) {
-        return getUriTemplate(locationType, locations);
-    }
-
-    public String getUriTemplate(LocationType locationType, List<Location> locationList) {
-        Location locationForType = null;
-        for (Location location : locationList) {
-            if (location.getType() == locationType) {
-                locationForType = location;
-                break;
-            }
-        }
-
-        if (locationForType == null || StringUtils.isEmpty(locationForType.getPath())) {
-            return null;
-        }
-
-        // normalize the path so trailing and double '/' are removed
-        Path locationPath = new Path(locationForType.getPath());
-        locationPath = locationPath.makeQualified(getDefaultUri(), getWorkingDir());
-
-        if (isRelativePath(locationPath)) {
-            locationPath = new Path(storageUrl + locationPath);
-        }
-
-        return locationPath.toString();
-    }
-
-    private boolean isRelativePath(Path locationPath) {
-        return locationPath.toUri().getAuthority() == null && isStorageUrlATemplate();
-    }
-
-    private boolean isStorageUrlATemplate() {
-        return storageUrl.startsWith(FILE_SYSTEM_URL);
-    }
-
-    private URI getDefaultUri() {
-        return new Path(isStorageUrlATemplate() ? "/" : storageUrl).toUri();
-    }
-
-    public Path getWorkingDir() {
-        return new Path(CurrentUser.isAuthenticated() ? "/user/" + CurrentUser.getUser() : "/");
-    }
-
-    @Override
-    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
-        if (!(toCompareAgainst instanceof FileSystemStorage)) {
-            return false;
-        }
-
-        FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
-        final List<Location> fsStorageLocations = fsStorage.getLocations();
-
-        return getLocations().size() == fsStorageLocations.size()
-                && StringUtils.equals(getUriTemplate(LocationType.DATA, getLocations()),
-                    getUriTemplate(LocationType.DATA, fsStorageLocations))
-                && StringUtils.equals(getUriTemplate(LocationType.STATS, getLocations()),
-                    getUriTemplate(LocationType.STATS, fsStorageLocations))
-                && StringUtils.equals(getUriTemplate(LocationType.META, getLocations()),
-                    getUriTemplate(LocationType.META, fsStorageLocations))
-                && StringUtils.equals(getUriTemplate(LocationType.TMP, getLocations()),
-                    getUriTemplate(LocationType.TMP, fsStorageLocations));
-    }
-
-    public static Location getLocation(List<Location> locations, LocationType type) {
-        for (Location loc : locations) {
-            if (loc.getType() == type) {
-                return loc;
-            }
-        }
-
-        return null;
-    }
-
-    @Override
-    public void validateACL(AccessControlList acl) throws FalconException {
-        try {
-            for (Location location : getLocations()) {
-                String pathString = getRelativePath(location);
-                Path path = new Path(pathString);
-                FileSystem fileSystem =
-                    HadoopClientFactory.get().createProxiedFileSystem(path.toUri(), getConf());
-                if (fileSystem.exists(path)) {
-                    FileStatus fileStatus = fileSystem.getFileStatus(path);
-                    Set<String> groups = CurrentUser.getGroupNames();
-
-                    if (fileStatus.getOwner().equals(acl.getOwner())
-                            || groups.contains(acl.getGroup())) {
-                        return;
-                    }
-
-                    LOG.error("Permission denied: Either Feed ACL owner {} or group {} doesn't "
-                                    + "match the actual file owner {} or group {} for file {}",
-                            acl, acl.getGroup(), fileStatus.getOwner(), fileStatus.getGroup(), path);
-                    throw new FalconException("Permission denied: Either Feed ACL owner "
-                            + acl + " or group " + acl.getGroup() + " doesn't match the actual "
-                            + "file owner " + fileStatus.getOwner() + " or group "
-                            + fileStatus.getGroup() + "  for file " + path);
-                }
-            }
-        } catch (IOException e) {
-            LOG.error("Can't validate ACL on storage {}", getStorageUrl(), e);
-            throw new RuntimeException("Can't validate storage ACL (URI " + getStorageUrl() + ")", e);
-        }
-    }
-
-    @Override
-    public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException {
-        TimeZone tz = TimeZone.getTimeZone(timeZone);
-        try{
-            for (Location location : getLocations()) {
-                fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, tz, logFilePath);
-            }
-            EvictedInstanceSerDe.serializeEvictedInstancePaths(
-                    HadoopClientFactory.get().createProxiedFileSystem(logFilePath.toUri(), getConf()),
-                    logFilePath, instancePaths);
-        }catch (IOException e){
-            throw new FalconException("Couldn't evict feed from fileSystem", e);
-        }catch (ELException e){
-            throw new FalconException("Couldn't evict feed from fileSystem", e);
-        }
-
-        return instanceDates;
-    }
-
-    private void fileSystemEvictor(String feedPath, String retentionLimit, TimeZone timeZone,
-                                   Path logFilePath) throws IOException, ELException, FalconException {
-        Path normalizedPath = new Path(feedPath);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(normalizedPath.toUri());
-        feedPath = normalizedPath.toUri().getPath();
-        LOG.info("Normalized path: {}", feedPath);
-
-        Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit);
-
-        List<Path> toBeDeleted = discoverInstanceToDelete(feedPath, timeZone, range.first, fs);
-        if (toBeDeleted.isEmpty()) {
-            LOG.info("No instances to delete.");
-            return;
-        }
-
-        DateFormat dateFormat = new SimpleDateFormat(FeedHelper.FORMAT);
-        dateFormat.setTimeZone(timeZone);
-        Path feedBasePath = fs.makeQualified(FeedHelper.getFeedBasePath(feedPath));
-        for (Path path : toBeDeleted) {
-            deleteInstance(fs, path, feedBasePath);
-            Date date = FeedHelper.getDate(feedPath, new Path(path.toUri().getPath()), timeZone);
-            instanceDates.append(dateFormat.format(date)).append(',');
-            instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
-        }
-    }
-
-    private List<Path> discoverInstanceToDelete(String inPath, TimeZone timeZone, Date start, FileSystem fs)
-        throws IOException {
-        FileStatus[] files = findFilesForFeed(fs, inPath);
-        if (files == null || files.length == 0) {
-            return Collections.emptyList();
-        }
-
-        List<Path> toBeDeleted = new ArrayList<Path>();
-        for (FileStatus file : files) {
-            Date date = FeedHelper.getDate(inPath, new Path(file.getPath().toUri().getPath()), timeZone);
-            LOG.debug("Considering {}", file.getPath().toUri().getPath());
-            LOG.debug("Date: {}", date);
-            if (date != null && !isDateInRange(date, start)) {
-                toBeDeleted.add(file.getPath());
-            }
-        }
-        return toBeDeleted;
-    }
-
-    private FileStatus[] findFilesForFeed(FileSystem fs, String feedBasePath) throws IOException {
-        Matcher matcher = FeedDataPath.PATTERN.matcher(feedBasePath);
-        while (matcher.find()) {
-            String var = feedBasePath.substring(matcher.start(), matcher.end());
-            feedBasePath = feedBasePath.replaceAll(Pattern.quote(var), "*");
-            matcher = FeedDataPath.PATTERN.matcher(feedBasePath);
-        }
-        LOG.info("Searching for {}", feedBasePath);
-        return fs.globStatus(new Path(feedBasePath));
-    }
-
-    private boolean isDateInRange(Date date, Date start) {
-        //ignore end ( && date.compareTo(end) <= 0 )
-        return date.compareTo(start) >= 0;
-    }
-
-    private void deleteInstance(FileSystem fs, Path path, Path feedBasePath) throws IOException {
-        if (fs.delete(path, true)) {
-            LOG.info("Deleted instance: {}", path);
-        }else{
-            throw new IOException("Unable to delete instance: " + path);
-        }
-        deleteParentIfEmpty(fs, path.getParent(), feedBasePath);
-    }
-
-    private void deleteParentIfEmpty(FileSystem fs, Path parent, Path feedBasePath) throws IOException {
-        if (feedBasePath.equals(parent)) {
-            LOG.info("Not deleting feed base path: {}", parent);
-        } else {
-            FileStatus[] files = fs.listStatus(parent);
-            if (files != null && files.length == 0) {
-                LOG.info("Parent path: {} is empty, deleting path", parent);
-                if (fs.delete(parent, true)) {
-                    LOG.info("Deleted empty dir: {}", parent);
-                } else {
-                    throw new IOException("Unable to delete parent path:" + parent);
-                }
-                deleteParentIfEmpty(fs, parent.getParent(), feedBasePath);
-            }
-        }
-    }
-
-    @Override
-    @SuppressWarnings("MagicConstant")
-    public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType,
-                                               Date start, Date end) throws FalconException {
-
-        Calendar calendar = Calendar.getInstance();
-        List<Location> clusterSpecificLocation = FeedHelper.
-                getLocations(FeedHelper.getCluster(feed, clusterName), feed);
-        Location location = getLocation(clusterSpecificLocation, locationType);
-        try {
-            FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf());
-            Cluster cluster = ClusterHelper.getCluster(clusterName);
-            Properties baseProperties = FeedHelper.getClusterProperties(cluster);
-            baseProperties.putAll(FeedHelper.getFeedProperties(feed));
-            List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
-            Date feedStart = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
-            TimeZone tz = feed.getTimezone();
-            Date alignedStart = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(), tz, start);
-
-            String basePath = location.getPath();
-            while (!end.before(alignedStart)) {
-                Properties allProperties = ExpressionHelper.getTimeVariables(alignedStart, tz);
-                allProperties.putAll(baseProperties);
-                String feedInstancePath = ExpressionHelper.substitute(basePath, allProperties);
-                FileStatus fileStatus = getFileStatus(fileSystem, new Path(feedInstancePath));
-                FeedInstanceStatus instance = new FeedInstanceStatus(feedInstancePath);
-
-                Date date = FeedHelper.getDate(basePath, new Path(feedInstancePath), tz);
-                instance.setInstance(SchemaHelper.formatDateUTC(date));
-                if (fileStatus != null) {
-                    instance.setCreationTime(fileStatus.getModificationTime());
-                    ContentSummary contentSummary = fileSystem.getContentSummary(fileStatus.getPath());
-                    if (contentSummary != null) {
-                        long size = contentSummary.getSpaceConsumed();
-                        instance.setSize(size);
-                        if (!StringUtils.isEmpty(feed.getAvailabilityFlag())) {
-                            FileStatus doneFile = getFileStatus(fileSystem,
-                                    new Path(fileStatus.getPath(), feed.getAvailabilityFlag()));
-                            if (doneFile != null) {
-                                instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
-                            } else {
-                                instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
-                            }
-                        } else {
-                            instance.setStatus(size > 0 ? FeedInstanceStatus.AvailabilityStatus.AVAILABLE
-                                    : FeedInstanceStatus.AvailabilityStatus.EMPTY);
-                        }
-                    }
-                }
-                instances.add(instance);
-                calendar.setTime(alignedStart);
-                calendar.add(feed.getFrequency().getTimeUnit().getCalendarUnit(),
-                        feed.getFrequency().getFrequencyAsInt());
-                alignedStart = calendar.getTime();
-            }
-            return instances;
-        } catch (IOException e) {
-            LOG.error("Unable to retrieve listing for {}:{}", locationType, getStorageUrl(), e);
-            throw new FalconException("Unable to retrieve listing for (URI " + getStorageUrl() + ")", e);
-        }
-    }
-
-    @Override
-    public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName,
-                                                                   LocationType locationType,
-                                                                   Date instanceTime) throws FalconException {
-
-        List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
-        if (result.isEmpty()) {
-            return FeedInstanceStatus.AvailabilityStatus.MISSING;
-        } else {
-            return result.get(0).getStatus();
-        }
-    }
-
-    public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) {
-        FileStatus fileStatus = null;
-        try {
-            fileStatus = fileSystem.getFileStatus(feedInstancePath);
-        } catch (IOException ignore) {
-            //ignore
-        }
-        return fileStatus;
-    }
-
-    public Configuration getConf() {
-        Configuration conf = new Configuration();
-        conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
-        return conf;
-    }
-
-    private String getRelativePath(Location location) {
-        // if the path contains variables, locate on the "parent" path (just before first variable usage)
-        Matcher matcher = FeedDataPath.PATTERN.matcher(location.getPath());
-        boolean timedPath = matcher.find();
-        if (timedPath) {
-            return location.getPath().substring(0, matcher.start());
-        }
-        return location.getPath();
-    }
-
-    @Override
-    public String toString() {
-        return "FileSystemStorage{"
-                + "storageUrl='" + storageUrl + '\''
-                + ", locations=" + locations
-                + '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java b/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
deleted file mode 100644
index f4029e4..0000000
--- a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.security.SecurityUtil;
-
-import java.util.Properties;
-
-/**
- * Hive Utilities.
- */
-public final class HiveUtil {
-    public static final String METASTOREURIS = "hive.metastore.uris";
-    public static final String METASTROE_URI = "hcat.metastore.uri";
-    public static final String NODE = "hcatNode";
-    public static final String METASTORE_UGI = "hive.metastore.execute.setugi";
-
-    private HiveUtil() {
-
-    }
-
-    public static Properties getHiveCredentials(Cluster cluster) {
-        String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
-        if (StringUtils.isBlank(metaStoreUrl)) {
-            throw new IllegalStateException(
-                    "Registry interface is not defined in cluster: " + cluster.getName());
-        }
-
-        Properties hiveCredentials = new Properties();
-        hiveCredentials.put(METASTOREURIS, metaStoreUrl);
-        hiveCredentials.put(METASTORE_UGI, "true");
-        hiveCredentials.put(NODE, metaStoreUrl.replace("thrift", "hcat"));
-        hiveCredentials.put(METASTROE_URI, metaStoreUrl);
-
-        if (SecurityUtil.isSecurityEnabled()) {
-            String principal = ClusterHelper
-                    .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
-            hiveCredentials.put(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL, principal);
-            hiveCredentials.put(SecurityUtil.METASTORE_PRINCIPAL, principal);
-            hiveCredentials.put(SecurityUtil.METASTORE_USE_THRIFT_SASL, "true");
-        }
-        return hiveCredentials;
-    }
-}


Mime
View raw message