falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srik...@apache.org
Subject [10/12] FALCON-85 Hive (HCatalog) integration. Contributed by Venkatesh Seetharam FALCON-163 Merge FALCON-85 branch into main line. Contributed by Venkatesh Seetharam
Date Tue, 12 Nov 2013 11:05:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
new file mode 100644
index 0000000..972066d
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.net.URISyntaxException;
+
+/**
+ * Test class for Catalog Table Storage.
+ * Exists will be covered in integration tests as it actually checks if the table exists.
+ */
+public class CatalogStorageTest {
+
+    @Test
+    public void testGetType() throws Exception {
+        String table = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+        CatalogStorage storage = new CatalogStorage(CatalogStorage.CATALOG_URL, table);
+        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+    }
+
+    @Test
+    public void testParseFeedUriValid() throws URISyntaxException {
+        String table = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+        CatalogStorage storage = new CatalogStorage(CatalogStorage.CATALOG_URL, table);
+        Assert.assertEquals("${hcatNode}", storage.getCatalogUrl());
+        Assert.assertEquals("clicksdb", storage.getDatabase());
+        Assert.assertEquals("clicks", storage.getTable());
+        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+        Assert.assertEquals(2, storage.getPartitions().size());
+        Assert.assertEquals("us", storage.getPartitionValue("region"));
+        Assert.assertTrue(storage.hasPartition("region"));
+        Assert.assertNull(storage.getPartitionValue("unknown"));
+        Assert.assertFalse(storage.hasPartition("unknown"));
+        Assert.assertEquals(storage.getDatedPartitionKey(), "ds");
+    }
+
+    @Test
+    public void testParseFeedUriValid2() throws URISyntaxException {
+        String table = "catalog:clicksdb:clicks#ds=${YEAR}${MONTH}${DAY};region=us";
+        CatalogStorage storage = new CatalogStorage(CatalogStorage.CATALOG_URL, table);
+        Assert.assertEquals("${hcatNode}", storage.getCatalogUrl());
+        Assert.assertEquals("clicksdb", storage.getDatabase());
+        Assert.assertEquals("clicks", storage.getTable());
+        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+        Assert.assertEquals(2, storage.getPartitions().size());
+        Assert.assertEquals("us", storage.getPartitionValue("region"));
+        Assert.assertTrue(storage.hasPartition("region"));
+        Assert.assertNull(storage.getPartitionValue("unknown"));
+        Assert.assertFalse(storage.hasPartition("unknown"));
+        Assert.assertEquals(storage.getDatedPartitionKey(), "ds");
+    }
+
+    @Test
+    public void testCreateFromUriTemplate() throws Exception {
+        String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}-${MONTH}-${DAY}";
+        CatalogStorage storage = new CatalogStorage(uriTemplate);
+        Assert.assertEquals("thrift://localhost:49083", storage.getCatalogUrl());
+        Assert.assertEquals("clicksdb", storage.getDatabase());
+        Assert.assertEquals("clicks", storage.getTable());
+        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+        Assert.assertEquals(2, storage.getPartitions().size());
+        Assert.assertEquals("us", storage.getPartitionValue("region"));
+        Assert.assertTrue(storage.hasPartition("region"));
+        Assert.assertNull(storage.getPartitionValue("unknown"));
+        Assert.assertFalse(storage.hasPartition("unknown"));
+    }
+
+    @DataProvider(name = "invalidFeedURITemplates")
+    public Object[][] createInValidFeedUriTemplates() {
+        return new Object[][] {
+            {"thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}/${MONTH}/${DAY}"},
+            {"thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}/${MONTH}-${DAY}"},
+        };
+    }
+
+    @Test(dataProvider = "invalidFeedURITemplates", expectedExceptions = URISyntaxException.class)
+    public void testParseInvalidFeedUriTemplate(String uriTemplate) throws URISyntaxException {
+        new CatalogStorage(uriTemplate);
+        Assert.fail("Exception must have been thrown");
+    }
+
+    @DataProvider(name = "invalidFeedURIs")
+    public Object[][] createFeedUriInvalid() {
+        return new Object[][] {
+            {"catalog:default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us"},
+            {"default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us"},
+            {"catalog:default#ds=${YEAR}-${MONTH}-${DAY};region=us"},
+            {"catalog://default/clicks#ds=${YEAR}-${MONTH}-${DAY}:region=us"},
+        };
+    }
+
+    @Test(dataProvider = "invalidFeedURIs", expectedExceptions = URISyntaxException.class)
+    public void testParseFeedUriInvalid(String tableUri) throws URISyntaxException {
+        new CatalogStorage(CatalogStorage.CATALOG_URL, tableUri);
+        Assert.fail("Exception must have been thrown");
+    }
+
+    @Test
+    public void testIsIdenticalPositive() throws Exception {
+        CatalogStorage table1 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        CatalogStorage table2 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        Assert.assertTrue(table1.isIdentical(table2));
+
+        final String catalogUrl = "thrift://localhost:49083";
+        CatalogStorage table3 = new CatalogStorage(catalogUrl,
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        CatalogStorage table4 = new CatalogStorage(catalogUrl,
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        Assert.assertTrue(table3.isIdentical(table4));
+    }
+
+    @Test
+    public void testIsIdenticalNegative() throws Exception {
+        CatalogStorage table1 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        CatalogStorage table2 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+                "catalog:clicksdb:impressions#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        Assert.assertFalse(table1.isIdentical(table2));
+
+        final String catalogUrl = "thrift://localhost:49083";
+        CatalogStorage table3 = new CatalogStorage(catalogUrl,
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        CatalogStorage table4 = new CatalogStorage(catalogUrl,
+                "catalog:clicksdb:impressions#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        Assert.assertFalse(table3.isIdentical(table4));
+
+        CatalogStorage table5 = new CatalogStorage("thrift://localhost:49084",
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        CatalogStorage table6 = new CatalogStorage("thrift://localhost:49083",
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        Assert.assertFalse(table5.isIdentical(table6));
+    }
+
+    @Test
+    public void testGetUriTemplateWithCatalogUrl() throws Exception {
+        final String catalogUrl = "thrift://localhost:49083";
+        String tableUri = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+        String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/ds=${YEAR}-${MONTH}-${DAY};region=us";
+
+        CatalogStorage table = new CatalogStorage(catalogUrl, tableUri);
+
+        Assert.assertEquals(uriTemplate, table.getUriTemplate());
+        Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
+        Assert.assertEquals(table.getUriTemplate(), table.getUriTemplate(LocationType.DATA));
+    }
+
+    @Test
+    public void testGetUriTemplateWithOutCatalogUrl() throws Exception {
+        String tableUri = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+        String uriTemplate = "${hcatNode}/clicksdb/clicks/ds=${YEAR}-${MONTH}-${DAY};region=us";
+
+        CatalogStorage table = new CatalogStorage(CatalogStorage.CATALOG_URL, tableUri);
+
+        Assert.assertEquals(uriTemplate, table.getUriTemplate());
+        Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
+        Assert.assertEquals(table.getUriTemplate(), table.getUriTemplate(LocationType.DATA));
+    }
+
+    @Test
+    public void testToPartitionFilter() throws Exception {
+        final String catalogUrl = "thrift://localhost:49083";
+        String tableUri = "catalog:clicksdb:clicks#ds=20130918;region=us";
+        String partitionFilter = "(ds='20130918';region='us')";
+
+        CatalogStorage table = new CatalogStorage(catalogUrl, tableUri);
+        Assert.assertEquals(table.toPartitionFilter(), partitionFilter);
+    }
+
+    @Test
+    public void testToPartitionAsPath() throws Exception {
+        final String catalogUrl = "thrift://localhost:49083";
+        String tableUri = "catalog:clicksdb:clicks#ds=20130918;region=us";
+        String partitionPath = "ds=20130918/region=us";
+
+        CatalogStorage table = new CatalogStorage(catalogUrl, tableUri);
+        Assert.assertEquals(table.toPartitionAsPath(), partitionPath);
+    }
+
+    @Test
+    public void testCreateFromURL() throws Exception {
+        String url = "thrift://localhost:29083/falcon_db/output_table/ds=2012-04-21-00";
+        CatalogStorage storage = new CatalogStorage(url);
+        Assert.assertEquals("thrift://localhost:29083", storage.getCatalogUrl());
+        Assert.assertEquals("falcon_db", storage.getDatabase());
+        Assert.assertEquals("output_table", storage.getTable());
+        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+        Assert.assertEquals(1, storage.getPartitions().size());
+        Assert.assertEquals("2012-04-21-00", storage.getPartitionValue("ds"));
+        Assert.assertTrue(storage.hasPartition("ds"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
new file mode 100644
index 0000000..6917472
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test class for File System Storage.
+ */
+public class FileSystemStorageTest {
+
+    @Test
+    public void testGetType() throws Exception {
+        final Location location = new Location();
+        location.setPath("/foo/bar");
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations);
+        Assert.assertEquals(storage.getType(), Storage.TYPE.FILESYSTEM);
+    }
+
+    @Test
+    public void testCreateFromUriTemplate() throws Exception {
+        String feedBasePath = "DATA=hdfs://localhost:8020"
+                + "/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}"
+                + "#"
+                + "META=hdfs://localhost:8020"
+                + "/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}"
+                + "#"
+                + "STATS=hdfs://localhost:8020"
+                + "/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}";
+
+        FileSystemStorage storage = new FileSystemStorage(feedBasePath);
+        Assert.assertEquals(storage.getUriTemplate(), feedBasePath + "#TMP=/tmp");
+
+        Assert.assertEquals("hdfs://localhost:8020", storage.getStorageUrl());
+        Assert.assertEquals("hdfs://localhost:8020/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
+                storage.getUriTemplate(LocationType.DATA));
+        Assert.assertEquals("hdfs://localhost:8020/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
+                storage.getUriTemplate(LocationType.STATS));
+        Assert.assertEquals("hdfs://localhost:8020/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
+                storage.getUriTemplate(LocationType.META));
+    }
+
+    @Test
+    public void testGetUriTemplateForData() throws Exception {
+        final Location location = new Location();
+        location.setPath("/foo/bar");
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
+    }
+
+    @Test
+    public void testGetUriTemplate() throws Exception {
+        final Location dataLocation = new Location();
+        dataLocation.setPath("/data/foo/bar");
+        dataLocation.setType(LocationType.DATA);
+
+        final Location metaLocation = new Location();
+        metaLocation.setPath("/meta/foo/bar");
+        metaLocation.setType(LocationType.META);
+
+        final Location statsLocation = new Location();
+        statsLocation.setPath("/stats/foo/bar");
+        statsLocation.setType(LocationType.STATS);
+
+        final Location tmpLocation = new Location();
+        tmpLocation.setPath("/tmp/foo/bar");
+        tmpLocation.setType(LocationType.TMP);
+
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(dataLocation);
+        locations.add(metaLocation);
+        locations.add(statsLocation);
+        locations.add(tmpLocation);
+
+        StringBuilder expected = new StringBuilder();
+        expected.append(LocationType.DATA)
+                .append(FileSystemStorage.LOCATION_TYPE_SEP)
+                .append("hdfs://localhost:41020/data/foo/bar")
+                .append(FileSystemStorage.FEED_PATH_SEP)
+                .append(LocationType.META)
+                .append(FileSystemStorage.LOCATION_TYPE_SEP)
+                .append("hdfs://localhost:41020/meta/foo/bar")
+                .append(FileSystemStorage.FEED_PATH_SEP)
+                .append(LocationType.STATS)
+                .append(FileSystemStorage.LOCATION_TYPE_SEP)
+                .append("hdfs://localhost:41020/stats/foo/bar")
+                .append(FileSystemStorage.FEED_PATH_SEP)
+                .append(LocationType.TMP)
+                .append(FileSystemStorage.LOCATION_TYPE_SEP)
+                .append("hdfs://localhost:41020/tmp/foo/bar");
+
+        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+        Assert.assertEquals(storage.getUriTemplate(), expected.toString());
+    }
+
+    @Test
+    public void testGetUriTemplateWithOutStorageURL() throws Exception {
+        final Location location = new Location();
+        location.setPath("/foo/bar");
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "${nameNode}/foo/bar");
+    }
+
+    @DataProvider(name = "locationTestDataProvider")
+    private Object[][] createLocationTestData() {
+        return new Object[][] {
+            {"hdfs://localhost:41020", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
+            {"hdfs://localhost:41020", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
+            {"hdfs://localhost:41020", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
+            {"${nameNode}", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
+            {"${nameNode}", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
+            {"${nameNode}", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
+        };
+    }
+
+    @Test (dataProvider = "locationTestDataProvider")
+    public void testGetUriTemplateWithLocationType(String storageUrl, String path,
+                                                   String expected) throws Exception {
+        final Location location = new Location();
+        location.setPath(path);
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage(storageUrl, locations);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), storageUrl + expected);
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        final Location location = new Location();
+        location.setPath("/foo/bar");
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+        Assert.assertTrue(storage.exists());
+    }
+
+    @Test
+    public void testIsIdentical() throws Exception {
+        final String storageUrl = "hdfs://localhost:41020";
+        final Location location1 = new Location();
+        location1.setPath("/foo/bar");
+        location1.setType(LocationType.DATA);
+        List<Location> locations1 = new ArrayList<Location>();
+        locations1.add(location1);
+        FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
+
+        final Location location2 = new Location();
+        location2.setPath("/foo/bar");
+        location2.setType(LocationType.DATA);
+        List<Location> locations2 = new ArrayList<Location>();
+        locations2.add(location2);
+        FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
+
+        Assert.assertTrue(storage1.isIdentical(storage2));
+    }
+
+    @Test
+    public void testIsIdenticalNegative() throws Exception {
+        final String storageUrl = "hdfs://localhost:41020";
+        final Location location1 = new Location();
+        location1.setPath("/foo/baz");
+        location1.setType(LocationType.DATA);
+        List<Location> locations1 = new ArrayList<Location>();
+        locations1.add(location1);
+        FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
+
+        final Location location2 = new Location();
+        location2.setPath("/foo/bar");
+        location2.setType(LocationType.DATA);
+        List<Location> locations2 = new ArrayList<Location>();
+        locations2.add(location2);
+        FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
+
+        Assert.assertFalse(storage1.isIdentical(storage2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java b/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java
new file mode 100644
index 0000000..eb0127d
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.FeedEntityParser;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Test for storage factory methods in feed helper.
+ */
+public class StorageFactoryTest {
+
+    private static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
+
+    private static final String FS_FEED_UNIFORM = "/config/feed/feed-0.1.xml";
+    private static final String FS_FEED_OVERRIDE = "/config/feed/feed-0.2.xml";
+
+    private static final String TABLE_FEED_UNIFORM = "/config/feed/hive-table-feed.xml";
+    private static final String TABLE_FEED_OVERRIDE = "/config/feed/hive-table-feed-out.xml";
+
+    private static final String OVERRIDE_TBL_LOC = "/testCluster/clicks-summary/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
+
+    private final ClusterEntityParser clusterParser =
+            (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+    private final FeedEntityParser feedParser =
+            (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+
+    private Cluster clusterEntity;
+    private Feed fsFeedWithUniformStorage;
+    private Feed fsFeedWithOverriddenStorage;
+    private Feed tableFeedWithUniformStorage;
+    private Feed tableFeedWithOverriddenStorage;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
+        clusterEntity = clusterParser.parse(stream);
+        stream.close();
+        Interface registry = ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY);
+        registry.setEndpoint("thrift://localhost:9083");
+        ConfigurationStore.get().publish(EntityType.CLUSTER, clusterEntity);
+
+        stream = this.getClass().getResourceAsStream(FS_FEED_UNIFORM);
+        fsFeedWithUniformStorage = feedParser.parse(stream);
+        stream.close();
+
+        stream = this.getClass().getResourceAsStream(FS_FEED_OVERRIDE);
+        fsFeedWithOverriddenStorage = feedParser.parse(stream);
+        stream.close();
+
+        stream = this.getClass().getResourceAsStream(TABLE_FEED_UNIFORM);
+        tableFeedWithUniformStorage = feedParser.parse(stream);
+        stream.close();
+
+        stream = this.getClass().getResourceAsStream(TABLE_FEED_OVERRIDE);
+        tableFeedWithOverriddenStorage = feedParser.parse(stream);
+        stream.close();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        ConfigurationStore.get().remove(EntityType.CLUSTER, clusterEntity.getName());
+    }
+
+    @DataProvider (name = "locationsDataProvider")
+    private Object[][] createLocationsDataProvider() {
+        return new Object[][] {
+            {fsFeedWithUniformStorage, "/projects/falcon/clicks"},
+            {fsFeedWithOverriddenStorage, "/testCluster/projects/falcon/clicks"},
+        };
+    }
+
+    @Test (dataProvider = "locationsDataProvider")
+    public void testGetLocations(Feed feed, String dataPath) {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
+        for (Location location : locations) {
+            if (location.getType() == LocationType.DATA) {
+                Assert.assertEquals(location.getPath(), dataPath);
+            }
+        }
+    }
+
+    @DataProvider (name = "tableDataProvider")
+    private Object[][] createTableDataProvider() {
+        return new Object[][] {
+            {tableFeedWithUniformStorage, "catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"},
+            {tableFeedWithOverriddenStorage, "catalog:testCluster:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"},
+        };
+    }
+
+    @Test (dataProvider = "tableDataProvider")
+    public void testGetTable(Feed feed, String dataPath) {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        CatalogTable table = FeedHelper.getTable(feedCluster, feed);
+        Assert.assertEquals(table.getUri(), dataPath);
+    }
+
+    private static final String UNIFORM_TABLE = "${hcatNode}/default/clicks/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
+    private static final String OVERRIDETBL = "${hcatNode}/default/clicks-summary/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
+
+
+    @DataProvider (name = "uniformFeedStorageDataProvider")
+    private Object[][] createUniformFeedStorageDataProvider() {
+        return new Object[][] {
+            {fsFeedWithUniformStorage, Storage.TYPE.FILESYSTEM, "${nameNode}/projects/falcon/clicks"},
+            {fsFeedWithOverriddenStorage, Storage.TYPE.FILESYSTEM, "${nameNode}/projects/falcon/clicks"},
+            {tableFeedWithUniformStorage, Storage.TYPE.TABLE, UNIFORM_TABLE},
+            {tableFeedWithOverriddenStorage, Storage.TYPE.TABLE, OVERRIDETBL},
+        };
+    }
+
+    @Test (dataProvider = "uniformFeedStorageDataProvider")
+    public void testCreateStorageWithFeed(Feed feed, Storage.TYPE storageType,
+                                            String dataLocation) throws Exception {
+        Storage storage = FeedHelper.createStorage(feed);
+        Assert.assertEquals(storage.getType(), storageType);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+
+        if (storageType == Storage.TYPE.TABLE) {
+            Assert.assertEquals(((CatalogStorage) storage).getDatabase(), "default");
+        }
+    }
+
+    @DataProvider (name = "overriddenFeedStorageDataProvider")
+    private Object[][] createFeedStorageDataProvider() {
+        return new Object[][] {
+            {fsFeedWithUniformStorage, Storage.TYPE.FILESYSTEM, "/projects/falcon/clicks"},
+            {fsFeedWithOverriddenStorage, Storage.TYPE.FILESYSTEM, "/testCluster/projects/falcon/clicks"},
+            {tableFeedWithUniformStorage, Storage.TYPE.TABLE, "/default/clicks/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"},
+            {tableFeedWithOverriddenStorage, Storage.TYPE.TABLE, OVERRIDE_TBL_LOC},
+        };
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateStorageWithFeedAndClusterEntity(Feed feed, Storage.TYPE storageType,
+                                                          String dataLocation) throws Exception {
+        Storage storage = FeedHelper.createStorage(clusterEntity, feed);
+        Assert.assertEquals(storage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateStorageWithFeedAndClusterName(Feed feed, Storage.TYPE storageType,
+                                                        String dataLocation) throws Exception {
+        Storage storage = FeedHelper.createStorage(clusterEntity.getName(), feed);
+        Assert.assertEquals(storage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateStorageWithFeedAndFeedCluster(Feed feed, Storage.TYPE storageType,
+                                                        String dataLocation) throws Exception {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        Storage storage = FeedHelper.createStorage(feedCluster, feed);
+        Assert.assertEquals(storage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateStorageWithAll(Feed feed, Storage.TYPE storageType,
+                                         String dataLocation) throws Exception {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        Storage storage = FeedHelper.createStorage(feedCluster, feed, clusterEntity);
+        Assert.assertEquals(storage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateReadOnlyStorage(Feed feed, Storage.TYPE storageType,
+                                          String dataLocation) throws Exception {
+        Storage readOnlyStorage = FeedHelper.createReadOnlyStorage(clusterEntity, feed);
+        Assert.assertEquals(readOnlyStorage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getReadOnlyStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(readOnlyStorage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @DataProvider (name = "uriTemplateDataProvider")
+    private Object[][] createUriTemplateDataProvider() {
+        return new Object[][] {
+            {Storage.TYPE.FILESYSTEM, "/projects/falcon/clicks"},
+            {Storage.TYPE.FILESYSTEM, "/testCluster/projects/falcon/clicks"},
+            {Storage.TYPE.TABLE, "/default/clicks/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"},
+            {Storage.TYPE.TABLE, OVERRIDE_TBL_LOC},
+        };
+    }
+
+    @Test (dataProvider = "uriTemplateDataProvider")
+    public void testCreateStorageWithUriTemplate(Storage.TYPE storageType,
+                                                 String dataLocation) throws Exception {
+        String uriTemplate = null;
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            uriTemplate = "DATA=" + ClusterHelper.getStorageUrl(clusterEntity) + dataLocation + "#";
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            uriTemplate =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+            dataLocation = uriTemplate;
+        }
+
+        Storage storage = FeedHelper.createStorage(storageType.name(), uriTemplate);
+        Assert.assertEquals(storage.getType(), storageType);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @DataProvider (name = "storageTypeDataProvider")
+    private Object[][] createStorageTypeDataProvider() {
+        return new Object[][] {
+            {fsFeedWithUniformStorage, Storage.TYPE.FILESYSTEM},
+            {fsFeedWithOverriddenStorage, Storage.TYPE.FILESYSTEM},
+            {tableFeedWithUniformStorage, Storage.TYPE.TABLE},
+            {tableFeedWithOverriddenStorage, Storage.TYPE.TABLE},
+        };
+    }
+
+    @Test (dataProvider = "storageTypeDataProvider")
+    public void testGetStorageTypeWithFeed(Feed feed, Storage.TYPE expectedStorageType) throws Exception {
+        Storage.TYPE actualStorageType = FeedHelper.getStorageType(feed);
+        Assert.assertEquals(actualStorageType, expectedStorageType);
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        actualStorageType = FeedHelper.getStorageType(feed, feedCluster);
+        Assert.assertEquals(actualStorageType, expectedStorageType);
+
+        actualStorageType = FeedHelper.getStorageType(feed, clusterEntity);
+        Assert.assertEquals(actualStorageType, expectedStorageType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index 20d14e8..050efe1 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.falcon.entity.parser;
 
-import static org.testng.AssertJUnit.assertEquals;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
@@ -28,6 +26,7 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogServiceFactory;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.ClusterHelper;
@@ -35,11 +34,13 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interface;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.util.StartupProperties;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+
 /**
  * Test for validating cluster entity parsing.
  */
@@ -56,31 +57,70 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(conf.get("fs.default.name"));
 
         Assert.assertNotNull(cluster);
-        assertEquals(cluster.getName(), "testCluster");
+        Assert.assertEquals(cluster.getName(), "testCluster");
 
         Interface execute = ClusterHelper.getInterface(cluster, Interfacetype.EXECUTE);
 
-        assertEquals(execute.getEndpoint(), "localhost:8021");
-        assertEquals(execute.getVersion(), "0.20.2");
+        Assert.assertEquals(execute.getEndpoint(), "localhost:8021");
+        Assert.assertEquals(execute.getVersion(), "0.20.2");
 
         Interface readonly = ClusterHelper.getInterface(cluster, Interfacetype.READONLY);
-        assertEquals(readonly.getEndpoint(), "hftp://localhost:50010");
-        assertEquals(readonly.getVersion(), "0.20.2");
+        Assert.assertEquals(readonly.getEndpoint(), "hftp://localhost:50010");
+        Assert.assertEquals(readonly.getVersion(), "0.20.2");
 
         Interface write = ClusterHelper.getInterface(cluster, Interfacetype.WRITE);
         //assertEquals(write.getEndpoint(), conf.get("fs.default.name"));
-        assertEquals(write.getVersion(), "0.20.2");
+        Assert.assertEquals(write.getVersion(), "0.20.2");
 
         Interface workflow = ClusterHelper.getInterface(cluster, Interfacetype.WORKFLOW);
-        assertEquals(workflow.getEndpoint(), "http://localhost:11000/oozie/");
-        assertEquals(workflow.getVersion(), "3.1");
+        Assert.assertEquals(workflow.getEndpoint(), "http://localhost:11000/oozie/");
+        Assert.assertEquals(workflow.getVersion(), "3.1");
 
-        assertEquals(ClusterHelper.getLocation(cluster, "staging"), "/projects/falcon/staging");
+        Assert.assertEquals(ClusterHelper.getLocation(cluster, "staging"), "/projects/falcon/staging");
 
         StringWriter stringWriter = new StringWriter();
         Marshaller marshaller = EntityType.CLUSTER.getMarshaller();
         marshaller.marshal(cluster, stringWriter);
         System.out.println(stringWriter.toString());
+
+        Interface catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY);
+        Assert.assertEquals(catalog.getEndpoint(), "http://localhost:48080/templeton/v1");
+        Assert.assertEquals(catalog.getVersion(), "0.11.0");
+
+        Assert.assertEquals(ClusterHelper.getLocation(cluster, "staging"), "/projects/falcon/staging");
+    }
+
+    @Test
+    public void testParseClusterWithoutRegistry() throws IOException, FalconException, JAXBException {
+
+        StartupProperties.get().setProperty(CatalogServiceFactory.CATALOG_SERVICE, "thrift://localhost:9083");
+        Assert.assertTrue(CatalogServiceFactory.isEnabled());
+
+        InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-registry.xml");
+        Cluster cluster = parser.parse(stream);
+
+        Interface catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY);
+        Assert.assertNull(catalog);
+
+        StartupProperties.get().remove(CatalogServiceFactory.CATALOG_SERVICE);
+        Assert.assertFalse(CatalogServiceFactory.isEnabled());
+
+        catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY);
+        Assert.assertNull(catalog);
+    }
+
+    @Test
+    public void testParseClusterWithBadRegistry() throws Exception {
+        // disable catalog service
+        StartupProperties.get().remove(CatalogServiceFactory.CATALOG_SERVICE);
+        Assert.assertFalse(CatalogServiceFactory.isEnabled());
+
+        InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-bad-registry.xml");
+        Cluster cluster = parser.parse(stream);
+
+        Interface catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY);
+        Assert.assertEquals(catalog.getEndpoint(), "Hcat");
+        Assert.assertEquals(catalog.getVersion(), "0.1");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 4cc4a0e..b90713e 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -116,12 +116,12 @@ public class FeedEntityParserTest extends AbstractTestBase {
         assertEquals(feed.getClusters().getClusters().get(1).getRetention()
                 .getLimit().toString(), "hours(6)");
 
-        assertEquals(FeedHelper.getLocation(feed, LocationType.DATA).getPath(),
-                "/projects/falcon/clicks");
-        assertEquals(FeedHelper.getLocation(feed, LocationType.META).getPath(),
-                "/projects/falcon/clicksMetaData");
-        assertEquals(FeedHelper.getLocation(feed, LocationType.STATS).getPath(),
-                "/projects/falcon/clicksStats");
+        assertEquals("${nameNode}/projects/falcon/clicks",
+                FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
+        assertEquals("${nameNode}/projects/falcon/clicksMetaData",
+                FeedHelper.createStorage(feed).getUriTemplate(LocationType.META));
+        assertEquals("${nameNode}/projects/falcon/clicksStats",
+                FeedHelper.createStorage(feed).getUriTemplate(LocationType.STATS));
 
         assertEquals(feed.getACL().getGroup(), "group");
         assertEquals(feed.getACL().getOwner(), "testuser");
@@ -275,7 +275,8 @@ public class FeedEntityParserTest extends AbstractTestBase {
         parser.parseAndValidate(feed2.toString());
     }
 
-    @Test(expectedExceptions = ValidationException.class)
+    // TODO Disabled the test since I do not see anything invalid in here.
+    @Test(enabled = false, expectedExceptions = ValidationException.class)
     public void testInvalidFeedClusterDataLocation() throws JAXBException, FalconException {
         Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
                 (FeedEntityParserTest.class.getResourceAsStream(FEED_XML)));
@@ -444,4 +445,39 @@ public class FeedEntityParserTest extends AbstractTestBase {
             Assert.assertEquals(org.xml.sax.SAXParseException.class, e.getCause().getCause().getClass());
         }
     }
+
+    @Test
+    public void testParseFeedWithTable() throws FalconException {
+        final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
+        Feed feedWithTable = parser.parse(inputStream);
+        Assert.assertEquals(feedWithTable.getTable().getUri(),
+                "catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}");
+    }
+
+    @Test (expectedExceptions = FalconException.class)
+    public void testParseInvalidFeedWithTable() throws FalconException {
+        parser.parse(FeedEntityParserTest.class.getResourceAsStream("/config/feed/invalid-feed.xml"));
+    }
+
+    @Test (expectedExceptions = FalconException.class)
+    public void testValidateFeedWithTableAndMultipleSources() throws FalconException {
+        parser.parseAndValidate(FeedEntityParserTest.class.getResourceAsStream(
+                "/config/feed/table-with-multiple-sources-feed.xml"));
+        Assert.fail("Should have thrown an exception:Multiple sources are not supported for feed with table storage");
+    }
+
+    @Test(expectedExceptions = ValidationException.class)
+    public void testValidatePartitionsForTable() throws Exception {
+        Feed feed = parser.parse(FeedEntityParserTest.class.getResourceAsStream("/config/feed/hive-table-feed.xml"));
+        Assert.assertNull(feed.getPartitions());
+
+        Partitions partitions = new Partitions();
+        Partition partition = new Partition();
+        partition.setName("colo");
+        partitions.getPartitions().add(partition);
+        feed.setPartitions(partitions);
+
+        parser.validate(feed);
+        Assert.fail("An exception should have been thrown:Partitions are not supported for feeds with table storage");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 39acc53..e656772 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -33,7 +33,9 @@ import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -286,4 +288,45 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         process.getClusters().getClusters().add(1, process.getClusters().getClusters().get(0));
         parser.validate(process);
     }
+
+    @Test
+    public void testProcessForTableStorage() throws Exception {
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
+                this.getClass().getResource("/config/feed/hive-table-feed.xml"));
+        getStore().publish(EntityType.FEED, inFeed);
+
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
+                this.getClass().getResource("/config/feed/hive-table-feed-out.xml"));
+        getStore().publish(EntityType.FEED, outFeed);
+
+        Process process = parser.parse(
+                ProcessEntityParserTest.class.getResourceAsStream("/config/process/process-table.xml"));
+        Input input = process.getInputs().getInputs().get(0);
+        Assert.assertFalse(input.isOptional());
+        parser.validate(process);
+
+        // Test Optional Inputs For Table Storage
+        try {
+            input.setOptional(Boolean.TRUE);
+            Assert.assertTrue(input.isOptional());
+            parser.validate(process);
+            Assert.fail("Validation exception must have been thrown.");
+        } catch (FalconException e) {
+            Assert.assertTrue(e instanceof ValidationException);
+        }
+    }
+
+    @Test(expectedExceptions = ValidationException.class)
+    public void testValidateInputPartitionForTable() throws Exception {
+        Process process = parser.parse(
+                ProcessEntityParserTest.class.getResourceAsStream("/config/process/process-table.xml"));
+        if (process.getInputs() != null) {
+            for (Input input : process.getInputs().getInputs()) {
+                input.setPartition("region=usa");
+            }
+        }
+
+        parser.validate(process);
+        Assert.fail("An exception should have been thrown since Input partitions are not supported for table storage");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index d4b5a2a..42bcee0 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -27,8 +27,11 @@ import org.apache.falcon.entity.parser.ProcessEntityParser;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.feed.Partition;
 import org.apache.falcon.entity.v0.feed.Properties;
 import org.apache.falcon.entity.v0.feed.Property;
@@ -40,6 +43,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.io.InputStream;
+
 /**
  * Test for Update helper methods.
  */
@@ -117,11 +122,11 @@ public class UpdateHelperTest extends AbstractTestBase {
         newFeed.getLateArrival().setCutOff(oldFeed.getLateArrival().getCutOff());
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
-        FeedHelper.getLocation(newFeed, LocationType.DATA).setPath("/test");
+        getLocation(newFeed, LocationType.DATA).setPath("/test");
         Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
-        FeedHelper.getLocation(newFeed, LocationType.DATA).setPath(
-                FeedHelper.getLocation(oldFeed, LocationType.DATA).getPath());
+        getLocation(newFeed, LocationType.DATA).setPath(
+                getLocation(oldFeed, LocationType.DATA).getPath());
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
         newFeed.setFrequency(Frequency.fromString("months(1)"));
@@ -154,4 +159,50 @@ public class UpdateHelperTest extends AbstractTestBase {
                         process.getClusters().getClusters().get(0).getName()).getValidity().getStart());
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
     }
+
+    @Test
+    public void testShouldUpdateTable() throws Exception {
+        InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
+        Feed oldTableFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(inputStream);
+        getStore().publish(EntityType.FEED, oldTableFeed);
+
+        String cluster = "testCluster";
+        Feed newTableFeed = (Feed) oldTableFeed.copy();
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldTableFeed, newTableFeed, cluster));
+
+        newTableFeed.setGroups("newgroups");
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldTableFeed, newTableFeed, cluster));
+        newTableFeed.setFrequency(Frequency.fromString("days(1)"));
+        Assert.assertTrue(UpdateHelper.shouldUpdate(oldTableFeed, newTableFeed, cluster));
+
+        final CatalogTable table = new CatalogTable();
+        table.setUri("catalog:default:clicks-blah#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}");
+        newTableFeed.setTable(table);
+        Assert.assertTrue(UpdateHelper.shouldUpdate(oldTableFeed, newTableFeed, cluster));
+
+        inputStream = getClass().getResourceAsStream("/config/process/process-table.xml");
+        Process oldProcess = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(inputStream);
+        Process newProcess = (Process) oldProcess.copy();
+
+        newProcess.getRetry().setPolicy(PolicyType.FINAL);
+        Assert.assertFalse(UpdateHelper.shouldUpdate(oldProcess, newProcess, cluster));
+        newProcess.setFrequency(Frequency.fromString("days(1)"));
+        Assert.assertTrue(UpdateHelper.shouldUpdate(oldProcess, newProcess, cluster));
+    }
+
+    private static Location getLocation(Feed feed, LocationType type) {
+        return getLocation(feed.getLocations(), type);
+    }
+
+    private static Location getLocation(Locations locations, LocationType type) {
+        for (Location loc : locations.getLocations()) {
+            if (loc.getType() == type) {
+                return loc;
+            }
+        }
+        Location loc = new Location();
+        loc.setPath("/tmp");
+        loc.setType(type);
+        return loc;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/resources/config/cluster/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/cluster/cluster-0.1.xml b/common/src/test/resources/config/cluster/cluster-0.1.xml
index fd6e06e..658711d 100644
--- a/common/src/test/resources/config/cluster/cluster-0.1.xml
+++ b/common/src/test/resources/config/cluster/cluster-0.1.xml
@@ -29,7 +29,8 @@
                    version="3.1"/>
         <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
                    version="5.1.6"/>
-        <interface type="registry" endpoint="Hcat" version="1"/>
+        <interface type="registry" endpoint="http://localhost:48080/templeton/v1"
+                   version="0.11.0"/>
     </interfaces>
     <locations>
         <location name="staging" path="/projects/falcon/staging"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/resources/config/cluster/cluster-bad-registry.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/cluster/cluster-bad-registry.xml b/common/src/test/resources/config/cluster/cluster-bad-registry.xml
new file mode 100644
index 0000000..1417d15
--- /dev/null
+++ b/common/src/test/resources/config/cluster/cluster-bad-registry.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<cluster colo="default" description="" name="testCluster" xmlns="uri:falcon:cluster:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="registry" endpoint="Hcat" version="0.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/resources/config/cluster/cluster-no-registry.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/cluster/cluster-no-registry.xml b/common/src/test/resources/config/cluster/cluster-no-registry.xml
new file mode 100644
index 0000000..85dfe32
--- /dev/null
+++ b/common/src/test/resources/config/cluster/cluster-no-registry.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<cluster colo="default" description="" name="testCluster" xmlns="uri:falcon:cluster:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/resources/config/feed/feed-0.2.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-0.2.xml b/common/src/test/resources/config/feed/feed-0.2.xml
index a3f4f4d..8bf7d20 100644
--- a/common/src/test/resources/config/feed/feed-0.2.xml
+++ b/common/src/test/resources/config/feed/feed-0.2.xml
@@ -34,11 +34,21 @@
             <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
             <retention limit="hours(48)" action="delete"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <locations>
+                <location type="data" path="/testCluster/projects/falcon/clicks"/>
+                <location type="stats" path="/testCluster/projects/falcon/clicksStats"/>
+                <location type="meta" path="/testCluster/projects/falcon/clicksMetaData"/>
+            </locations>
         </cluster>
         <cluster name="backupCluster" type="target">
             <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
             <retention limit="hours(6)" action="archive"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <locations>
+                <location type="data" path="/backupCluster/projects/falcon/clicks"/>
+                <location type="stats" path="/backupCluster/projects/falcon/clicksStats"/>
+                <location type="meta" path="/backupCluster/projects/falcon/clicksMetaData"/>
+            </locations>
         </cluster>
     </clusters>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/resources/config/feed/hive-table-feed-out.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed-out.xml b/common/src/test/resources/config/feed/hive-table-feed-out.xml
new file mode 100644
index 0000000..17eb954
--- /dev/null
+++ b/common/src/test/resources/config/feed/hive-table-feed-out.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks summary table " name="clicks-summary-table" xmlns="uri:falcon:feed:0.1">
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <table uri="catalog:testCluster:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <table uri="catalog:backupCluster:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:default:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed.xml b/common/src/test/resources/config/feed/hive-table-feed.xml
new file mode 100644
index 0000000..67b3f4e
--- /dev/null
+++ b/common/src/test/resources/config/feed/hive-table-feed.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log table" name="clicks-table" xmlns="uri:falcon:feed:0.1">
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <clusters>
+        <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/resources/config/feed/invalid-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/invalid-feed.xml b/common/src/test/resources/config/feed/invalid-feed.xml
new file mode 100644
index 0000000..b7273a9
--- /dev/null
+++ b/common/src/test/resources/config/feed/invalid-feed.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+    <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml b/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml
new file mode 100644
index 0000000..f84f3d4
--- /dev/null
+++ b/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="testCluster" type="source">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/resources/config/process/process-table.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-table.xml b/common/src/test/resources/config/process/process-table.xml
new file mode 100644
index 0000000..1d6a8f0
--- /dev/null
+++ b/common/src/test/resources/config/process/process-table.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="table-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="testCluster">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-table" start="today(0,0)" end="today(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="oozie" path="/path/to/workflow"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>


Mime
View raw message