falcon-commits mailing list archives

From sowmya...@apache.org
Subject [2/3] falcon git commit: FALCON-1459 Ability to import from database. Contributed by Venkat Ramachandran
Date Thu, 29 Oct 2015 01:08:39 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index 4020d36..60bf1be 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -28,14 +28,25 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Properties;
 import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.entity.v0.feed.Argument;
+import org.apache.falcon.entity.v0.feed.Arguments;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
 import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Extract;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.FieldsType;
+import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
+import org.apache.falcon.entity.v0.feed.Import;
 import org.apache.falcon.entity.v0.feed.Lifecycle;
 import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.MergeType;
+import org.apache.falcon.entity.v0.feed.Source;
 import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Validity;
+
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Output;
@@ -723,7 +734,7 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(result, expected);
     }
 
     @Test
     public void testIsLifeCycleEnabled() throws Exception {
         Feed feed = new Feed();
 
@@ -849,6 +860,44 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(4)"));
     }
 
+    @Test
+    public void testFeedImportSnapshot() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = importFeedSnapshot(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster);
+        Assert.assertNotNull(feed.getClusters().getClusters());
+        Assert.assertNotNull(feed.getClusters().getClusters().get(0));
+        Assert.assertNotNull(feed.getClusters().getClusters().get(0).getValidity());
+        Assert.assertNotNull(feed.getClusters().getClusters().get(0).getValidity().getStart());
+        Assert.assertNotNull(startInstResult);
+        Assert.assertNotNull(feedCluster.getValidity().getStart());
+        Assert.assertEquals(getDate("2012-02-07 00:00 UTC"), feedCluster.getValidity().getStart());
+        Assert.assertTrue(FeedHelper.isImportEnabled(feedCluster));
+        Assert.assertEquals(MergeType.SNAPSHOT, FeedHelper.getImportMergeType(feedCluster));
+        Assert.assertNotEquals(startInstResult, feedCluster.getValidity().getStart());
+    }
+
+    @Test
+    public void testFeedImportFields() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = importFeedSnapshot(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster);
+        List<String> fieldList = FeedHelper.getFieldList(feedCluster);
+        Assert.assertEquals(2, fieldList.size());
+        Assert.assertFalse(FeedHelper.isFieldExcludes(feedCluster));
+    }
+
+    @Test
+    public void testFeedImportAppend() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = importFeedAppend(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster);
+        Assert.assertEquals(startInstResult, feed.getClusters().getClusters().get(0).getValidity().getStart());
+    }
+
     private Validity getFeedValidity(String start, String end) throws ParseException {
         Validity validity = new Validity();
         validity.setStart(getDate(start));
@@ -881,6 +930,11 @@
 
     private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
         throws FalconException, ParseException {
+        return publishFeed(cluster, frequency, start, end, null);
+    }
+
+    private Feed publishFeed(Cluster cluster, String frequency, String start, String end, Import imp)
+        throws FalconException, ParseException {
 
         Feed feed = new Feed();
         feed.setName("feed");
@@ -889,6 +943,8 @@
         feed.setTimezone(UTC);
         Clusters fClusters = new Clusters();
         org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        fCluster.setType(ClusterType.SOURCE);
+        fCluster.setImport(imp);
         fCluster.setName(cluster.getName());
         fCluster.setValidity(getFeedValidity(start, end));
         fClusters.getClusters().add(fCluster);
@@ -913,4 +969,54 @@
         process.setFrequency(f);
         return process;
     }
+
+    private Feed importFeedSnapshot(Cluster cluster, String frequency, String start, String end)
+        throws FalconException, ParseException {
+
+        Import imp = getAnImport(MergeType.SNAPSHOT);
+        Feed feed = publishFeed(cluster, frequency, start, end, imp);
+        return feed;
+    }
+
+    private Feed importFeedAppend(Cluster cluster, String frequency, String start, String end)
+        throws FalconException, ParseException {
+
+        Import imp = getAnImport(MergeType.APPEND);
+        Feed feed = publishFeed(cluster, frequency, start, end, imp);
+        return feed;
+    }
+
+    private Import getAnImport(MergeType mergeType) {
+        Extract extract = new Extract();
+        extract.setType(ExtractMethod.FULL);
+        extract.setMergepolicy(mergeType);
+
+        FieldIncludeExclude fieldInclude = new FieldIncludeExclude();
+        fieldInclude.getFields().add("id");
+        fieldInclude.getFields().add("name");
+        FieldsType fields = new FieldsType();
+        fields.setIncludes(fieldInclude);
+
+        Source source = new Source();
+        source.setName("test-db");
+        source.setTableName("test-table");
+        source.setExtract(extract);
+        source.setFields(fields);
+
+        Argument a1 = new Argument();
+        a1.setName("--split-by");
+        a1.setValue("id");
+        Argument a2 = new Argument();
+        a2.setName("--num-mappers");
+        a2.setValue("2");
+        Arguments args = new Arguments();
+        List<Argument> argList = args.getArguments();
+        argList.add(a1);
+        argList.add(a2);
+
+        Import imp = new Import();
+        imp.setSource(source);
+        imp.setArguments(args);
+        return imp;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
new file mode 100644
index 0000000..9567eab
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity.parser;
+
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.InputStream;
+
+/**
+ * Test class for Datasource Entity.
+ */
+public class DatasourceEntityParserTest extends AbstractTestBase {
+
+    private final DatasourceEntityParser datasourceEntityParser =
+            (DatasourceEntityParser) EntityParserFactory.getParser(EntityType.DATASOURCE);
+    private final FeedEntityParser feedEntityParser =
+            (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        cleanupStore();
+    }
+
+    @Test
+    public void testDatasourceEntity() throws Exception {
+
+        InputStream stream = this.getClass().getResourceAsStream("/config/datasource/datasource-0.1.xml");
+        Datasource datasource = datasourceEntityParser.parse(stream);
+
+        ConfigurationStore store = ConfigurationStore.get();
+        store.publish(EntityType.DATASOURCE, datasource);
+
+        Datasource databaseEntity = EntityUtil.getEntity(EntityType.DATASOURCE, datasource.getName());
+        Assert.assertEquals("test-hsql-db", datasource.getName());
+        Assert.assertEquals("test-hsql-db", databaseEntity.getName());
+        Assert.assertEquals("hsql", databaseEntity.getType().value());
+        Assert.assertEquals("org.hsqldb.jdbcDriver", databaseEntity.getDriver().getClazz());
+    }
+
+    @Test
+    public void testDatasourcePasswordFileEntity() throws Exception {
+
+        InputStream stream = this.getClass().getResourceAsStream("/config/datasource/datasource-file-0.1.xml");
+        Datasource datasource = datasourceEntityParser.parse(stream);
+        ConfigurationStore store = ConfigurationStore.get();
+        store.publish(EntityType.DATASOURCE, datasource);
+
+        Datasource databaseEntity = EntityUtil.getEntity(EntityType.DATASOURCE, datasource.getName());
+        Assert.assertEquals("test-hsql-db", datasource.getName());
+        Assert.assertEquals("test-hsql-db", databaseEntity.getName());
+        Assert.assertEquals("hsql", databaseEntity.getType().value());
+        Assert.assertEquals("org.hsqldb.jdbcDriver", databaseEntity.getDriver().getClazz());
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 905be68..7f4abce 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -30,12 +30,16 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.Argument;
 import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
 import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.MergeType;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Partition;
 import org.apache.falcon.entity.v0.feed.Partitions;
 import org.apache.falcon.entity.v0.feed.Property;
@@ -57,6 +61,7 @@ import javax.xml.bind.Unmarshaller;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.util.Map;
 
 import static org.testng.AssertJUnit.assertEquals;
 
@@ -92,6 +97,11 @@ public class FeedEntityParserTest extends AbstractTestBase {
         LifecyclePolicyMap.get().init();
         CurrentUser.authenticate(FalconTestUtil.TEST_USER_2);
         modifiableFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
+        Unmarshaller dsUnmarshaller = EntityType.DATASOURCE.getUnmarshaller();
+        Datasource ds = (Datasource) dsUnmarshaller.unmarshal(this.getClass()
+                .getResourceAsStream(DATASOURCE_XML));
+        ds.setName("test-hsql-db");
+        store.publish(EntityType.DATASOURCE, ds);
     }
 
     @Test(expectedExceptions = ValidationException.class)
@@ -949,7 +959,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
     public void testValidateACLForArchiveReplication() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
-            StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
 
         CurrentUser.authenticate(USER);
         try {
@@ -986,6 +996,149 @@
     }
 
     @Test
+    public void testImportFeedSqoop() throws Exception {
+
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        InputStream feedStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed feed = parser.parseAndValidate(feedStream);
+        final org.apache.falcon.entity.v0.feed.Cluster srcCluster = feed.getClusters().getClusters().get(0);
+        Assert.assertEquals("test-hsql-db", FeedHelper.getImportDatasourceName(srcCluster));
+        Assert.assertEquals("customer", FeedHelper.getImportDataSourceTableName(srcCluster));
+        Assert.assertEquals(2, srcCluster.getImport().getSource().getFields().getIncludes().getFields().size());
+    }
+
+    @Test
+    public void testImportFeedSqoopMinimal() throws Exception {
+
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        InputStream feedStream = this.getClass().getResourceAsStream("/config/feed/feed-import-noargs-0.1.xml");
+        Feed feed = parser.parseAndValidate(feedStream);
+        final org.apache.falcon.entity.v0.feed.Cluster srcCluster = feed.getClusters().getClusters().get(0);
+        Assert.assertEquals("test-hsql-db", FeedHelper.getImportDatasourceName(srcCluster));
+        Assert.assertEquals("customer", FeedHelper.getImportDataSourceTableName(srcCluster));
+        Map<String, String> args = FeedHelper.getImportArguments(srcCluster);
+        Assert.assertEquals(0, args.size());
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testImportFeedSqoopExcludeFields() throws Exception {
+
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        InputStream feedStream = this.getClass().getResourceAsStream("/config/feed/feed-import-exclude-fields-0.1.xml");
+        parser.parseAndValidate(feedStream);
+        Assert.fail("An exception should have been thrown: feed import policy does not yet implement field exclusion.");
+    }
+
+    @Test
+    public void testImportFeedSqoopArgs() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Arguments args =
+                importFeed.getClusters().getClusters().get(0).getImport().getArguments();
+
+        Argument splitByArg = new Argument();
+        splitByArg.setName("--split-by");
+        splitByArg.setValue("id");
+
+        Argument numMappersArg = new Argument();
+        numMappersArg.setName("--num-mappers");
+        numMappersArg.setValue("3");
+
+        args.getArguments().clear();
+        args.getArguments().add(numMappersArg);
+        args.getArguments().add(splitByArg);
+
+        parser.validate(importFeed);
+    }
+
+    @Test
+    public void testImportFeedSqoopArgsSplitBy() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Arguments args =
+                importFeed.getClusters().getClusters().get(0).getImport().getArguments();
+        Argument splitByArg = new Argument();
+        splitByArg.setName("--split-by");
+        splitByArg.setValue("id");
+
+        args.getArguments().clear();
+        args.getArguments().add(splitByArg);
+
+        parser.validate(importFeed);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testImportFeedSqoopArgsNumMapper() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Arguments args =
+                importFeed.getClusters().getClusters().get(0).getImport().getArguments();
+        Argument numMappersArg = new Argument();
+        numMappersArg.setName("--num-mappers");
+        numMappersArg.setValue("2");
+
+        args.getArguments().clear();
+        args.getArguments().add(numMappersArg);
+
+        parser.validate(importFeed);
+        Assert.fail("An exception should have been thrown: Feed Import should specify "
+                + "--split-by column along with --num-mappers");
+    }
+
+    @Test
+    public void testImportFeedExtractionType1() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Extract extract =
+                importFeed.getClusters().getClusters().get(0).getImport().getSource().getExtract();
+
+        extract.setType(ExtractMethod.FULL);
+        extract.setMergepolicy(MergeType.SNAPSHOT);
+
+        parser.validate(importFeed);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testImportFeedExtractionType2() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Extract extract =
+                importFeed.getClusters().getClusters().get(0).getImport().getSource().getExtract();
+
+        extract.setType(ExtractMethod.FULL);
+        extract.setMergepolicy(MergeType.APPEND);
+
+        parser.validate(importFeed);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testImportFeedExtractionType3() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Extract extract =
+                importFeed.getClusters().getClusters().get(0).getImport().getSource().getExtract();
+
+        extract.setType(ExtractMethod.INCREMENTAL);
+        extract.setMergepolicy(MergeType.APPEND);
+
+        parser.validate(importFeed);
+    }
+
+    @Test (expectedExceptions = {ValidationException.class, FalconException.class})
+    public void testImportFeedSqoopInvalid() throws Exception {
+
+        InputStream feedStream = this.getClass().getResourceAsStream("/config/feed/feed-import-invalid-0.1.xml");
+        parser.parseAndValidate(feedStream);
+        Assert.fail("ValidationException should have been thrown");
+    }
+
+    @Test
     public void testValidateEmailNotification() throws Exception {
         Feed feedNotification = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
                 (FeedEntityParserTest.class.getResourceAsStream(FEED_XML)));
@@ -1040,5 +1193,4 @@
         feed.getClusters().getClusters().get(0).getValidity().setEnd(null);
         parser.validate(feed);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
index 3863b11..f49362f 100644
--- a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
@@ -20,9 +20,20 @@ package org.apache.falcon.entity.v0;
 
 import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Argument;
+import org.apache.falcon.entity.v0.feed.Arguments;
 import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Extract;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.FieldsType;
+import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
+import org.apache.falcon.entity.v0.feed.Import;
+import org.apache.falcon.entity.v0.feed.MergeType;
+import org.apache.falcon.entity.v0.feed.Source;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Output;
@@ -31,6 +42,7 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -122,6 +134,57 @@ public class EntityGraphTest extends AbstractTestBase {
         return f1;
     }
 
+    private Feed addFeedImport(String feed, Cluster cluster, Datasource ds) {
+
+        Feed f1 = new Feed();
+        f1.setName(feed);
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                new org.apache.falcon.entity.v0.feed.Cluster();
+        feedCluster.setName(cluster.getName());
+        feedCluster.setType(ClusterType.SOURCE);
+        Clusters clusters = new Clusters();
+        clusters.getClusters().add(feedCluster);
+        f1.setClusters(clusters);
+
+        Import imp = getAnImport(MergeType.SNAPSHOT, ds);
+        f1.getClusters().getClusters().get(0).setImport(imp);
+        return f1;
+    }
+
+    private Import getAnImport(MergeType mergeType, Datasource ds) {
+        Extract extract = new Extract();
+        extract.setType(ExtractMethod.FULL);
+        extract.setMergepolicy(mergeType);
+
+        FieldsType fields = new FieldsType();
+        FieldIncludeExclude fieldInclude = new FieldIncludeExclude();
+        fieldInclude.getFields().add("id");
+        fieldInclude.getFields().add("name");
+        fields.setIncludes(fieldInclude);
+
+        Source source = new Source();
+        source.setName(ds.getName());
+        source.setTableName("test-table");
+        source.setExtract(extract);
+        source.setFields(fields);
+
+        Argument a1 = new Argument();
+        a1.setName("--split-by");
+        a1.setValue("id");
+        Argument a2 = new Argument();
+        a2.setName("--num-mappers");
+        a2.setValue("2");
+        Arguments args = new Arguments();
+        List<Argument> argList = args.getArguments();
+        argList.add(a1);
+        argList.add(a2);
+
+        Import imp = new Import();
+        imp.setSource(source);
+        imp.setArguments(args);
+        return imp;
+    }
+
     private void attachInput(Process process, Feed feed) {
         if (process.getInputs() == null) {
             process.setInputs(new Inputs());
@@ -283,4 +346,63 @@ public class EntityGraphTest extends AbstractTestBase {
     @Test
     public void testOnChange() throws Exception {
     }
+
+    @Test
+    public void testOnAddImport() throws Exception {
+
+        Datasource ds = new Datasource();
+        ds.setName("test-db");
+        ds.setColo("c1");
+
+        Cluster cluster = new Cluster();
+        cluster.setName("ci1");
+        cluster.setColo("c1");
+
+        Feed f1 = addFeedImport("fi1", cluster, ds);
+
+        store.publish(EntityType.CLUSTER, cluster);
+        store.publish(EntityType.DATASOURCE, ds);
+        store.publish(EntityType.FEED, f1);
+
+        Set<Entity> entities = graph.getDependents(cluster);
+        Assert.assertEquals(entities.size(), 1);
+        Assert.assertTrue(entities.contains(f1));
+
+        entities = graph.getDependents(ds);
+        Assert.assertEquals(entities.size(), 1);
+        Assert.assertTrue(entities.contains(f1));
+
+        entities = graph.getDependents(f1);
+        Assert.assertEquals(entities.size(), 2);
+        Assert.assertTrue(entities.contains(cluster));
+        Assert.assertTrue(entities.contains(ds));
+
+        store.remove(EntityType.FEED, "fi1");
+        store.remove(EntityType.DATASOURCE, "test-db");
+        store.remove(EntityType.CLUSTER, "ci1");
+    }
+
+    @Test
+    public void testOnRemoveDatasource() throws Exception {
+
+        Datasource ds = new Datasource();
+        ds.setName("test-db");
+        ds.setColo("c1");
+
+        Cluster cluster = new Cluster();
+        cluster.setName("ci1");
+        cluster.setColo("c1");
+
+        Feed f1 = addFeedImport("fi1", cluster, ds);
+
+        store.publish(EntityType.CLUSTER, cluster);
+        store.publish(EntityType.DATASOURCE, ds);
+        store.publish(EntityType.FEED, f1);
+
+        store.remove(EntityType.DATASOURCE, "test-db");
+
+        Set<Entity> entities = graph.getDependents(f1);
+        Assert.assertEquals(1, entities.size());
+        Assert.assertTrue(entities.contains(cluster));
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/datasource/datasource-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/datasource/datasource-0.1.xml b/common/src/test/resources/config/datasource/datasource-0.1.xml
new file mode 100644
index 0000000..5b09f10
--- /dev/null
+++ b/common/src/test/resources/config/datasource/datasource-0.1.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<datasource colo="west-coast" description="HSQL database on west coast" type="hsql" name="test-hsql-db" xmlns="uri:falcon:datasource:0.1">
+    <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText></passwordText>
+            </credential>
+        </interface>
+
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText>sqoop</passwordText>
+            </credential>
+        </interface>
+
+        <credential type="password-text">
+            <userName>SA</userName>
+            <passwordText>sqoop</passwordText>
+        </credential>
+    </interfaces>
+
+    <driver>
+       <clazz>org.hsqldb.jdbcDriver</clazz>
+       <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+</datasource>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/datasource/datasource-file-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/datasource/datasource-file-0.1.xml b/common/src/test/resources/config/datasource/datasource-file-0.1.xml
new file mode 100644
index 0000000..3ee40ed
--- /dev/null
+++ b/common/src/test/resources/config/datasource/datasource-file-0.1.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<datasource colo="west-coast" description="HSQL database on west coast" type="hsql" name="test-hsql-db" xmlns="uri:falcon:datasource:0.1">
+    <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-file">
+                <userName>SA</userName>
+                <passwordFile>jail://global:00/falcon/passwordfile</passwordFile>
+            </credential>
+        </interface>
+
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-file">
+                <userName>SA</userName>
+                <passwordFile>jail://global:00/falcon/passwordfile</passwordFile>
+            </credential>
+        </interface>
+
+        <credential type="password-file">
+            <userName>SA</userName>
+            <passwordFile>jail://global:00/falcon/passwordfile</passwordFile>
+        </credential>
+    </interfaces>
+
+    <driver>
+       <clazz>org.hsqldb.jdbcDriver</clazz>
+       <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+</datasource>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/datasource/datasource-invalid-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/datasource/datasource-invalid-0.1.xml b/common/src/test/resources/config/datasource/datasource-invalid-0.1.xml
new file mode 100644
index 0000000..04fe737
--- /dev/null
+++ b/common/src/test/resources/config/datasource/datasource-invalid-0.1.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<datasource colo="west-coast" description="A new database on west coast" type="xyz" name="test-hsql-db" xmlns="uri:falcon:datasource:0.1">
+    <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText></passwordText>
+            </credential>
+        </interface>
+
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText>sqoop</passwordText>
+            </credential>
+        </interface>
+
+        <credential type="password-text">
+            <userName>SA</userName>
+            <passwordText>sqoop</passwordText>
+        </credential>
+    </interfaces>
+
+    <driver>
+       <clazz>org.hsqldb.jdbcDriver</clazz>
+       <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+</datasource>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/feed/feed-import-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-import-0.1.xml b/common/src/test/resources/config/feed/feed-import-0.1.xml
new file mode 100644
index 0000000..798d6b0
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-import-0.1.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer data" name="CustomerFeed" xmlns="uri:falcon:feed:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <import>
+                <source name="test-hsql-db" tableName="customer">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                    <fields>
+                        <includes>
+                            <field>id</field>
+                            <field>name</field>
+                        </includes>
+                    </fields>
+                </source>
+                <arguments>
+                    <argument name="--split-by" value="id"/>
+                    <argument name="--num-mappers" value="2"/>
+                </arguments>
+            </import>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/feed/feed-import-exclude-fields-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-import-exclude-fields-0.1.xml b/common/src/test/resources/config/feed/feed-import-exclude-fields-0.1.xml
new file mode 100644
index 0000000..5a6fcd9
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-import-exclude-fields-0.1.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer data" name="CustomerFeed" xmlns="uri:falcon:feed:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <import>
+                <source name="test-hsql-db" tableName="customer">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                    <fields>
+                        <excludes>
+                            <field>id</field>
+                            <field>name</field>
+                        </excludes>
+                    </fields>
+                </source>
+                <arguments>
+                    <argument name="--split-by" value="id"/>
+                    <argument name="--num-mappers" value="2"/>
+                </arguments>
+            </import>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/feed/feed-import-invalid-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-import-invalid-0.1.xml b/common/src/test/resources/config/feed/feed-import-invalid-0.1.xml
new file mode 100644
index 0000000..9428bce
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-import-invalid-0.1.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer data" name="CustomerFeed" xmlns="uri:falcon:feed:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <import>
+                <source name="test-hsql-db" tableName="customer">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                    <fields>
+                        <includes>
+                            <field>id</field>
+                            <field>name</field>
+                        </includes>
+                    </fields>
+                </source>
+                <arguments>
+                    <argument name="--num-mappers" value="2"/>
+                </arguments>
+            </import>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/feed/feed-import-noargs-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-import-noargs-0.1.xml b/common/src/test/resources/config/feed/feed-import-noargs-0.1.xml
new file mode 100644
index 0000000..c96249c
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-import-noargs-0.1.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer data" name="CustomerFeed" xmlns="uri:falcon:feed:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <import>
+                <source name="test-hsql-db" tableName="customer">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                </source>
+            </import>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index e07fe12..b357c44 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -98,6 +98,89 @@ A key-value pair, which are propagated to the workflow engine.
 Ideally JMS impl class name of messaging engine (brokerImplClass) 
 should be defined here.
 
+---++ Datasource Specification
+
+The datasource entity contains the connection information required to connect to a data source such as a
+MySQL database. The datasource XSD specification is available here. A datasource contains read and write
+interfaces, which Falcon uses to import data from and export data to the datasource respectively. Feeds
+that are on-boarded to Falcon reference a datasource by its name.
+
+Following are the tags defined in a datasource.xml:
+
+<verbatim>
+<datasource colo="west-coast" description="Customer database on west coast" type="mysql"
+ name="test-mysql-db" xmlns="uri:falcon:datasource:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+</verbatim>
+
+The colo specifies the colo to which the datasource belongs, and name is the name of the datasource, which must
+be unique.
+
+---+++ Interfaces
+
+A datasource has two interfaces as described below:
+<verbatim>
+    <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db"/>
+</verbatim>
+
+A readonly interface specifies the endpoint and protocol used to connect to the datasource.
+Falcon uses this interface when importing data from the datasource into HDFS.
+
+<verbatim>
+<interface type="write" endpoint="jdbc:hsqldb:localhost/db1"/>
+</verbatim>
+
+A write interface specifies the endpoint and protocol used to write to the datasource.
+Falcon uses this interface to export data from HDFS to the datasource.
+
+<verbatim>
+<credential type="password-text">
+    <userName>SA</userName>
+    <passwordText></passwordText>
+</credential>
+</verbatim>
+
+
+A credential is associated with an interface (read or write) and provides the user name and password used to
+authenticate to the datasource.
+
+<verbatim>
+<credential type="password-file">
+     <userName>SA</userName>
+     <passwordFile>hdfs-file-path</passwordFile>
+</credential>
+</verbatim>
+
+The credential can also be supplied via a password file stored in HDFS. This file should be readable only by
+the owning user.
+
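+Putting it all together, a complete datasource definition looks like the following. This example is adapted
+from the HSQL test resource added in this change; the endpoint, credential and driver values are illustrative.
+
+<verbatim>
+<datasource colo="west-coast" description="HSQL database on west coast" type="hsql"
+            name="test-hsql-db" xmlns="uri:falcon:datasource:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText></passwordText>
+            </credential>
+        </interface>
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText>sqoop</passwordText>
+            </credential>
+        </interface>
+    </interfaces>
+    <driver>
+        <clazz>org.hsqldb.jdbcDriver</clazz>
+        <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+    <ACL owner="testuser" group="group" permission="0x755"/>
+</datasource>
+</verbatim>
+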
 ---++ Feed Specification
 The Feed XSD specification is available here.
 A Feed defines various attributes of feed like feed location, frequency, late-arrival handling and retention policies.
@@ -244,6 +327,50 @@ expressions like frequency. slaLow is intended to serve for alerting for feed in
 availability SLAs. slaHigh is intended to serve for reporting the feeds which missed their SLAs. SLAs are relative to
 feed instance time.
 
+---+++ Import
+
+<verbatim>
+<import>
+    <source name="test-hsql-db" tableName="customer">
+        <extract type="full">
+            <mergepolicy>snapshot</mergepolicy>
+         </extract>
+         <fields>
+            <includes>
+                <field>id</field>
+                <field>name</field>
+            </includes>
+         </fields>
+    </source>
+    <arguments>
+        <argument name="--split-by" value="id"/>
+        <argument name="--num-mappers" value="2"/>
+    </arguments>
+</import>
+</verbatim>
+
+A feed can have an import policy associated with it. The source name references the datasource entity from
+which the data will be imported into HDFS. The tableName specifies the table or topic to be imported from the
+datasource. The extract type specifies the pull mechanism (full or incremental extract). The full extract
+method extracts all the data from the datasource; support for the incremental extraction method is in
+progress. The mergepolicy determines how the data is laid out on HDFS. The snapshot layout creates a snapshot
+of the data on HDFS using the feed's location specification. Fields is used to specify the projection columns.
+Feed import from a database uses Sqoop underneath to do the work, and any advanced Sqoop options can be
+specified via the arguments.
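+
+Programmatically, the import policy on a feed cluster can be inspected via FeedHelper, as the unit tests
+accompanying this change do. A minimal sketch (imports omitted; assumes a parsed Feed named feed and a
+cluster named "testCluster"):
+
+<verbatim>
+org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, "testCluster");
+if (FeedHelper.isImportEnabled(feedCluster)) {
+    String datasource = FeedHelper.getImportDatasourceName(feedCluster);   // e.g. "test-hsql-db"
+    String table = FeedHelper.getImportDataSourceTableName(feedCluster);   // e.g. "customer"
+    MergeType mergeType = FeedHelper.getImportMergeType(feedCluster);      // SNAPSHOT or APPEND
+    Map<String, String> sqoopArgs = FeedHelper.getImportArguments(feedCluster);
+    List<String> fields = FeedHelper.getFieldList(feedCluster);            // projection columns
+}
+</verbatim>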
 
 ---+++ Late Arrival
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index d994866..e001a7f 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -29,6 +29,9 @@ $FALCON_HOME/bin/falcon entity -submit -type cluster -file /cluster/definition.x
 
 Submit option is used to set up entity definition.
 
+Usage:
+$FALCON_HOME/bin/falcon entity -submit -type [cluster|datasource|feed|process] -file <<entity-definition.xml>>
+
 Example: 
 $FALCON_HOME/bin/falcon entity -submit -type cluster -file /cluster/definition.xml
 
@@ -65,7 +68,7 @@ Usage:
 Delete removes the submitted entity definition for the specified entity and put it into the archive.
 
 Usage:
-$FALCON_HOME/bin/falcon entity  -type [cluster|feed|process] -name <<name>> -delete
+$FALCON_HOME/bin/falcon entity  -type [cluster|datasource|feed|process] -name <<name>> -delete
 
 ---+++List
 
@@ -75,7 +78,7 @@ Usage:
 $FALCON_HOME/bin/falcon entity -list
 
 Optional Args : -fields <<field1,field2>>
--type <<[cluster|feed|process],[cluster|feed|process]>>
+-type <<[cluster|datasource|feed|process],[cluster|datasource|feed|process]>>
 -nameseq <<namesubsequence>> -tagkeys <<tagkeyword1,tagkeyword2>>
 -filterBy <<field1:value1,field2:value2>> -tags <<tagkey=tagvalue,tagkey=tagvalue>>
 -orderBy <<field>> -sortOrder <<sortOrder>> -offset 0 -numResults 10
@@ -98,8 +101,8 @@ Optional Args : -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" -fie
 
 ---+++Update
 
-Update operation allows an already submitted/scheduled entity to be updated. Cluster update is currently
-not allowed.
+Update operation allows an already submitted/scheduled entity to be updated. Cluster and datasource updates are
+currently not allowed.
 
 Usage:
 $FALCON_HOME/bin/falcon entity  -type [feed|process] -name <<name>> -update -file <<path_to_file>>
@@ -123,21 +126,23 @@ Optional Arg : -skipDryRun. When this argument is specified, Falcon skips oozie
 Status returns the current status of the entity.
 
 Usage:
-$FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -name <<name>> -status
+$FALCON_HOME/bin/falcon entity -type [cluster|datasource|feed|process] -name <<name>> -status
 
 ---+++Dependency
 
-With the use of dependency option, we can list all the entities on which the specified entity is dependent. For example for a feed, dependency return the cluster name and for process it returns all the input feeds, output feeds and cluster names.
+With the use of the dependency option, we can list all the entities on which the specified entity depends.
+For example, for a feed the dependency option returns the cluster name, and for a process it returns all the
+input feeds, output feeds and cluster names.
 
 Usage:
-$FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -name <<name>> -dependency
+$FALCON_HOME/bin/falcon entity -type [cluster|datasource|feed|process] -name <<name>> -dependency
 
 ---+++Definition
 
Definition option returns the entity definition submitted earlier during the submit step.
 
 Usage:
-$FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -name <<name>> -definition
+$FALCON_HOME/bin/falcon entity -type [cluster|datasource|feed|process] -name <<name>> -definition
 
 
 ---+++Lookup
@@ -460,7 +465,7 @@ $FALCON_HOME/bin/falcon metadata -edge -id Q9n-Q-5g
 
Lists all dimensions of a given type. If the user provides the optional param cluster, only the dimensions related to the cluster are listed.
 Usage:
-$FALCON_HOME/bin/falcon metadata -list -type [cluster_entity|feed_entity|process_entity|user|colo|tags|groups|pipelines]
+$FALCON_HOME/bin/falcon metadata -list -type [cluster_entity|datasource_entity|feed_entity|process_entity|user|colo|tags|groups|pipelines]
 
 Optional Args : -cluster <<cluster name>>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 515562a..6bc5edc 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -251,7 +251,8 @@ public class JMSMessageProducer {
     private String[] getFeedPaths() throws Exception {
         WorkflowExecutionContext.EntityOperations operation = context.getOperation();
         if (operation == WorkflowExecutionContext.EntityOperations.GENERATE
-                || operation == WorkflowExecutionContext.EntityOperations.REPLICATE) {
+                || operation == WorkflowExecutionContext.EntityOperations.REPLICATE
+                || operation == WorkflowExecutionContext.EntityOperations.IMPORT) {
             LOG.debug("Returning instance paths: " + context.getOutputFeedInstancePaths());
             return context.getOutputFeedInstancePathsList();
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
new file mode 100644
index 0000000..45f46d7
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.DatasourceHelper;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.datasource.Credentialtype;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Builds Datasource import workflow for Oozie.
+ */
+
+public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
+    protected static final String IMPORT_SQOOP_ACTION_TEMPLATE = "/action/feed/import-sqoop-database-action.xml";
+    protected static final String IMPORT_ACTION_NAME = "db-import-sqoop";
+
+    private static final String ARG_SEPARATOR = " ";
+
+    public DatabaseImportWorkflowBuilder(Feed entity) { super(entity); }
+
+    @Override
+    protected WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.IMPORT;
+    }
+
+    @Override
+    protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException {
+
+        addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT);
+
+        ACTION sqoopImport = unmarshalAction(IMPORT_SQOOP_ACTION_TEMPLATE);
+        addTransition(sqoopImport, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(sqoopImport);
+
+        //Add post-processing actions
+        ACTION success = getSuccessPostProcessAction();
+        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(success);
+
+        ACTION fail = getFailPostProcessAction();
+        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(fail);
+
+        decorateWorkflow(workflow, workflow.getName(), IMPORT_ACTION_NAME);
+
+        // build the sqoop command and put it in the properties
+        String sqoopCmd = buildSqoopCommand(cluster, entity);
+        LOG.info("SQOOP COMMAND : " + sqoopCmd);
+        Properties props = new Properties();
+        props.put("sqoopCommand", sqoopCmd);
+        return props;
+    }
+
+    private String buildSqoopCommand(Cluster cluster, Feed feed) throws FalconException {
+        Map<String, String> extraArgs = getArguments(cluster);
+        StringBuilder sqoopCmd = new StringBuilder();
+        sqoopCmd.append("import").append(ARG_SEPARATOR);
+        buildDriverArgs(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        buildConnectArg(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        buildTableArg(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        buildUserPasswordArg(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        buildNumMappers(sqoopCmd, extraArgs).append(ARG_SEPARATOR);
+        buildArguments(sqoopCmd, extraArgs).append(ARG_SEPARATOR);
+        buildTargetDirArg(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        return sqoopCmd.toString();
+    }
+
+    private StringBuilder buildDriverArgs(StringBuilder builder, Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        Datasource db = DatasourceHelper.getDatasource(feedCluster);
+        if ((db.getDriver() != null) && (db.getDriver().getClazz() != null)) {
+            builder.append("--driver").append(ARG_SEPARATOR).append(db.getDriver().getClazz());
+        }
+        return builder;
+    }
+
+    private StringBuilder buildConnectArg(StringBuilder builder, Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        return builder.append("--connect").append(ARG_SEPARATOR)
+                .append(DatasourceHelper.getReadOnlyEndpoint(DatasourceHelper.getDatasource(feedCluster)));
+    }
+
+    private StringBuilder buildTableArg(StringBuilder builder, Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        return builder.append("--table").append(ARG_SEPARATOR)
+                                    .append(FeedHelper.getImportDataSourceTableName(feedCluster));
+    }
+
+    private StringBuilder buildUserPasswordArg(StringBuilder builder, Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        Datasource db = DatasourceHelper.getDatasource(feedCluster);
+        Pair<String, String> userPasswdInfo = DatasourceHelper.getReadPasswordInfo(db);
+        builder.append("--username").append(ARG_SEPARATOR)
+                .append(userPasswdInfo.first)
+                .append(ARG_SEPARATOR);
+        if (DatasourceHelper.getReadPasswordType(db) == Credentialtype.PASSWORD_FILE) {
+            builder.append("--password-file");
+        } else {
+            builder.append("--password");
+        }
+        builder.append(ARG_SEPARATOR).append(userPasswdInfo.second);
+        return builder;
+    }
+
+    private StringBuilder buildTargetDirArg(StringBuilder builder, Cluster cluster)
+        throws FalconException {
+        return builder.append("--delete-target-dir").append(ARG_SEPARATOR)
+                .append("--target-dir").append(ARG_SEPARATOR)
+                .append(String.format("${coord:dataOut('%s')}",
+                        FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
+    }
+
+    private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs)
+        throws FalconException {
+        for(Map.Entry<String, String> e : extraArgs.entrySet()) {
+            builder.append(e.getKey()).append(ARG_SEPARATOR).append(e.getValue()).append(ARG_SEPARATOR);
+        }
+        return builder;
+    }
+
+    /**
+     * Appends the Sqoop --num-mappers argument, defaulting it to 1 when the user did not supply it.
+     * Feed validation checks that a --split-by column is supplied when --num-mappers > 1.
+     *
+     * @param builder builder holding the command
+     * @param extraArgs map of extra arguments
+     * @return builder with the num-mappers argument appended
+     */
+    private StringBuilder buildNumMappers(StringBuilder builder, Map<String, String> extraArgs) {
+        if (!extraArgs.containsKey("--num-mappers")) {
+            builder.append("--num-mappers").append(ARG_SEPARATOR).append(1);
+        }
+        return builder;
+    }
+
+    private Map<String, String> getArguments(Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        return FeedHelper.getImportArguments(feedCluster);
+    }
+}
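
For reference, the command string assembled by buildSqoopCommand above comes out roughly as follows
(a sketch; the driver, connection URL, table, credentials and --split-by column are hypothetical, and the
--target-dir is resolved by Oozie from the coordinator's dataOut at materialization time):

import --driver org.hsqldb.jdbcDriver --connect jdbc:hsqldb:hsql://db-host:9001/db --table customer --username sa --password-file /apps/falcon/passwd.txt --num-mappers 1 --split-by id --delete-target-dir --target-dir ${coord:dataOut('import-output')}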

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/FeedImportCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/FeedImportCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/FeedImportCoordinatorBuilder.java
new file mode 100644
index 0000000..70289d0
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/FeedImportCoordinatorBuilder.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Builds Oozie coordinator for database import.
+ */
+
+public class FeedImportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
+    public static final String IMPORT_DATASET_NAME = "import-dataset";
+
+    public static final String IMPORT_DATAOUT_NAME = "import-output";
+
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedImportCoordinatorBuilder.class);
+
+    public FeedImportCoordinatorBuilder(Feed entity) {
+        super(entity, LifeCycle.IMPORT);
+    }
+
+    @Override
+    public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster((Feed) entity, cluster.getName());
+        if (!FeedHelper.isImportEnabled(feedCluster)) {
+            return null;
+        }
+
+        if (feedCluster.getValidity().getEnd().before(new Date())) {
+            LOG.warn("Feed IMPORT is not applicable as Feed's end time for cluster {} is not in the future",
+                    cluster.getName());
+            return null;
+        }
+
+        COORDINATORAPP coord = new COORDINATORAPP();
+        initializeCoordAttributes(coord, (Feed) entity, cluster);
+        Properties props = createCoordDefaultConfiguration(getEntityName());
+        initializeOutputPath(coord, cluster, props);
+
+        props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
+
+        WORKFLOW workflow = new WORKFLOW();
+        Path coordPath = getBuildPath(buildPath);
+        Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.IMPORT).build(cluster,
+                coordPath);
+        workflow.setAppPath(getStoragePath(wfProp.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+        props.putAll(wfProp);
+        workflow.setConfiguration(getConfig(props));
+        ACTION action = new ACTION();
+        action.setWorkflow(workflow);
+
+        coord.setAction(action);
+
+        Path marshalPath = marshal(cluster, coord, coordPath);
+        return Arrays.asList(getProperties(marshalPath, getEntityName()));
+    }
+
+    private void initializeOutputPath(COORDINATORAPP coord, Cluster cluster, Properties props)
+        throws FalconException {
+
+        if (coord.getDatasets() == null) {
+            coord.setDatasets(new DATASETS());
+        }
+
+        if (coord.getOutputEvents() == null) {
+            coord.setOutputEvents(new OUTPUTEVENTS());
+        }
+
+        Storage storage = FeedHelper.createStorage(cluster, (Feed) entity);
+        SYNCDATASET syncdataset = createDataSet((Feed) entity, cluster, storage,
+                IMPORT_DATASET_NAME, LocationType.DATA);
+
+        if (syncdataset == null) {
+            return;
+        }
+        coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+        DATAOUT dataout = createDataOut(entity);
+        coord.getOutputEvents().getDataOut().add(dataout);
+    }
+
+    private DATAOUT createDataOut(Feed feed) {
+        DATAOUT dataout = new DATAOUT();
+        dataout.setName(IMPORT_DATAOUT_NAME);
+        dataout.setDataset(IMPORT_DATASET_NAME);
+        dataout.setInstance("${coord:current(0)}");
+        return dataout;
+    }
+
+    /**
+     * Creates the dataset. The start instance is set to the current date if the merge type is snapshot.
+     * Otherwise, the feed cluster start date is used as the start instance.
+     *
+     * @param feed feed entity
+     * @param cluster cluster entity
+     * @param storage feed storage
+     * @param datasetName dataset name
+     * @param locationType location type of the feed storage
+     * @return the sync dataset, or null if the storage URI template is blank
+     * @throws FalconException
+     */
+    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
+                                      String datasetName, LocationType locationType) throws FalconException {
+        SYNCDATASET syncdataset = new SYNCDATASET();
+        syncdataset.setName(datasetName);
+        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+        String uriTemplate = storage.getUriTemplate(locationType);
+        if (StringUtils.isBlank(uriTemplate)) {
+            return null;
+        }
+        if (storage.getType() == Storage.TYPE.TABLE) {
+            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+        }
+        syncdataset.setUriTemplate(uriTemplate);
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date initialInstance = FeedHelper.getImportInitalInstance(feedCluster);
+        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(initialInstance));
+        syncdataset.setTimezone(feed.getTimezone().getID());
+
+        if (StringUtils.isNotBlank(feed.getAvailabilityFlag())) {
+            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
+        } else {
+            syncdataset.setDoneFlag("");
+        }
+
+        return syncdataset;
+    }
+
+    /**
+     * Initializes the coordinator with the current date as start if the merge type is snapshot.
+     * Otherwise, the feed cluster validity start is used as the coordinator start date.
+     *
+     * @param coord coordinator app
+     * @param feed feed entity
+     * @param cluster cluster entity
+     */
+    private void initializeCoordAttributes(COORDINATORAPP coord, Feed feed, Cluster cluster) {
+        coord.setName(getEntityName());
+        // for feeds with snapshot layout, the start date will be the time of scheduling since it dumps whole table
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date initialInstance = FeedHelper.getImportInitalInstance(feedCluster);
+        coord.setStart(SchemaHelper.formatDateUTC(initialInstance));
+        coord.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+        coord.setTimezone(entity.getTimezone().getID());
+        coord.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
+    }
+}
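
To make the wiring concrete, a sketch of the coordinator fragment this builder emits (dataset and data-out
names follow the constants above; the app name, frequency, dates and URI template are hypothetical):

<coordinator-app name="FALCON_FEED_IMPORT_sample-feed" frequency="${coord:hours(1)}"
                 start="2015-10-01T00:00Z" end="2015-12-01T00:00Z" timezone="UTC">
    <datasets>
        <dataset name="import-dataset" frequency="${coord:hours(1)}"
                 initial-instance="2015-10-01T00:00Z" timezone="UTC">
            <uri-template>hdfs://namenode:8020/falcon/import/${YEAR}-${MONTH}-${DAY}-${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>
    </datasets>
    <output-events>
        <data-out name="import-output" dataset="import-dataset">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
    <action>
        <workflow>
            <app-path>hdfs://namenode:8020/falcon/staging/.../IMPORT</app-path>
        </workflow>
    </action>
</coordinator-app>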

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
new file mode 100644
index 0000000..4892ecb
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.hadoop.fs.Path;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Properties;
+
+/**
+ * Builds oozie workflow for Datasource import.
+ */
+
+public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
+
+    public ImportWorkflowBuilder(Feed feed) {
+        super(feed, LifeCycle.IMPORT);
+    }
+
+    @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+
+        WORKFLOWAPP workflow = new WORKFLOWAPP();
+        String wfName = EntityUtil.getWorkflowName(Tag.IMPORT, entity).toString();
+        workflow.setName(wfName);
+        Properties p = getWorkflow(cluster, workflow);
+        marshal(cluster, workflow, buildPath);
+
+        Properties props = FeedHelper.getFeedProperties(entity);
+        if (props == null) {
+            props = new Properties();
+        }
+        props.putAll(getProperties(buildPath, wfName));
+        Properties defaultConf = createDefaultConfiguration(cluster);
+        if (defaultConf != null) {   // avoid building the default configuration twice
+            props.putAll(defaultConf);
+        }
+        Properties userWorkflowProps = FeedHelper.getUserWorkflowProperties(getLifecycle());
+        if (userWorkflowProps != null) {
+            props.putAll(userWorkflowProps);
+        }
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
+                String.format("${coord:dataOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
+        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), NONE);
+        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), NONE);
+        props.setProperty("srcClusterName", "NA");
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+
+        String datasourceName = FeedHelper.getImportDatasourceName(
+            FeedHelper.getCluster(entity, cluster.getName()));
+        if (StringUtils.isEmpty(datasourceName)) {
+            throw new FalconException("Datasource name is null or empty");
+        }
+
+        props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), datasourceName);
+        props.putAll(p);
+        return props;
+    }
+
+    protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index a04ae95..55e07be 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -75,6 +75,9 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
             case REPLICATION:
                 return new FeedReplicationCoordinatorBuilder((Feed)entity);
 
+            case IMPORT:
+                return new FeedImportCoordinatorBuilder((Feed)entity);
+
             default:
                 throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle " + tag);
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 3213a70..026f79f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.HiveUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.datasource.DatasourceType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -131,6 +132,17 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
                     return new FSReplicationWorkflowBuilder(feed);
                 }
 
+            case IMPORT:
+                DatasourceType dsType = EntityUtil.getImportDatasourceType(cluster, feed);
+                if ((dsType == DatasourceType.MYSQL)
+                    || (dsType == DatasourceType.ORACLE)
+                    || (dsType == DatasourceType.HSQL)) {
+                    return new DatabaseImportWorkflowBuilder(feed);
+                } else {
+                    LOG.info("Import policy not implemented for DatasourceType: " + dsType);
+                }
+                break;
+
             default:
                 throw new IllegalArgumentException("Unhandled type " + entity.getEntityType()
                        + ", lifecycle " + lifecycle);

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index 0dc09ee..e45dfc5 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 
 import java.util.Arrays;
 import java.util.Properties;
@@ -82,7 +83,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
         } else {
             props.put("availabilityFlag", entity.getAvailabilityFlag());
         }
-
+        props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
         return props;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index 9e55edf..9fbc5b2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -72,6 +72,11 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
             props.addAll(replicationProps);
         }
 
+        List<Properties> importProps = OozieCoordinatorBuilder.get(entity, Tag.IMPORT).buildCoords(cluster, buildPath);
+        if (importProps != null) {
+            props.addAll(importProps);
+        }
+
         if (!props.isEmpty()) {
             copySharedLibs(cluster, new Path(getLibPath(buildPath)));
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index f953557..b9e3848 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -110,6 +110,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
 
         props.put("falconInputFeeds", entity.getName());
         props.put("falconInPaths", IGNORE);
+        props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 6e2a631..629485d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -27,6 +27,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 
 import javax.xml.bind.JAXBElement;
 import java.util.Arrays;
@@ -153,7 +154,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
     protected Properties getWorkflowProperties(Feed feed) throws FalconException {
         Properties props = super.getWorkflowProperties(feed);
         props.put("availabilityFlag", "NA");
-
+        props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 3f49adb..dc9349f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -40,6 +40,7 @@ import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -129,6 +130,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         Properties props = new Properties();
         props.setProperty("srcClusterName", "NA");
         props.setProperty("availabilityFlag", "NA");
+        props.setProperty(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
         return props;
     }
 

