falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srik...@apache.org
Subject [1/4] git commit: Revert "FALCON-87 Hive table integration with feed entity. Contributed by Venkatesh Seetharam"
Date Mon, 02 Sep 2013 03:15:44 GMT
Updated Branches:
  refs/heads/master 1e6ace432 -> d422dba48


Revert "FALCON-87 Hive table integration with feed entity. Contributed by Venkatesh Seetharam"

This reverts commit 1e6ace4327910250f60e6d1e41ce5b1726bf4a4f.


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c82b663f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c82b663f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c82b663f

Branch: refs/heads/master
Commit: c82b663fcb74fe00b6ff7572e8c2735d9f878b7c
Parents: 1e6ace4
Author: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Authored: Mon Sep 2 07:55:54 2013 +0530
Committer: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Committed: Mon Sep 2 07:55:54 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 -
 client/src/main/resources/feed-0.1.xsd          |  22 +--
 .../falcon/catalog/AbstractCatalogService.java  |   4 -
 .../falcon/catalog/HiveCatalogService.java      |  32 +--
 .../apache/falcon/entity/CatalogStorage.java    | 194 -------------------
 .../org/apache/falcon/entity/FeedHelper.java    | 101 ++--------
 .../apache/falcon/entity/FileSystemStorage.java | 124 ------------
 .../java/org/apache/falcon/entity/Storage.java  |  74 -------
 .../falcon/entity/parser/FeedEntityParser.java  |  45 +----
 .../java/org/apache/falcon/group/FeedGroup.java |   6 +-
 .../org/apache/falcon/group/FeedGroupMap.java   |   6 +-
 .../org/apache/falcon/update/UpdateHelper.java  |  34 ++--
 .../falcon/entity/CatalogStorageTest.java       | 129 ------------
 .../falcon/entity/FileSystemStorageTest.java    | 133 -------------
 .../entity/parser/FeedEntityParserTest.java     |  25 +--
 .../apache/falcon/update/UpdateHelperTest.java  |  24 +--
 .../resources/config/feed/hive-table-feed.xml   |  48 -----
 .../test/resources/config/feed/invalid-feed.xml |  53 -----
 .../falcon/converter/OozieFeedMapper.java       |  70 ++++---
 .../falcon/converter/OozieFeedMapperTest.java   |  32 +--
 .../falcon/converter/OozieProcessMapper.java    |   6 +-
 .../workflow/OozieProcessWorkflowBuilder.java   |   3 +-
 .../converter/OozieProcessMapperTest.java       |  10 +-
 .../org/apache/falcon/resource/TestContext.java |   8 +-
 24 files changed, 123 insertions(+), 1063 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 046fc27..b2dc1d3 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,9 +5,6 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
-    FALCON-87 Hive table integration with feed entity. (Venkatesh
-    Seetharam via Srikanth Sundarrajan)
-
     FALCON-86 Hive table integration with cluster entity. (Venaktesh 
     Seetharam via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 00b5172..bf6fa81 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -105,10 +105,7 @@
             </xs:element>
             <xs:element type="late-arrival" name="late-arrival" minOccurs="0"/>
             <xs:element type="clusters" name="clusters"/>
-            <xs:choice minOccurs="1" maxOccurs="1">
-                <xs:element type="locations" name="locations"/>
-                <xs:element type="catalog-table" name="table"/>
-            </xs:choice>
+            <xs:element type="locations" name="locations"/>
             <xs:element type="ACL" name="ACL"/>
             <xs:element type="schema" name="schema"/>
             <xs:element type="properties" name="properties" minOccurs="0"/>
@@ -142,10 +139,7 @@
         <xs:sequence>
             <xs:element type="validity" name="validity"/>
             <xs:element type="retention" name="retention"/>
-            <xs:choice minOccurs="0" maxOccurs="1">
-                <xs:element type="locations" name="locations" minOccurs="0"/>
-                <xs:element type="catalog-table" name="table"/>
-            </xs:choice>
+            <xs:element type="locations" name="locations" minOccurs="0"/>
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="cluster-type" name="type" use="optional"/>
@@ -200,7 +194,7 @@
     <xs:complexType name="locations">
         <xs:annotation>
             <xs:documentation>
-                A list of locations on the file system.
+                A list of locations.
             </xs:documentation>
         </xs:annotation>
         <xs:choice maxOccurs="unbounded" minOccurs="0">
@@ -362,14 +356,4 @@
             <xs:pattern value="(\w+=[^,]+)?([,]?[ ]*[\w]+=[^,]+)*"/>
         </xs:restriction>
     </xs:simpleType>
-    <xs:complexType name="catalog-table">
-        <xs:annotation>
-            <xs:documentation>
-                catalog specifies the uri of a Hive table along with the partition spec.
-                uri="catalog:$database:$table#(partition-key=partition-value);+"
-                Example: catalog:logs-db:clicks#ds=${YEAR}-${MONTH}-${DAY}
-            </xs:documentation>
-        </xs:annotation>
-        <xs:attribute type="xs:string" name="uri" use="required"/>
-    </xs:complexType>
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index a4b6cfd..4086611 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -23,7 +23,6 @@ import org.apache.falcon.FalconException;
 /**
  * Interface definition for a catalog registry service
  * such as Hive or HCatalog.
- * Catalog should minimally support the following operations.
  */
 public abstract class AbstractCatalogService {
 
@@ -35,7 +34,4 @@ public abstract class AbstractCatalogService {
      * @throws FalconException exception
      */
     public abstract boolean isAlive(String catalogBaseUrl) throws FalconException;
-
-    public abstract boolean tableExists(String catalogUrl, String database, String tableName)
-        throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 67d14af..e6f7fe2 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -22,7 +22,6 @@ import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import org.apache.falcon.FalconException;
-import org.apache.falcon.security.CurrentUser;
 import org.apache.log4j.Logger;
 
 import javax.ws.rs.core.MediaType;
@@ -41,36 +40,7 @@ public class HiveCatalogService extends AbstractCatalogService {
 
         Client client = Client.create();
         WebResource service = client.resource(catalogBaseUrl);
-        ClientResponse response = service.path("status")
-                .accept(MediaType.APPLICATION_JSON)
-                .head();
-                // .get(ClientResponse.class);    // todo this isnt working
-
-        if (LOG.isDebugEnabled() && response.getStatus() != 200) {
-            LOG.debug("Output from Server .... \n" + response.getEntity(String.class));
-        }
-
-        return response.getStatus() == 200;
-    }
-
-    @Override
-    public boolean tableExists(String catalogUrl, String database, String tableName)
-        throws FalconException {
-        LOG.info("Checking if the table exists: " + tableName);
-
-        Client client = Client.create();
-        WebResource service = client.resource(catalogUrl);
-
-        ClientResponse response = service.path("ddl/database/").path(database)
-                .path("/table").path(tableName)
-                .queryParam("user.name", CurrentUser.getUser())
-                .accept(MediaType.APPLICATION_JSON)
-                .get(ClientResponse.class);
-
-        if (LOG.isDebugEnabled() && response.getStatus() != 200) {
-            LOG.debug("Output from Server .... \n" + response.getEntity(String.class));
-        }
-
+        ClientResponse response = service.path("status").accept(MediaType.APPLICATION_JSON).head();
         return response.getStatus() == 200;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
deleted file mode 100644
index 5c64f27..0000000
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.entity.v0.feed.LocationType;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A catalog registry implementation of a feed storage.
- */
-public class CatalogStorage implements Storage {
-
-    public static final String PARTITION_SEPARATOR = ";";
-    public static final String PARTITION_KEYVAL_SEPARATOR = "=";
-    public static final String INPUT_PATH_SEPARATOR = ":";
-    public static final String OUTPUT_PATH_SEPARATOR = "/";
-
-    private final String catalogUrl;
-    private String database;
-    private String table;
-    private Map<String, String> partitions;
-
-    protected CatalogStorage(String catalogTable) throws URISyntaxException {
-        this("${hcatNode}", catalogTable);
-    }
-
-    protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
-        if (catalogUrl == null || catalogUrl.length() == 0) {
-            throw new IllegalArgumentException("Catalog Registry URL cannot be null or empty");
-        }
-
-        this.catalogUrl = catalogUrl;
-
-        parse(tableUri);
-    }
-
-    /**
-     * Validate URI to conform to catalog:$database:$table#$partitions.
-     * scheme=catalog:database=$database:table=$table#$partitions
-     * partitions=key=value;key=value
-     *
-     * @param catalogTableUri table URI to parse and validate
-     * @throws URISyntaxException
-     */
-    private void parse(String catalogTableUri) throws URISyntaxException {
-
-        URI tableUri = new URI(catalogTableUri);
-
-        if (!"catalog".equals(tableUri.getScheme())) {
-            throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing");
-        }
-
-        final String schemeSpecificPart = tableUri.getSchemeSpecificPart();
-        if (schemeSpecificPart == null) {
-            throw new URISyntaxException(tableUri.toString(), "Database and Table are missing");
-        }
-
-        String[] paths = schemeSpecificPart.split(INPUT_PATH_SEPARATOR);
-
-        if (paths.length != 2) {
-            throw new URISyntaxException(tableUri.toString(), "URI path is not in expected format: database:table");
-        }
-
-        database = paths[0];
-        table = paths[1];
-
-        if (database == null || database.length() == 0) {
-            throw new URISyntaxException(tableUri.toString(), "DB name is missing");
-        }
-        if (table == null || table.length() == 0) {
-            throw new URISyntaxException(tableUri.toString(), "Table name is missing");
-        }
-
-        String partRaw = tableUri.getFragment();
-        if (partRaw == null || partRaw.length() == 0) {
-            throw new URISyntaxException(tableUri.toString(), "Partition details are missing");
-        }
-
-        partitions = new HashMap<String, String>();
-        String[] parts = partRaw.split(PARTITION_SEPARATOR);
-        for (String part : parts) {
-            if (part == null || part.length() == 0) {
-                continue;
-            }
-
-            String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
-            if (keyVal.length != 2) {
-                throw new URISyntaxException(tableUri.toString(),
-                        "Partition key value pair is not specified properly in (" + part + ")");
-            }
-
-            partitions.put(keyVal[0], keyVal[1]);
-        }
-    }
-
-    public String getCatalogUrl() {
-        return catalogUrl;
-    }
-
-    public String getDatabase() {
-        return database;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public Map<String, String> getPartitions() {
-        return partitions;
-    }
-
-    /**
-     * @param key partition key
-     * @return partition value
-     */
-    public String getPartitionValue(String key) {
-        return partitions.get(key);
-    }
-
-    /**
-     * @param key partition key
-     * @return if partitions map includes the key or not
-     */
-    public boolean hasPartition(String key) {
-        return partitions.containsKey(key);
-    }
-
-    @Override
-    public TYPE getType() {
-        return TYPE.TABLE;
-    }
-
-    @Override
-    public String getUriTemplate() {
-        return getUriTemplate(LocationType.DATA);
-    }
-
-    @Override
-    public String getUriTemplate(LocationType locationType) {
-        StringBuilder uriTemplate = new StringBuilder();
-        uriTemplate.append(catalogUrl);
-        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
-        uriTemplate.append(database);
-        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
-        uriTemplate.append(table);
-        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
-        for (Map.Entry<String, String> entry : partitions.entrySet()) {
-            uriTemplate.append(entry.getKey());
-            uriTemplate.append(PARTITION_KEYVAL_SEPARATOR);
-            uriTemplate.append(entry.getValue());
-            uriTemplate.append(PARTITION_SEPARATOR);
-        }
-        uriTemplate.setLength(uriTemplate.length() - 1);
-
-        return uriTemplate.toString();
-    }
-
-    @Override
-    public boolean exists() throws FalconException {
-        return CatalogServiceFactory.getCatalogService().tableExists(catalogUrl, database, table);
-    }
-
-    @Override
-    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
-        CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
-
-        return !(getCatalogUrl() != null && !getCatalogUrl().equals(catalogStorage.getCatalogUrl()))
-                && getDatabase().equals(catalogStorage.getDatabase())
-                && getTable().equals(catalogStorage.getTable())
-                && getPartitions().equals(catalogStorage.getPartitions());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index d212a98..c96120d 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -20,19 +20,11 @@ package org.apache.falcon.entity;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.*;
 import org.apache.falcon.expression.ExpressionHelper;
 
-import java.net.URISyntaxException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -52,87 +44,32 @@ public final class FeedHelper {
         return null;
     }
 
-    public static Storage createStorage(Feed feed) throws FalconException {
-
-        final List<Location> locations = feed.getLocations().getLocations();
-        if (locations != null) {
-            return new FileSystemStorage(locations);
-        }
-
-        try {
-            final CatalogTable table = feed.getTable();
-            if (table != null) {
-                return new CatalogStorage(table.getUri());
-            }
-        } catch (URISyntaxException e) {
-            throw new FalconException(e);
+    public static Location getLocation(Feed feed, LocationType type,
+                                       String clusterName) {
+        Cluster cluster = getCluster(feed, clusterName);
+        if (cluster != null && cluster.getLocations() != null
+                && cluster.getLocations().getLocations().size() != 0) {
+            return getLocation(cluster.getLocations(), type);
+        } else {
+            return getLocation(feed.getLocations(), type);
         }
 
-        throw new FalconException("Both catalog and locations are not defined.");
     }
 
-    public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
-                                        Feed feed) throws FalconException {
-        return createStorage(getCluster(feed, clusterEntity.getName()), feed, clusterEntity);
-    }
-
-    public static Storage createStorage(String clusterName, Feed feed)
-        throws FalconException {
-
-        return createStorage(getCluster(feed, clusterName), feed);
-    }
-
-    public static Storage createStorage(Cluster cluster, Feed feed)
-        throws FalconException {
-
-        final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
-                EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
-
-        return createStorage(cluster, feed, clusterEntity);
+    public static Location getLocation(Feed feed, LocationType type) {
+        return getLocation(feed.getLocations(), type);
     }
 
-    public static Storage createStorage(Cluster cluster, Feed feed,
-                                        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
-        throws FalconException {
-
-        final List<Location> locations = getLocations(cluster, feed);
-        if (locations != null) {
-            return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
-        }
-
-        try {
-            final CatalogTable table = getTable(cluster, feed);
-            if (table != null) {
-                return new CatalogStorage(
-                        ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint(),
-                        table.getUri());
+    public static Location getLocation(Locations locations, LocationType type) {
+        for (Location loc : locations.getLocations()) {
+            if (loc.getType() == type) {
+                return loc;
             }
-        } catch (URISyntaxException e) {
-            throw new FalconException(e);
         }
-
-        throw new FalconException("Both catalog and locations are not defined.");
-    }
-
-    private static List<Location> getLocations(Cluster cluster, Feed feed) {
-        // check if locations are overridden in cluster
-        final Locations clusterLocations = cluster.getLocations();
-        if (clusterLocations != null
-                && clusterLocations.getLocations().size() != 0) {
-            return clusterLocations.getLocations();
-        }
-
-        final Locations feedLocations = feed.getLocations();
-        return feedLocations == null ? null : feedLocations.getLocations();
-    }
-
-    private static CatalogTable getTable(Cluster cluster, Feed feed) {
-        // check if table is overridden in cluster
-        if (cluster.getTable() != null) {
-            return cluster.getTable();
-        }
-
-        return feed.getTable();
+        Location loc = new Location();
+        loc.setPath("/tmp");
+        loc.setType(type);
+        return loc;
     }
 
     public static String normalizePartitionExpression(String part1, String part2) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
deleted file mode 100644
index 95b18c5..0000000
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-
-import java.util.List;
-
-/**
- * A file system implementation of a feed storage.
- */
-public class FileSystemStorage implements Storage {
-
-    private final String storageUrl;
-    private final List<Location> locations;
-
-    protected FileSystemStorage(List<Location> locations) {
-        this("${nameNode}", locations);
-    }
-
-    protected FileSystemStorage(String storageUrl, List<Location> locations) {
-        if (storageUrl == null || storageUrl.length() == 0) {
-            throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
-        }
-
-        if (locations == null || locations.size() == 0) {
-            throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
-        }
-
-        this.storageUrl = storageUrl;
-        this.locations = locations;
-    }
-
-    @Override
-    public TYPE getType() {
-        return TYPE.FILESYSTEM;
-    }
-
-    public String getStorageUrl() {
-        return storageUrl;
-    }
-
-    public List<Location> getLocations() {
-        return locations;
-    }
-
-    @Override
-    public String getUriTemplate() {
-        return getUriTemplate(LocationType.DATA);
-    }
-
-    @Override
-    public String getUriTemplate(LocationType locationType) {
-        Location locationForType = null;
-        for (Location location : locations) {
-            if (location.getType() == locationType) {
-                locationForType = location;
-                break;
-            }
-        }
-
-        if (locationForType == null) {
-            return "/tmp";
-        }
-
-        StringBuilder uriTemplate = new StringBuilder();
-        uriTemplate.append(storageUrl);
-        uriTemplate.append(locationForType.getPath());
-        return uriTemplate.toString();
-    }
-
-    @Override
-    public boolean exists() throws FalconException {
-        // Directories on FS will be created if they don't exist.
-        return true;
-    }
-
-    @Override
-    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
-        FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
-        final List<Location> fsStorageLocations = fsStorage.getLocations();
-
-        return getLocations().size() == fsStorageLocations.size()
-               && getLocation(getLocations(), LocationType.DATA).getPath().equals(
-                   getLocation(fsStorageLocations, LocationType.DATA).getPath())
-               && getLocation(getLocations(), LocationType.META).getPath().equals(
-                   getLocation(fsStorageLocations, LocationType.META).getPath())
-               && getLocation(getLocations(), LocationType.STATS).getPath().equals(
-                   getLocation(fsStorageLocations, LocationType.STATS).getPath())
-               && getLocation(getLocations(), LocationType.TMP).getPath().equals(
-                   getLocation(fsStorageLocations, LocationType.TMP).getPath());
-    }
-
-    private static Location getLocation(List<Location> locations, LocationType type) {
-        for (Location loc : locations) {
-            if (loc.getType() == type) {
-                return loc;
-            }
-        }
-
-        Location loc = new Location();
-        loc.setPath("/tmp");
-        loc.setType(type);
-        return loc;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
deleted file mode 100644
index 18dad65..0000000
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.feed.LocationType;
-
-/**
- * A class to encapsulate the storage for a given feed which can either be
- * expressed as a path on the file system or a table in a catalog.
- */
-public interface Storage {
-
-    /**
-     * Enumeration for the various storage types.
-     */
-    enum TYPE {FILESYSTEM, TABLE}
-
-    /**
-     * Return the type of storage.
-     *
-     * @return storage type
-     */
-    TYPE getType();
-
-    /**
-     * Return the uri template.
-     *
-     * @return uri template
-     */
-    String getUriTemplate();
-
-    /**
-     * Return the uri template for a given location type.
-     *
-     * @param locationType type of location, applies only to filesystem type
-     * @return uri template
-     */
-    String getUriTemplate(LocationType locationType);
-
-    /**
-     * Check if the storage, filesystem location or catalog table exists.
-     * Filesystem location always returns true.
-     *
-     * @return true if table exists else false
-     * @throws FalconException an exception
-     */
-    boolean exists() throws FalconException;
-
-    /**
-     * Check for equality of this instance against the one in question.
-     *
-     * @param toCompareAgainst instance to compare
-     * @return true if identical else false
-     * @throws FalconException an exception
-     */
-    boolean isIdentical(Storage toCompareAgainst) throws FalconException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index d0435fb..1c323fd 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -20,10 +20,8 @@ package org.apache.falcon.entity.parser;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityGraph;
@@ -31,6 +29,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
@@ -72,7 +71,6 @@ public class FeedEntityParser extends EntityParser<Feed> {
             validateFeedCutOffPeriod(feed, cluster);
         }
 
-        validateFeedStorage(feed);
         validateFeedPartitionExpression(feed);
         validateFeedGroups(feed);
 
@@ -107,19 +105,21 @@ public class FeedEntityParser extends EntityParser<Feed> {
         return processes;
     }
 
-    private void validateFeedGroups(Feed feed) throws FalconException {
+    private void validateFeedGroups(Feed feed) throws ValidationException {
         String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
-        String defaultPath = FeedHelper.createStorage(feed).getUriTemplate();
+        String defaultPath = FeedHelper.getLocation(feed, LocationType.DATA)
+                .getPath();
         for (Cluster cluster : feed.getClusters().getClusters()) {
-            final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate();
-            if (!FeedGroup.getDatePattern(uriTemplate).equals(
+            if (!FeedGroup.getDatePattern(
+                    FeedHelper.getLocation(feed, LocationType.DATA,
+                            cluster.getName()).getPath()).equals(
                     FeedGroup.getDatePattern(defaultPath))) {
                 throw new ValidationException("Feeds default path pattern: "
-                        + FeedHelper.createStorage(feed).getUriTemplate()
+                        + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
                         + ", does not match with cluster: "
                         + cluster.getName()
                         + " path pattern: "
-                        + uriTemplate);
+                        + FeedHelper.getLocation(feed, LocationType.DATA, cluster.getName()).getPath());
             }
         }
         for (String groupName : groupNames) {
@@ -127,7 +127,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
             if (group != null && !group.canContainFeed(feed)) {
                 throw new ValidationException(
                         "Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
-                                + ", path pattern: " + FeedHelper.createStorage(feed)
+                                + ", path pattern: " + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
                                 + " does not match with group: " + group.getName() + "'s frequency: "
                                 + group.getFrequency()
                                 + ", date pattern: " + group.getDatePattern());
@@ -280,29 +280,4 @@ public class FeedEntityParser extends EntityParser<Feed> {
                     "Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
         }
     }
-
-    /**
-     * Ensure table is already defined in the catalog registry.
-     * Does not matter for FileSystem storage.
-     */
-    private void validateFeedStorage(Feed feed) throws FalconException {
-        StringBuilder buffer = new StringBuilder();
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            final Storage storage = FeedHelper.createStorage(cluster, feed);
-            if (!storage.exists()) {
-                // this is only true for table, filesystem always returns true
-                CatalogStorage catalogStorage = (CatalogStorage) storage;
-                buffer.append("Table [")
-                        .append(catalogStorage.getTable())
-                        .append("] does not exist for feed: ")
-                        .append(feed.getName())
-                        .append(", cluster: ")
-                        .append(cluster.getName());
-            }
-        }
-
-        if (buffer.length() > 0) {
-            throw new ValidationException(buffer.toString());
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index d517828..5dca46f 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -17,10 +17,10 @@
  */
 package org.apache.falcon.group;
 
-import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LocationType;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -93,8 +93,8 @@ public class FeedGroup {
         return datePattern;
     }
 
-    public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
+    public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) {
         return this.frequency.equals(feed.getFrequency())
-                && this.datePattern.equals(getDatePattern(FeedHelper.createStorage(feed).getUriTemplate()));
+                && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index d054873..ed44b48 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.service.ConfigurationChangeListener;
 
 import java.util.Collections;
@@ -113,9 +114,8 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
         return groupSet;
     }
 
-    public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed)
-        throws FalconException {
+    public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) {
         return getGroups(feed.getGroups(), feed.getFrequency(),
-                FeedHelper.createStorage(feed).getUriTemplate());
+                FeedHelper.getLocation(feed, LocationType.DATA).getPath());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index fc69933..a9d39de 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -21,10 +21,10 @@ package org.apache.falcon.update;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Partition;
 import org.apache.falcon.entity.v0.feed.Partitions;
 import org.apache.falcon.entity.v0.process.Cluster;
@@ -78,15 +78,18 @@ public final class UpdateHelper {
         }
     }
 
-    public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess)
-        throws FalconException {
-        Storage oldFeedStorage = FeedHelper.createStorage(oldFeed);
-        Storage newFeedStorage = FeedHelper.createStorage(newFeed);
-
-        if (!oldFeedStorage.isIdentical(newFeedStorage)) {
+    public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess) {
+        if (!FeedHelper.getLocation(oldFeed.getLocations(), LocationType.DATA)
+            .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.DATA).getPath())
+                || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.META)
+                    .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.META).getPath())
+                || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.STATS)
+                    .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.STATS).getPath())
+                || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.TMP)
+                    .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.TMP).getPath())) {
             return true;
         }
-        LOG.debug(oldFeed.toShortString() + ": Storage identical. Ignoring...");
+        LOG.debug(oldFeed.toShortString() + ": Location identical. Ignoring...");
 
         if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
             return true;
@@ -125,12 +128,17 @@ public final class UpdateHelper {
         }
 
         for (Cluster cluster : affectedProcess.getClusters().getClusters()) {
-            oldFeedStorage = FeedHelper.createStorage(cluster.getName(), oldFeed);
-            newFeedStorage = FeedHelper.createStorage(cluster.getName(), newFeed);
-
-            if (!FeedHelper.getCluster(oldFeed, cluster.getName()).getValidity().getStart()
+            if (!FeedHelper
+                    .getCluster(oldFeed, cluster.getName()).getValidity().getStart()
                     .equals(FeedHelper.getCluster(newFeed, cluster.getName()).getValidity().getStart())
-                    || !oldFeedStorage.isIdentical(newFeedStorage)) {
+                    || !FeedHelper.getLocation(oldFeed, LocationType.DATA, cluster.getName()).getPath()
+                    .equals(FeedHelper.getLocation(newFeed, LocationType.DATA, cluster.getName()).getPath())
+                    || !FeedHelper.getLocation(oldFeed, LocationType.META, cluster.getName()).getPath()
+                    .equals(FeedHelper.getLocation(newFeed, LocationType.META, cluster.getName()).getPath())
+                    || !FeedHelper.getLocation(oldFeed, LocationType.STATS, cluster.getName()).getPath()
+                    .equals(FeedHelper.getLocation(newFeed, LocationType.STATS, cluster.getName()).getPath())
+                    || !FeedHelper.getLocation(oldFeed, LocationType.TMP, cluster.getName()).getPath()
+                    .equals(FeedHelper.getLocation(newFeed, LocationType.TMP, cluster.getName()).getPath())) {
                 return true;
             }
             LOG.debug(oldFeed.toShortString() + ": Feed on cluster" + cluster.getName() + " identical. Ignoring...");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
deleted file mode 100644
index 458111d..0000000
--- a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.net.URISyntaxException;
-
-/**
- * Test class for Catalog Table Storage.
- * Exists will be covered in integration tests as it actually checks if the table exists.
- */
-public class CatalogStorageTest {
-
-    @Test
-    public void testGetType() throws Exception {
-        String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
-        CatalogStorage storage = new CatalogStorage(table);
-        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
-    }
-
-    @Test
-    public void testParseVaildURI() throws URISyntaxException {
-        String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
-        CatalogStorage storage = new CatalogStorage(table);
-        Assert.assertEquals("${hcatNode}", storage.getCatalogUrl());
-        Assert.assertEquals("clicksdb", storage.getDatabase());
-        Assert.assertEquals("clicks", storage.getTable());
-        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
-        Assert.assertEquals(2, storage.getPartitions().size());
-        Assert.assertEquals("us", storage.getPartitionValue("region"));
-        Assert.assertTrue(storage.hasPartition("region"));
-        Assert.assertNull(storage.getPartitionValue("unknown"));
-        Assert.assertFalse(storage.hasPartition("unknown"));
-    }
-
-
-    @DataProvider(name = "invalidTableURIs")
-    public Object[][] createInvalidTableURIData() {
-        return new Object[][] {
-            {"catalog:default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
-            {"default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
-            {"catalog:default#ds=$YEAR-$MONTH-$DAY;region=us", ""},
-            {"catalog://default/clicks#ds=$YEAR-$MONTH-$DAY:region=us", ""},
-        };
-    }
-
-    @Test(dataProvider = "invalidTableURIs", expectedExceptions = URISyntaxException.class)
-    public void testParseInvalidURI(String tableUri, String ignore) throws URISyntaxException {
-        new CatalogStorage(tableUri);
-        Assert.fail("Exception must have been thrown");
-    }
-
-    @Test
-    public void testIsIdenticalPositive() throws Exception {
-        CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        Assert.assertTrue(table1.isIdentical(table2));
-
-        final String catalogUrl = "thrift://localhost:49083";
-        CatalogStorage table3 = new CatalogStorage(catalogUrl,
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        CatalogStorage table4 = new CatalogStorage(catalogUrl,
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        Assert.assertTrue(table3.isIdentical(table4));
-    }
-
-    @Test
-    public void testIsIdenticalNegative() throws Exception {
-        CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
-        Assert.assertFalse(table1.isIdentical(table2));
-
-        final String catalogUrl = "thrift://localhost:49083";
-        CatalogStorage table3 = new CatalogStorage(catalogUrl,
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        CatalogStorage table4 = new CatalogStorage(catalogUrl,
-                "catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
-        Assert.assertFalse(table3.isIdentical(table4));
-
-        CatalogStorage table5 = new CatalogStorage("thrift://localhost:49084",
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        CatalogStorage table6 = new CatalogStorage("thrift://localhost:49083",
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        Assert.assertFalse(table5.isIdentical(table6));
-    }
-
-    @Test
-    public void testGetUriTemplateWithCatalogUrl() throws Exception {
-        final String catalogUrl = "thrift://localhost:49083";
-        String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
-        String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
-
-        CatalogStorage table = new CatalogStorage(catalogUrl, tableUri);
-
-        Assert.assertEquals(uriTemplate, table.getUriTemplate());
-        Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
-    }
-
-    @Test
-    public void testGetUriTemplateWithOutCatalogUrl() throws Exception {
-        String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
-        String uriTemplate = "${hcatNode}/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
-
-        CatalogStorage table = new CatalogStorage(tableUri);
-
-        Assert.assertEquals(uriTemplate, table.getUriTemplate());
-        Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
deleted file mode 100644
index 3e3b575..0000000
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Test class for File System Storage.
- */
-public class FileSystemStorageTest {
-
-    @Test
-    public void testGetType() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar");
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage(locations);
-        Assert.assertEquals(storage.getType(), Storage.TYPE.FILESYSTEM);
-    }
-
-    @Test
-    public void testGetUriTemplate() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar");
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
-        Assert.assertEquals(storage.getUriTemplate(), "hdfs://localhost:41020/foo/bar");
-    }
-
-    @Test
-    public void testGetUriTemplateWithOutStorageURL() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar");
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage(locations);
-        Assert.assertEquals(storage.getUriTemplate(), "${nameNode}/foo/bar");
-    }
-
-    @Test
-    public void testGetUriTemplateWithLocationType() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar");
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
-    }
-
-    @Test
-    public void testExists() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar");
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
-        Assert.assertTrue(storage.exists());
-    }
-
-    @Test
-    public void testIsIdentical() throws Exception {
-        final String storageUrl = "hdfs://localhost:41020";
-        final Location location1 = new Location();
-        location1.setPath("/foo/bar");
-        location1.setType(LocationType.DATA);
-        List<Location> locations1 = new ArrayList<Location>();
-        locations1.add(location1);
-        FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
-
-        final Location location2 = new Location();
-        location2.setPath("/foo/bar");
-        location2.setType(LocationType.DATA);
-        List<Location> locations2 = new ArrayList<Location>();
-        locations2.add(location2);
-        FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
-
-        Assert.assertTrue(storage1.isIdentical(storage2));
-    }
-
-    @Test
-    public void testIsIdenticalNegative() throws Exception {
-        final String storageUrl = "hdfs://localhost:41020";
-        final Location location1 = new Location();
-        location1.setPath("/foo/baz");
-        location1.setType(LocationType.DATA);
-        List<Location> locations1 = new ArrayList<Location>();
-        locations1.add(location1);
-        FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
-
-        final Location location2 = new Location();
-        location2.setPath("/foo/bar");
-        location2.setType(LocationType.DATA);
-        List<Location> locations2 = new ArrayList<Location>();
-        locations2.add(location2);
-        FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
-
-        Assert.assertFalse(storage1.isIdentical(storage2));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 8bcff78..4cc4a0e 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -116,12 +116,12 @@ public class FeedEntityParserTest extends AbstractTestBase {
         assertEquals(feed.getClusters().getClusters().get(1).getRetention()
                 .getLimit().toString(), "hours(6)");
 
-        assertEquals("${nameNode}/projects/falcon/clicks",
-                FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
-        assertEquals("${nameNode}/projects/falcon/clicksMetaData",
-                FeedHelper.createStorage(feed).getUriTemplate(LocationType.META));
-        assertEquals("${nameNode}/projects/falcon/clicksStats",
-                FeedHelper.createStorage(feed).getUriTemplate(LocationType.STATS));
+        assertEquals(FeedHelper.getLocation(feed, LocationType.DATA).getPath(),
+                "/projects/falcon/clicks");
+        assertEquals(FeedHelper.getLocation(feed, LocationType.META).getPath(),
+                "/projects/falcon/clicksMetaData");
+        assertEquals(FeedHelper.getLocation(feed, LocationType.STATS).getPath(),
+                "/projects/falcon/clicksStats");
 
         assertEquals(feed.getACL().getGroup(), "group");
         assertEquals(feed.getACL().getOwner(), "testuser");
@@ -444,17 +444,4 @@ public class FeedEntityParserTest extends AbstractTestBase {
             Assert.assertEquals(org.xml.sax.SAXParseException.class, e.getCause().getCause().getClass());
         }
     }
-
-    @Test
-    public void testParseFeedWithTable() throws FalconException {
-        final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
-        Feed feedWithTable = parser.parse(inputStream);
-        Assert.assertEquals(feedWithTable.getTable().getUri(),
-                "catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR");
-    }
-
-    @Test (expectedExceptions = FalconException.class)
-    public void testParseInvalidFeedWithTable() throws FalconException {
-        parser.parse(FeedEntityParserTest.class.getResourceAsStream("/config/feed/invalid-feed.xml"));
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index e8f80bd..d4b5a2a 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -28,9 +28,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.feed.Partition;
 import org.apache.falcon.entity.v0.feed.Properties;
 import org.apache.falcon.entity.v0.feed.Property;
@@ -119,11 +117,11 @@ public class UpdateHelperTest extends AbstractTestBase {
         newFeed.getLateArrival().setCutOff(oldFeed.getLateArrival().getCutOff());
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
-        getLocation(newFeed, LocationType.DATA).setPath("/test");
+        FeedHelper.getLocation(newFeed, LocationType.DATA).setPath("/test");
         Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
-        getLocation(newFeed, LocationType.DATA).setPath(
-                getLocation(oldFeed, LocationType.DATA).getPath());
+        FeedHelper.getLocation(newFeed, LocationType.DATA).setPath(
+                FeedHelper.getLocation(oldFeed, LocationType.DATA).getPath());
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
         newFeed.setFrequency(Frequency.fromString("months(1)"));
@@ -156,20 +154,4 @@ public class UpdateHelperTest extends AbstractTestBase {
                         process.getClusters().getClusters().get(0).getName()).getValidity().getStart());
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
     }
-
-    private static Location getLocation(Feed feed, LocationType type) {
-        return getLocation(feed.getLocations(), type);
-    }
-
-    private static Location getLocation(Locations locations, LocationType type) {
-        for (Location loc : locations.getLocations()) {
-            if (loc.getType() == type) {
-                return loc;
-            }
-        }
-        Location loc = new Location();
-        loc.setPath("/tmp");
-        loc.setType(type);
-        return loc;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed.xml b/common/src/test/resources/config/feed/hive-table-feed.xml
deleted file mode 100644
index e84f90a..0000000
--- a/common/src/test/resources/config/feed/hive-table-feed.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
-    <partitions>
-        <partition name="fraud"/>
-        <partition name="good"/>
-    </partitions>
-
-    <groups>online,bi</groups>
-
-    <frequency>hours(1)</frequency>
-    <timezone>UTC</timezone>
-    <late-arrival cut-off="hours(6)"/>
-
-    <clusters>
-        <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
-            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
-            <retention limit="hours(48)" action="delete"/>
-            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
-        </cluster>
-        <cluster name="backupCluster" type="target">
-            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
-            <retention limit="hours(6)" action="archive"/>
-            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
-        </cluster>
-    </clusters>
-
-    <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
-
-    <ACL owner="testuser" group="group" permission="0x755"/>
-    <schema location="/schema/clicks" provider="protobuf"/>
-</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/resources/config/feed/invalid-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/invalid-feed.xml b/common/src/test/resources/config/feed/invalid-feed.xml
deleted file mode 100644
index b7273a9..0000000
--- a/common/src/test/resources/config/feed/invalid-feed.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
-    <partitions>
-        <partition name="fraud"/>
-        <partition name="good"/>
-    </partitions>
-
-    <groups>online,bi</groups>
-
-    <frequency>hours(1)</frequency>
-    <timezone>UTC</timezone>
-    <late-arrival cut-off="hours(6)"/>
-
-    <clusters>
-        <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
-            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
-            <retention limit="hours(48)" action="delete"/>
-            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
-        </cluster>
-        <cluster name="backupCluster" type="target">
-            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
-            <retention limit="hours(6)" action="archive"/>
-            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
-        </cluster>
-    </clusters>
-
-    <locations>
-        <location type="data" path="/projects/falcon/clicks"/>
-        <location type="stats" path="/projects/falcon/clicksStats"/>
-        <location type="meta" path="/projects/falcon/clicksMetaData"/>
-    </locations>
-    <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
-
-    <ACL owner="testuser" group="group" permission="0x755"/>
-    <schema location="/schema/clicks" provider="protobuf"/>
-</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index 00c261b..adef3ec 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -21,9 +21,9 @@ package org.apache.falcon.converter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -62,7 +62,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
     private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
     private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
 
-    public static final String FEED_PATH_SEP = "#";
+    private static final String FEED_PATH_SEP = "#";
     private static final String TIMEOUT = "timeout";
     private static final String PARALLEL = "parallel";
 
@@ -116,7 +116,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         WORKFLOW retentionWorkflow = new WORKFLOW();
         try {
             //
-            WORKFLOWAPP retWfApp = createRetentionWorkflow();
+            WORKFLOWAPP retWfApp = createRetentionWorkflow(cluster);
             retWfApp.setName(wfName);
             marshal(cluster, retWfApp, wfPath);
             retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
@@ -124,9 +124,23 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
 
             org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-            String feedBasePaths = getFeedDataPath(cluster, feed);
+            String feedPathMask = getLocationURI(cluster, feed, LocationType.DATA);
+            String metaPathMask = getLocationURI(cluster, feed, LocationType.META);
+            String statsPathMask = getLocationURI(cluster, feed, LocationType.STATS);
+            String tmpPathMask = getLocationURI(cluster, feed, LocationType.TMP);
+
+            StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
+            if (metaPathMask != null) {
+                feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
+            }
+            if (statsPathMask != null) {
+                feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
+            }
+            if (tmpPathMask != null) {
+                feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
+            }
 
-            props.put("feedDataPath", feedBasePaths);
+            props.put("feedDataPath", feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{"));
             props.put("timeZone", feed.getTimezone().getID());
             props.put("frequency", feed.getFrequency().getTimeUnit().name());
             props.put("limit", feedCluster.getRetention().getLimit().toString());
@@ -142,29 +156,6 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         }
     }
 
-    protected String getFeedDataPath(Cluster cluster, Feed feed) throws FalconException {
-        final Storage storage = FeedHelper.createStorage(cluster, feed);
-        String feedPathMask = storage.getUriTemplate(LocationType.DATA);
-        String metaPathMask = storage.getUriTemplate(LocationType.META);
-        String statsPathMask = storage.getUriTemplate(LocationType.STATS);
-        String tmpPathMask = storage.getUriTemplate(LocationType.TMP);
-
-        StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
-        if (metaPathMask != null) {
-            feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
-        }
-
-        if (statsPathMask != null) {
-            feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
-        }
-
-        if (tmpPathMask != null) {
-            feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
-        }
-
-        return feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{");
-    }
-
     private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
         throws FalconException {
 
@@ -272,8 +263,10 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             SYNCDATASET inputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
             SYNCDATASET outputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
 
-            inputDataset.setUriTemplate(FeedHelper.createStorage(srcCluster, feed).getUriTemplate(LocationType.DATA));
-            outputDataset.setUriTemplate(FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
+            inputDataset.setUriTemplate(new Path(ClusterHelper.getStorageUrl(srcCluster),
+                FeedHelper.getLocation(feed, LocationType.DATA, srcCluster.getName()).getPath()).toString());
+            outputDataset.setUriTemplate(getStoragePath(
+                FeedHelper.getLocation(feed, LocationType.DATA, trgCluster.getName()).getPath()));
             setDatasetValues(inputDataset, feed, srcCluster);
             setDatasetValues(outputDataset, feed, srcCluster);
             if (feed.getAvailabilityFlag() == null) {
@@ -345,7 +338,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         marshal(cluster, repWFapp, wfPath);
     }
 
-    private WORKFLOWAPP createRetentionWorkflow() throws IOException, FalconException {
+    private WORKFLOWAPP createRetentionWorkflow(Cluster cluster) throws IOException, FalconException {
         return getWorkflowTemplate(RETENTION_WF_TEMPLATE);
     }
 
@@ -360,4 +353,19 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         }
         return props;
     }
+
+    private String getLocationURI(Cluster cluster, Feed feed, LocationType type) {
+        String path = FeedHelper.getLocation(feed, type, cluster.getName())
+                .getPath();
+
+        if (!path.equals("/tmp")) {
+            if (new Path(path).toUri().getScheme() == null) {
+                return new Path(ClusterHelper.getStorageUrl(cluster), path)
+                        .toString();
+            } else {
+                return path;
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 3b3132a..7fbe179 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -19,7 +19,6 @@ package org.apache.falcon.converter;
 
 import static org.testng.Assert.assertEquals;
 
-import java.util.Calendar;
 import java.util.Collection;
 import java.util.List;
 
@@ -28,7 +27,6 @@ import javax.xml.bind.Unmarshaller;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -76,6 +74,7 @@ public class OozieFeedMapperTest {
         ClusterHelper.getInterface(trgCluster, Interfacetype.WRITE).setEndpoint(trgHdfsUrl);
 
         feed = (Feed) storeEntity(EntityType.FEED, FEED);
+
     }
 
     protected Entity storeEntity(EntityType type, String path) throws Exception {
@@ -149,35 +148,6 @@ public class OozieFeedMapperTest {
                 break;
             }
         }
-    }
-
-    @Test
-    public void testRetentionCoords() throws FalconException {
-        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
-        final Calendar instance = Calendar.getInstance();
-        instance.roll(Calendar.YEAR, 1);
-        cluster.getValidity().setEnd(instance.getTime());
-
-        OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
-        List<COORDINATORAPP> coords = feedMapper.getCoordinators(srcCluster, new Path("/projects/falcon/"));
-        COORDINATORAPP coord = coords.get(0);
 
-        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
-        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
-        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
-
-        String feedDataPath = null;
-        org.apache.falcon.oozie.coordinator.CONFIGURATION configuration =
-                coord.getAction().getWorkflow().getConfiguration();
-        for (Property property : configuration.getProperty()) {
-            if ("feedDataPath".equals(property.getName())) {
-                feedDataPath = property.getValue();
-                break;
-            }
-        }
-
-        if (feedDataPath != null) {
-            Assert.assertEquals(feedDataPath, feedMapper.getFeedDataPath(srcCluster, feed));
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 86ad937..8f75736 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -295,8 +295,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
         SYNCDATASET syncdataset = new SYNCDATASET();
         syncdataset.setName(datasetName);
-        String locPath = FeedHelper.createStorage(cluster, feed).getUriTemplate(locationType);
-        syncdataset.setUriTemplate(locPath);
+        String locPath = FeedHelper.getLocation(feed, locationType,
+                cluster.getName()).getPath();
+        syncdataset.setUriTemplate(new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}"
+                + locPath);
         syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
 
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 128a73d..3f70557 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.security.CurrentUser;
@@ -72,7 +73,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
         properties.put(inName + ".done-flag", "notused");
 
-        String locPath = FeedHelper.createStorage(clusterName, feed).getUriTemplate().replace('$', '%');
+        String locPath = FeedHelper.getLocation(feed, LocationType.DATA, clusterName).getPath().replace('$', '%');
         properties.put(inName + ".uri-template",
                 new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}" + locPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index e3a42ca..7be41da 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -47,6 +47,7 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Validity;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -141,14 +142,13 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         ConfigurationStore store = ConfigurationStore.get();
         Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
         SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
-        final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
-        assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
+        assertEquals(SchemaHelper.formatDateUTC(feed.getClusters().getClusters().get(0).getValidity().getStart()),
+                ds.getInitialInstance());
         assertEquals(feed.getTimezone().getID(), ds.getTimezone());
         assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
         assertEquals("", ds.getDoneFlag());
-        assertEquals(ds.getUriTemplate(), FeedHelper.createStorage(feedCluster, feed).getUriTemplate());
-
+        assertEquals(ds.getUriTemplate(), "${nameNode}" + FeedHelper.getLocation(feed, LocationType.DATA,
+                feed.getClusters().getClusters().get(0).getName()).getPath());
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
             if (prop.getName().equals("mapred.job.priority")) {
                 assertEquals(prop.getValue(), "LOW");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 74bab07..f762c4b 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -295,11 +295,9 @@ public class TestContext {
 
         ServletInputStream rawlogStream = getServletInputStream(tmpFile);
 
-        return this.service.path("api/entities/submit/" + entityType.name().toLowerCase())
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML)
-                .type(MediaType.TEXT_XML)
-                .post(ClientResponse.class, rawlogStream);
+        return this.service.path("api/entities/submit/" + entityType.name().toLowerCase()).header("Remote-User",
+                "testuser")
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(ClientResponse.class, rawlogStream);
     }
 
     public void assertRequestId(ClientResponse clientRepsonse) {


Mime
View raw message