falcon-commits mailing list archives

From sowmya...@apache.org
Subject [3/3] falcon git commit: FALCON-1459 Ability to import from database. Contributed by Venkat Ramachandran
Date Thu, 29 Oct 2015 01:08:40 GMT
FALCON-1459 Ability to import from database. Contributed by Venkat Ramachandran


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/89040a29
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/89040a29
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/89040a29

Branch: refs/heads/master
Commit: 89040a296de3d4a9bd0aa2232342438add37afee
Parents: 35006fe
Author: Sowmya Ramesh <sramesh@hortonworks.com>
Authored: Wed Oct 28 18:08:31 2015 -0700
Committer: Sowmya Ramesh <sramesh@hortonworks.com>
Committed: Wed Oct 28 18:08:31 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../main/java/org/apache/falcon/LifeCycle.java  |   3 +-
 client/src/main/java/org/apache/falcon/Tag.java |   2 +-
 .../org/apache/falcon/entity/v0/EntityType.java |   8 +-
 .../falcon/metadata/RelationshipType.java       |   2 +
 client/src/main/resources/datasource-0.1.xsd    | 256 ++++++++++++++++++
 client/src/main/resources/feed-0.1.xsd          |  93 ++++++-
 client/src/main/resources/jaxb-binding.xjb      |   9 +
 client/src/main/resources/mysql_database.xml    |  46 ++++
 .../apache/falcon/entity/DatasourceHelper.java  | 199 ++++++++++++++
 .../org/apache/falcon/entity/EntityUtil.java    |  21 +-
 .../org/apache/falcon/entity/FeedHelper.java    | 191 +++++++++++++-
 .../entity/parser/DatasourceEntityParser.java   | 127 +++++++++
 .../entity/parser/EntityParserFactory.java      |   2 +
 .../falcon/entity/parser/FeedEntityParser.java  |  64 ++++-
 .../falcon/entity/store/ConfigurationStore.java |   2 +-
 .../apache/falcon/entity/v0/EntityGraph.java    |  11 +
 .../entity/v0/EntityIntegrityChecker.java       |   3 +
 .../EntityRelationshipGraphBuilder.java         |  33 +++
 .../InstanceRelationshipGraphBuilder.java       |  33 +++
 .../falcon/metadata/MetadataMappingService.java |   9 +-
 .../falcon/metadata/RelationshipLabel.java      |   1 +
 .../org/apache/falcon/util/HdfsClassLoader.java | 159 +++++++++++
 .../falcon/workflow/WorkflowExecutionArgs.java  |   1 +
 .../workflow/WorkflowExecutionContext.java      |   6 +-
 .../apache/falcon/entity/AbstractTestBase.java  |   1 +
 .../apache/falcon/entity/EntityTypeTest.java    |   3 +
 .../apache/falcon/entity/FeedHelperTest.java    | 109 +++++++-
 .../parser/DatasourceEntityParserTest.java      |  77 ++++++
 .../entity/parser/FeedEntityParserTest.java     | 159 ++++++++++-
 .../falcon/entity/v0/EntityGraphTest.java       | 124 ++++++++-
 .../config/datasource/datasource-0.1.xml        |  48 ++++
 .../config/datasource/datasource-file-0.1.xml   |  48 ++++
 .../datasource/datasource-invalid-0.1.xml       |  46 ++++
 .../resources/config/feed/feed-import-0.1.xml   |  74 ++++++
 .../feed/feed-import-exclude-fields-0.1.xml     |  74 ++++++
 .../config/feed/feed-import-invalid-0.1.xml     |  73 +++++
 .../config/feed/feed-import-noargs-0.1.xml      |  64 +++++
 docs/src/site/twiki/EntitySpecification.twiki   |  84 ++++++
 docs/src/site/twiki/FalconCLI.twiki             |  23 +-
 .../falcon/messaging/JMSMessageProducer.java    |   3 +-
 .../oozie/DatabaseImportWorkflowBuilder.java    | 174 ++++++++++++
 .../oozie/FeedImportCoordinatorBuilder.java     | 191 ++++++++++++++
 .../falcon/oozie/ImportWorkflowBuilder.java     |  84 ++++++
 .../falcon/oozie/OozieCoordinatorBuilder.java   |   3 +
 .../OozieOrchestrationWorkflowBuilder.java      |  12 +
 .../feed/FSReplicationWorkflowBuilder.java      |   3 +-
 .../falcon/oozie/feed/FeedBundleBuilder.java    |   5 +
 .../feed/FeedRetentionWorkflowBuilder.java      |   1 +
 .../feed/HCatReplicationWorkflowBuilder.java    |   3 +-
 .../ProcessExecutionWorkflowBuilder.java        |   2 +
 .../feed/import-sqoop-database-action.xml       |  47 ++++
 .../src/main/resources/action/post-process.xml  |   2 +
 pom.xml                                         |   3 +
 webapp/pom.xml                                  |   3 +
 .../apache/falcon/lifecycle/FeedImportIT.java   |  99 +++++++
 .../org/apache/falcon/resource/TestContext.java |   3 +
 .../org/apache/falcon/util/HsqldbTestUtils.java | 263 +++++++++++++++++++
 .../src/test/resources/datasource-template.xml  |  46 ++++
 webapp/src/test/resources/feed-template3.xml    |  59 +++++
 60 files changed, 3253 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1f30cf..b5980be 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1459 Ability to import from database(Venkat Ramachandran via Sowmya Ramesh)
+
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/LifeCycle.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/LifeCycle.java b/client/src/main/java/org/apache/falcon/LifeCycle.java
index 58a2a6c..d4d39e8 100644
--- a/client/src/main/java/org/apache/falcon/LifeCycle.java
+++ b/client/src/main/java/org/apache/falcon/LifeCycle.java
@@ -25,7 +25,8 @@ package org.apache.falcon;
 public enum LifeCycle {
     EXECUTION(Tag.DEFAULT),
     EVICTION(Tag.RETENTION),
-    REPLICATION(Tag.REPLICATION);
+    REPLICATION(Tag.REPLICATION),
+    IMPORT(Tag.IMPORT);
 
     private final Tag tag;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/Tag.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/Tag.java b/client/src/main/java/org/apache/falcon/Tag.java
index beeb812..5027ac0 100644
--- a/client/src/main/java/org/apache/falcon/Tag.java
+++ b/client/src/main/java/org/apache/falcon/Tag.java
@@ -24,7 +24,7 @@ import org.apache.falcon.entity.v0.EntityType;
  * Tag to include in the entity type.
  */
 public enum Tag {
-    DEFAULT(EntityType.PROCESS), RETENTION(EntityType.FEED), REPLICATION(EntityType.FEED);
+    DEFAULT(EntityType.PROCESS), RETENTION(EntityType.FEED), REPLICATION(EntityType.FEED), IMPORT(EntityType.FEED);
 
     private final EntityType entityType;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
index 0657124..3d55547 100644
--- a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
+++ b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
@@ -21,6 +21,7 @@ package org.apache.falcon.entity.v0;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
@@ -38,7 +39,8 @@ import java.util.Arrays;
 public enum EntityType {
     FEED(Feed.class, "/feed-0.1.xsd", "name"),
     PROCESS(Process.class, "/process-0.1.xsd", "name"),
-    CLUSTER(Cluster.class, "/cluster-0.1.xsd", "name");
+    CLUSTER(Cluster.class, "/cluster-0.1.xsd", "name"),
+    DATASOURCE(Datasource.class, "/datasource-0.1.xsd", "name");
 
     //Fail unmarshalling of whole xml if unmarshalling of any element fails
     private static class EventHandler implements ValidationEventHandler {
@@ -93,8 +95,10 @@ public enum EntityType {
         return unmarshaller;
     }
 
+
     public boolean isSchedulable() {
-        return this != EntityType.CLUSTER;
+        // Cluster and Datasource are not schedulable like Feed and Process
+        return ((this != EntityType.CLUSTER) && (this != EntityType.DATASOURCE));
     }
 
     @edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP"})

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
index f034772..8e5f8ea 100644
--- a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
+++ b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
@@ -27,10 +27,12 @@ public enum RelationshipType {
     CLUSTER_ENTITY("cluster-entity"),
     FEED_ENTITY("feed-entity"),
     PROCESS_ENTITY("process-entity"),
+    DATASOURCE_ENTITY("datasource-entity"),
 
     // instance vertex types
     FEED_INSTANCE("feed-instance"),
     PROCESS_INSTANCE("process-instance"),
+    IMPORT_INSTANCE("import-instance"),
 
     // Misc vertex types
     USER("user"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/datasource-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/datasource-0.1.xsd b/client/src/main/resources/datasource-0.1.xsd
new file mode 100644
index 0000000..beb82cc
--- /dev/null
+++ b/client/src/main/resources/datasource-0.1.xsd
@@ -0,0 +1,256 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" attributeFormDefault="unqualified" elementFormDefault="qualified"
+           targetNamespace="uri:falcon:datasource:0.1" xmlns="uri:falcon:datasource:0.1"
+           xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" jaxb:version="2.1">
+    <xs:annotation>
+        <xs:documentation>
+            Licensed to the Apache Software Foundation (ASF) under one or more
+            contributor license agreements. See the NOTICE file distributed with
+            this work for additional information regarding copyright ownership.
+            The ASF licenses this file to You under the Apache License, Version
+            2.0
+            (the "License"); you may not use this file except in compliance with
+            the License. You may obtain a copy of the License at
+
+            http://www.apache.org/licenses/LICENSE-2.0
+
+            Unless required by applicable law or agreed to in writing, software
+            distributed under the License is distributed on an "AS IS" BASIS,
+            WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+            implied.
+            See the License for the specific language governing permissions and
+            limitations under the License.
+        </xs:documentation>
+        <xs:appinfo>
+            <jaxb:schemaBindings>
+                <jaxb:package name="org.apache.falcon.entity.v0.datasource"/>
+            </jaxb:schemaBindings>
+        </xs:appinfo>
+    </xs:annotation>
+    <xs:element name="datasource" type="datasource">
+    </xs:element>
+    <xs:complexType name="datasource">
+        <xs:annotation>
+            <xs:documentation>A datasource contains the information required
+                to connect to a data source such as a MySQL database or a Kafka cluster.
+                A datasource is referenced by feeds that represent an object such as a
+                table (or topic) in the MySQL database (or Kafka cluster).
+                name: the name of the datasource, which must be unique.
+                colo: the name of the colo to which this datasource belongs.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="KEY_VALUE_PAIR" name="tags" minOccurs="0" maxOccurs="1">
+                <xs:annotation>
+                    <xs:documentation>
+                        tags: an optional list of comma-separated key-value pairs
+                        used for classification of the datasource entity.
+                        Example: consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
+            <xs:element type="interfaces" name="interfaces"/>
+            <xs:element type="driver" name="driver" minOccurs="1" maxOccurs="1" />
+            <xs:element type="properties" name="properties" minOccurs="0"/>
+            <xs:element type="ACL" name="ACL" minOccurs="0" maxOccurs="1"/>
+        </xs:sequence>
+        <xs:attribute type="IDENTIFIER" name="name" use="required"/>
+        <xs:attribute type="xs:string"  name="colo" use="required"/>
+        <xs:attribute type="xs:string"  name="description"/>
+        <xs:attribute type="datasource-type"  name="type" use="required">
+            <xs:annotation>
+                <xs:documentation>
+                    The datasource type can be a relational database (MySQL, Oracle, etc.) or a
+                    messaging system such as Kafka.
+                </xs:documentation>
+            </xs:annotation>
+        </xs:attribute>
+    </xs:complexType>
+    <xs:complexType name="property">
+        <xs:annotation>
+            <xs:documentation>
+                A key-value pair to pass in any datasource specific properties.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="name" use="required"/>
+        <xs:attribute type="xs:string" name="value" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="interface">
+        <xs:annotation>
+            <xs:documentation>
+                An interface specifies the interface type (read or write), and an
+                endpoint url. Falcon uses these endpoints to import or export
+                data from datasources.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="driver" name="driver" minOccurs="0" maxOccurs="1" />
+            <xs:element type="credential" name="credential" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="properties" name="properties" minOccurs="0"/>
+        </xs:sequence>
+        <xs:attribute type="interfacetype" name="type" use="required"/>
+        <xs:attribute type="xs:string" name="endpoint" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="properties">
+        <xs:annotation>
+            <xs:documentation>
+                A list of property elements.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="property" name="property" maxOccurs="unbounded" minOccurs="0"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="interfaces">
+        <xs:annotation>
+            <xs:documentation>
+                A list of interfaces.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="interface" name="interface" maxOccurs="2" minOccurs="1"/>
+            <xs:element type="credential" name="credential" minOccurs="0" maxOccurs="1"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:simpleType name="interfacetype">
+        <xs:annotation>
+            <xs:documentation>
+                A datasource interface has two types: readonly and write.
+                The readonly endpoint specifies the url/mechanism to use for the data IMPORT
+                operation from a datasource, while the write endpoint specifies the url/mechanism
+                to use for the data EXPORT operation.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="readonly"/>
+            <xs:enumeration value="write"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:simpleType name="IDENTIFIER">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="(([a-zA-Z]([\-a-zA-Z0-9])*){1,39})"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:simpleType name="KEY_VALUE_PAIR">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="([\w_]+=[^,]+)?([,]?[ ]*[\w_]+=[^,]+)*"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:complexType name="credential">
+        <xs:sequence  minOccurs="1" maxOccurs="1" >
+            <xs:element name="userName" minOccurs="1" maxOccurs="1" type="xs:string">
+                <xs:annotation>
+                    <xs:documentation>
+                        The User for the datasource.
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
+
+            <xs:choice minOccurs="1" maxOccurs="1">
+                <xs:element name="passwordFile" type="xs:string">
+                    <xs:annotation>
+                        <xs:documentation>
+                            The FQ path to a file on HDFS containing the datasource
+                            server password with 400 permissions. Only the user
+                            submitting the job has read access to this file which
+                            will be securely passed to the mappers.
+                        </xs:documentation>
+                    </xs:annotation>
+                </xs:element>
+
+                <xs:element name="passwordText" type="xs:string">
+                <xs:annotation>
+                        <xs:documentation>
+                            Plain text password.
+                        </xs:documentation>
+                    </xs:annotation>
+                </xs:element>
+            </xs:choice>
+        </xs:sequence>
+        <xs:attribute name="type" type="credentialtype" use="required"/>
+    </xs:complexType>
+
+    <xs:simpleType name="credentialtype">
+        <xs:annotation>
+            <xs:documentation>
+                Only user-password credentials are supported today; this can be extended.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="password-file" />
+            <xs:enumeration value="password-text" />
+        </xs:restriction>
+    </xs:simpleType>
+
+    <xs:simpleType name="datasource-type">
+        <xs:annotation>
+            <xs:documentation>
+                The datasource type can be mysql, oracle, or hsql.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="mysql"/>
+            <xs:enumeration value="oracle"/>
+            <xs:enumeration value="hsql"/>
+        </xs:restriction>
+    </xs:simpleType>
+
+    <xs:complexType name="driver">
+        <xs:annotation>
+            <xs:documentation>
+                Driver information.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence minOccurs="1" maxOccurs="1">
+            <xs:element type="xs:string" name="clazz" minOccurs="1" maxOccurs="1">
+                <xs:annotation>
+                    <xs:documentation>
+                        Fully qualified class name for the datasource driver used
+                        for validating the datasource connection in Falcon.
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
+            <xs:element type="xs:string" name="jar" minOccurs="1" maxOccurs="unbounded">
+                <xs:annotation>
+                    <xs:documentation>
+                        Path to the connector jar files on HDFS that are shipped with the workflow.
+                        The connector jar files must be placed in the Oozie sharelib; since this
+                        uses the latest features in Sqoop 1.x, a 1.5 snapshot build is required.
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="ACL">
+        <xs:annotation>
+            <xs:documentation>
+                Access control list for this datasource entity.
+                owner is the owner of this entity.
+                group is the group with read access - not used at this time.
+                permission is not enforced at this time.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="owner"/>
+        <xs:attribute type="xs:string" name="group"/>
+        <xs:attribute type="xs:string" name="permission" default="*"/>
+    </xs:complexType>
+</xs:schema>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 77b8f4b..2974dd6 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -130,7 +130,6 @@
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="xs:string" name="description"/>
     </xs:complexType>
-
     <xs:complexType name="cluster">
         <xs:annotation>
             <xs:documentation>
@@ -157,6 +156,7 @@
             <xs:element type="validity" name="validity"/>
             <xs:element type="retention" name="retention"/>
             <xs:element type="sla" name="sla" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="import" name="import" minOccurs="0" maxOccurs="1"/>
             <xs:choice minOccurs="0" maxOccurs="1">
                 <xs:element type="locations" name="locations" minOccurs="0"/>
                 <xs:element type="catalog-table" name="table"/>
@@ -166,8 +166,7 @@
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="cluster-type" name="type" use="optional"/>
         <xs:attribute type="xs:string" name="partition" use="optional"/>
-        <xs:attribute type="frequency-type" name="delay" use="optional" /> 
-
+        <xs:attribute type="frequency-type" name="delay" use="optional" />
     </xs:complexType>
     <xs:complexType name="partitions">
         <xs:annotation>
@@ -301,7 +300,6 @@
     <xs:complexType name="partition">
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
     </xs:complexType>
-
     <xs:complexType name="notification">
         <xs:annotation>
             <xs:documentation>
@@ -331,7 +329,6 @@
         </xs:attribute>
         <xs:attribute type="xs:string" name="to" use="required"/>
     </xs:complexType>
-
     <xs:complexType name="ACL">
         <xs:annotation>
             <xs:documentation>
@@ -451,7 +448,90 @@
             <xs:minLength value="1"/>
         </xs:restriction>
     </xs:simpleType>
-
+    <xs:complexType name="import">
+       <xs:sequence>
+            <xs:element type="source" name="source"/>
+            <xs:element type="arguments" name="arguments" minOccurs="0"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="source">
+        <xs:annotation>
+            <xs:documentation>
+                Specifies the source entity name from which data will be imported.
+                This can be a Database, or other data source types in the future.
+                Table name specifies the table to import.
+                Extract type specifies an extraction method (full or incremental).
+                DeltaColumn specifies the column name on the source database table
+                used to identify the new data since the last extraction.
+                Merge type specifies how the data will be organized on Hadoop.
+                The supported types are snapshot (as of a particular time) or append
+                (as in time-series partitions).
+            </xs:documentation>
+        </xs:annotation>
+       <xs:sequence>
+            <xs:element type="extract" name="extract" minOccurs="1"/>
+            <xs:element type="fields-type" name="fields" minOccurs="0"/>
+        </xs:sequence>
+        <xs:attribute type="non-empty-string" name="name" use="required"/>
+        <xs:attribute type="non-empty-string" name="tableName" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="extract">
+        <xs:sequence>
+            <xs:element type="xs:string" name="deltacolumn" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="merge-type" name="mergepolicy" minOccurs="1" maxOccurs="1"/>
+        </xs:sequence>
+        <xs:attribute type="extract-method" name="type" use="required"/>
+    </xs:complexType>
+    <xs:simpleType name="extract-method">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="full"/>
+            <xs:enumeration value="incremental"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:simpleType name="merge-type">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="snapshot"/>
+            <xs:enumeration value="append"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:complexType name="fields-type">
+        <xs:annotation>
+            <xs:documentation>
+                Specifies either an include or an exclude field list. If an include field list is
+                specified, only the listed fields will be imported. If an exclude field list is
+                specified, all fields except the listed ones will be imported from the datasource to HDFS.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:choice minOccurs="1" maxOccurs="1">
+            <xs:element type="field-include-exclude" name="includes"/>
+            <xs:element type="field-include-exclude" name="excludes"/>
+        </xs:choice>
+    </xs:complexType>
+    <xs:complexType name="field-include-exclude">
+        <xs:sequence>
+            <xs:element type="xs:string" name="field" maxOccurs="unbounded" minOccurs="1"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="arguments">
+        <xs:annotation>
+            <xs:documentation>
+                A list of name-value pairs of extra arguments to be passed to the concrete implementation.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="argument" name="argument" maxOccurs="unbounded" minOccurs="0"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="argument">
+        <xs:annotation>
+            <xs:documentation>
+                A key-value pair used when invoking
+                ingestion engines.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="name" use="required"/>
+        <xs:attribute type="xs:string" name="value" use="required"/>
+    </xs:complexType>
     <xs:complexType name="retention-stage">
         <xs:annotation>
             <xs:documentation>
@@ -469,5 +549,4 @@
             <xs:element type="properties" name="properties" minOccurs="0" maxOccurs="1"></xs:element>
         </xs:all>
     </xs:complexType>
-
 </xs:schema>
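
For reference, a minimal sketch of what the new import element looks like inside a
feed's cluster definition, per the schema above. The datasource name "mysql-db"
matches the sample entity shipped in this commit (mysql_database.xml); the table
name, field names and Sqoop-style arguments are illustrative only:

    <cluster name="primaryCluster" type="source">
        <validity start="2015-01-01T00:00Z" end="2016-01-01T00:00Z"/>
        <retention limit="hours(2)" action="delete"/>
        <import>
            <source name="mysql-db" tableName="simple_table">
                <extract type="full">
                    <mergepolicy>snapshot</mergepolicy>
                </extract>
                <fields>
                    <includes>
                        <field>id</field>
                        <field>name</field>
                    </includes>
                </fields>
            </source>
            <arguments>
                <argument name="--split-by" value="id"/>
                <argument name="--num-mappers" value="2"/>
            </arguments>
        </import>
        <locations>
            <location type="data" path="/falcon/import/${YEAR}-${MONTH}-${DAY}"/>
        </locations>
    </cluster>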

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/jaxb-binding.xjb
----------------------------------------------------------------------
diff --git a/client/src/main/resources/jaxb-binding.xjb b/client/src/main/resources/jaxb-binding.xjb
index 6f1d6c7..978145f 100644
--- a/client/src/main/resources/jaxb-binding.xjb
+++ b/client/src/main/resources/jaxb-binding.xjb
@@ -56,6 +56,15 @@
         <inheritance:extends>org.apache.falcon.entity.v0.EntityNotification</inheritance:extends>
     </jaxb:bindings>
 
+
+    <jaxb:bindings schemaLocation="datasource-0.1.xsd" node="//xs:complexType[@name='datasource']">
+         <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends>
+    </jaxb:bindings>
+
+    <jaxb:bindings schemaLocation="datasource-0.1.xsd" node="//xs:complexType[@name='ACL']">
+        <inheritance:extends>org.apache.falcon.entity.v0.AccessControlList</inheritance:extends>
+    </jaxb:bindings>
+
     <jaxb:globalBindings>
         <xjc:simple/>
     </jaxb:globalBindings>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/mysql_database.xml
----------------------------------------------------------------------
diff --git a/client/src/main/resources/mysql_database.xml b/client/src/main/resources/mysql_database.xml
new file mode 100644
index 0000000..5f88ba4
--- /dev/null
+++ b/client/src/main/resources/mysql_database.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<datasource colo="west-coast" description="MySQL database on west coast" type="mysql" name="mysql-db" xmlns="uri:falcon:datasource:0.1">
+    <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
+    <interfaces>
+
+        <!-- ***** read interface ***** -->
+        <interface type="readonly" endpoint="jdbc:mysql://c6402/test">
+            <credential type="password-file">
+                <userName>sqoop_user</userName>
+                <passwordFile>/user/ambari-qa/password-store/password_read_user</passwordFile>
+            </credential>
+        </interface>
+
+        <!-- ***** write interface ***** -->
+        <interface type="write"  endpoint="jdbc:mysql://c6402/test">
+            <credential type="password-file">
+                <userName>sqoop2_user</userName>
+                <passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile>
+            </credential>
+        </interface>
+
+        <!-- ***** default credential ***** -->
+        <credential type="password-file">
+            <userName>sqoop2_user</userName>
+            <passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile>
+        </credential>
+
+    </interfaces>
+    <!-- driver is required by datasource-0.1.xsd; the jar path here is illustrative -->
+    <driver>
+        <clazz>com.mysql.jdbc.Driver</clazz>
+        <jar>/user/falcon/lib/mysql-connector-java.jar</jar>
+    </driver>
+</datasource>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
new file mode 100644
index 0000000..f9b3966
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.datasource.Credential;
+import org.apache.falcon.entity.v0.datasource.Credentialtype;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.datasource.DatasourceType;
+import org.apache.falcon.entity.v0.datasource.Interface;
+import org.apache.falcon.entity.v0.datasource.Interfaces;
+import org.apache.falcon.entity.v0.datasource.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+
+/**
+ * DataSource entity helper methods.
+ */
+
+public final class DatasourceHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DatasourceHelper.class);
+
+    private static final ConfigurationStore STORE = ConfigurationStore.get();
+
+    public static DatasourceType getImportSourceType(Cluster feedCluster) throws FalconException {
+        Datasource ds = STORE.get(EntityType.DATASOURCE, feedCluster.getImport().getSource().getName());
+        return ds.getType();
+    }
+
+    private DatasourceHelper() {}
+
+    public static Datasource getDatasource(Cluster feedCluster) throws FalconException {
+        return STORE.get(EntityType.DATASOURCE, feedCluster.getImport().getSource().getName());
+    }
+    public static String getReadOnlyEndpoint(Datasource db) {
+        return getInterface(db, Interfacetype.READONLY);
+    }
+
+    /**
+     * Returns user name and password pair as it is specified in the XML. If the credential type is
+     * password-file, the path name is returned.
+     *
+     * @param db
+     * @return user name and password pair
+     * @throws FalconException
+     */
+    public static Pair<String, String> getReadPasswordInfo(Datasource db) throws FalconException {
+        for (Interface ifs : db.getInterfaces().getInterfaces()) {
+            if ((ifs.getType() == Interfacetype.READONLY) && (ifs.getCredential() != null)) {
+                return getPasswordInfo(ifs.getCredential());
+            }
+        }
+        return getDefaultPasswordInfo(db.getInterfaces());
+    }
+
+    /**
+     * Returns user name and actual password pair. If the credential type is password-file, then the
+     * password is read from the HDFS file. If the credential type is password-text, the clear text
+     * password is returned.
+     *
+     * @param db
+     * @return properties containing "user" and "password" entries
+     * @throws FalconException
+     */
+    public static java.util.Properties fetchReadPasswordInfo(Datasource db) throws FalconException {
+        Pair<String, String> passwdInfo = getReadPasswordInfo(db);
+        java.util.Properties p = new java.util.Properties();
+        p.put("user", passwdInfo.first);
+        p.put("password", passwdInfo.second);
+        if (getReadPasswordType(db) == Credentialtype.PASSWORD_FILE) {
+            String actualPasswd = readPasswordInfoFromFile(passwdInfo.second);
+            p.put("password", actualPasswd);
+        }
+        return p;
+    }
+
+    /**
+     * Given Datasource, return the read-only credential type. If read-only credential is missing,
+     * use interface's default credential.
+     *
+     * @param db
+     * @return Credentialtype
+     * @throws FalconException
+     */
+    public static Credentialtype getReadPasswordType(Datasource db) throws FalconException {
+        for (Interface ifs : db.getInterfaces().getInterfaces()) {
+            if ((ifs.getType() == Interfacetype.READONLY) && (ifs.getCredential() != null)) {
+                return getPasswordType(ifs.getCredential());
+            }
+        }
+        return getDefaultPasswordType(db.getInterfaces());
+    }
+
+    /**
+     * Return the Interface endpoint for the interface type specified in the argument.
+     *
+     * @param db
+     * @param type - can be readonly or write
+     * @return endpoint for the given interface type, or null if none is defined
+     */
+    private static String getInterface(Datasource db, Interfacetype type) {
+        for(Interface ifs : db.getInterfaces().getInterfaces()) {
+            if (ifs.getType() == type) {
+                return ifs.getEndpoint();
+            }
+        }
+        return null;
+    }
+    private static Credentialtype getPasswordType(Credential c) {
+        return c.getType();
+    }
+
+    private static Credentialtype getDefaultPasswordType(Interfaces ifs) throws FalconException {
+
+        if (ifs.getCredential() != null) {
+            return ifs.getCredential().getType();
+        } else {
+            throw new FalconException("Missing Interfaces default credential");
+        }
+    }
+
+    private static Pair<String, String> getDefaultPasswordInfo(Interfaces ifs) throws FalconException {
+
+        if (ifs.getCredential() != null) {
+            return getPasswordInfo(ifs.getCredential());
+        } else {
+            throw new FalconException("Missing Interfaces default credential");
+        }
+    }
+
+    private static Pair<String, String> getPasswordInfo(Credential c) throws FalconException {
+        String passwd = null;
+        if (c.getType() == Credentialtype.PASSWORD_FILE) {
+            passwd = c.getPasswordFile();
+        } else {
+            passwd = c.getPasswordText();
+        }
+        return new Pair<String, String>(c.getUserName(), passwd);
+    }
+
+    private static String readPasswordInfoFromFile(String passwordFilePath) throws FalconException {
+        try {
+            Path path = new Path(passwordFilePath);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
+            if (!fs.exists(path)) {
+                throw new IOException("The password file does not exist! "
+                        + passwordFilePath);
+            }
+
+            if (!fs.isFile(path)) {
+                throw new IOException("The password file cannot be a directory! "
+                        + passwordFilePath);
+            }
+
+            InputStream is = fs.open(path);
+            StringWriter writer = new StringWriter();
+            try {
+                IOUtils.copy(is, writer);
+                return writer.toString();
+            } finally {
+                IOUtils.closeQuietly(is);
+                IOUtils.closeQuietly(writer);
+                fs.close();
+            }
+        } catch (IOException ioe) {
+            LOG.error("Error reading password file from HDFS", ioe);
+            throw new FalconException(ioe);
+        }
+    }
+}
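
A rough usage sketch of the helper above: resolving the read endpoint and credentials
for a feed cluster's import source and opening a JDBC connection. The feedCluster
variable (org.apache.falcon.entity.v0.feed.Cluster) is assumed to be in scope, and
exception handling is omitted:

    // Resolve the datasource entity referenced by the feed cluster's import policy.
    Datasource db = DatasourceHelper.getDatasource(feedCluster);

    // Read endpoint plus "user"/"password" properties; a password-file credential
    // is dereferenced from HDFS by fetchReadPasswordInfo().
    String url = DatasourceHelper.getReadOnlyEndpoint(db);
    java.util.Properties creds = DatasourceHelper.fetchReadPasswordInfo(db);

    // Open a plain JDBC connection with the resolved credentials.
    java.sql.Connection con = java.sql.DriverManager.getConnection(url, creds);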

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index ceefb17..66dba6f 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -35,6 +35,7 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.datasource.DatasourceType;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
@@ -681,7 +682,7 @@ public final class EntityUtil {
     //Staging path that stores scheduler configs like oozie coord/bundle xmls, parent workflow xml
     //Each entity update creates a new staging path
     //Base staging path is the base path for all staging dirs
-    public static Path getBaseStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
+    public static Path getBaseStagingPath(Cluster cluster, Entity entity) {
         return new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(),
                 "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
     }
@@ -723,7 +724,7 @@ public final class EntityUtil {
 
     //Creates new staging path for entity schedule/update
     //Staging path containd md5 of the cluster view of the entity. This is required to check if update is required
-    public static Path getNewStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
+    public static Path getNewStagingPath(Cluster cluster, Entity entity)
         throws FalconException {
         Entity clusterView = getClusterView(entity, cluster.getName());
         return new Path(getBaseStagingPath(cluster, entity),
@@ -778,7 +779,7 @@ public final class EntityUtil {
         }
     }
 
-    public static Path getLogPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
+    public static Path getLogPath(Cluster cluster, Entity entity) {
         return new Path(getBaseStagingPath(cluster, entity), "logs");
     }
 
@@ -1001,6 +1002,20 @@ public final class EntityUtil {
         return result;
     }
 
+    /**
+     * Returns the data source type for a feed with an import policy.
+     *
+     * @param cluster
+     * @param feed
+     * @return DatasourceType of the feed's import source
+     * @throws FalconException
+     */
+
+    public static DatasourceType getImportDatasourceType(
+            Cluster cluster, Feed feed) throws FalconException {
+        return FeedHelper.getImportDatasourceType(cluster, feed);
+    }
+
     public static EntityNotification getEntityNotification(Entity entity) {
         switch (entity.getEntityType()) {
         case FEED:

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 5c252a8..2c65eba 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -26,14 +26,18 @@ import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.entity.v0.datasource.DatasourceType;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
+import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
 import org.apache.falcon.entity.v0.feed.Lifecycle;
 import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.MergeType;
 import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.entity.v0.process.Input;
@@ -301,7 +305,7 @@ public final class FeedHelper {
         clusterVars.put("colo", cluster.getColo());
         clusterVars.put("name", cluster.getName());
         if (cluster.getProperties() != null) {
-            for (Property property : cluster.getProperties().getProperties()) {
+            for (org.apache.falcon.entity.v0.cluster.Property property : cluster.getProperties().getProperties()) {
                 clusterVars.put(property.getName(), property.getValue());
             }
         }
@@ -786,6 +790,184 @@ public final class FeedHelper {
         return result;
     }
 
+
+    /**
+     * Returns the data source type associated with the Feed's import policy.
+     *
+     * @param clusterEntity
+     * @param feed
+     * @return {@link org.apache.falcon.entity.v0.datasource.DatasourceType}
+     * @throws FalconException
+     */
+    public static DatasourceType getImportDatasourceType(
+            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+            Feed feed) throws FalconException {
+        Cluster feedCluster = getCluster(feed, clusterEntity.getName());
+        return DatasourceHelper.getImportSourceType(feedCluster);
+    }
+
+    /**
+     * Returns whether the import policy is enabled in the feed definition.
+     *
+     * @param feedCluster
+     * @return true if the import policy is enabled, else false
+     */
+
+    public static boolean isImportEnabled(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (feedCluster.getType() == ClusterType.SOURCE) {
+            return (feedCluster.getImport() != null);
+        }
+        return false;
+    }
+
+    /**
+     * Returns the data source name associated with the Feed's import policy.
+     *
+     * @param feedCluster
+     * @return DataSource name defined in the Datasource Entity
+     */
+    public static String getImportDatasourceName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (isImportEnabled(feedCluster)) {
+            return feedCluster.getImport().getSource().getName();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns Datasource table name.
+     *
+     * @param feedCluster
+     * @return Table or Topic name of the Datasource
+     */
+
+    public static String getImportDataSourceTableName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (isImportEnabled(feedCluster)) {
+            return feedCluster.getImport().getSource().getTableName();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the extract method type.
+     *
+     * @param feedCluster
+     * @return {@link org.apache.falcon.entity.v0.feed.ExtractMethod}
+     */
+
+    public static ExtractMethod getImportExtractMethod(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (isImportEnabled(feedCluster)) {
+            return feedCluster.getImport().getSource().getExtract().getType();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the merge type of the Feed import policy.
+     *
+     * @param feedCluster
+     * @return {@link org.apache.falcon.entity.v0.feed.MergeType}
+     */
+    public static MergeType getImportMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (isImportEnabled(feedCluster)) {
+            return feedCluster.getImport().getSource().getExtract().getMergepolicy();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the initial instance date for the import data set or coordinator.
+     *
+     * For snapshot merge type, the latest time is used since the source data is dumped in full.
+     * For incremental merge type, the start date specified in the cluster validity is used.
+     *
+     * @param feedCluster
+     * @return Feed cluster validity start date or recent time
+     */
+    public static Date getImportInitalInstance(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        Date initialInstance = new Date();
+        if (!FeedHelper.isSnapshotMergeType(feedCluster)) {
+            initialInstance = feedCluster.getValidity().getStart();
+        }
+        return initialInstance;
+    }
+
+    /**
+     * Helper method to check if the merge type is snapshot.
+     *
+     * @param feedCluster
+     * @return true if the feed import policy merge type is snapshot
+     *
+     */
+    public static boolean isSnapshotMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        return MergeType.SNAPSHOT == getImportMergeType(feedCluster);
+    }
+
+    /**
+     * Returns extra arguments specified in the Feed import policy.
+     *
+     * @param feedCluster
+     * @return map of argument names to values; empty if none are specified
+     * @throws FalconException
+     */
+    public static Map<String, String> getImportArguments(org.apache.falcon.entity.v0.feed.Cluster feedCluster)
+        throws FalconException {
+
+        Map<String, String> argsMap = new HashMap<String, String>();
+        if (feedCluster.getImport().getArguments() == null) {
+            return argsMap;
+        }
+
+        for(org.apache.falcon.entity.v0.feed.Argument p : feedCluster.getImport().getArguments().getArguments()) {
+            argsMap.put(p.getName().toLowerCase(), p.getValue());
+        }
+        return argsMap;
+    }
+
+    /**
+     * Returns Fields list specified in the Import Policy.
+     *
+     * @param feedCluster
+     * @return List of String
+     * @throws FalconException
+     */
+    public static List<String> getFieldList(org.apache.falcon.entity.v0.feed.Cluster feedCluster)
+        throws FalconException {
+        if (feedCluster.getImport().getSource().getFields() == null) {
+            return null;
+        }
+        org.apache.falcon.entity.v0.feed.FieldsType fieldType = feedCluster.getImport().getSource().getFields();
+        FieldIncludeExclude includeFields = fieldType.getIncludes();
+        if (includeFields == null) {
+            return null;
+        }
+        return includeFields.getFields();
+    }
+
+
+    /**
+     * Returns true if exclude field lists are used. This is a TBD feature.
+     *
+     * @param feedCluster
+     * @return true if an exclude field list is used, else false.
+     * @throws FalconException
+     */
+
+    public static boolean isFieldExcludes(org.apache.falcon.entity.v0.feed.Cluster feedCluster)
+        throws FalconException {
+        if (feedCluster.getImport().getSource().getFields() != null) {
+            org.apache.falcon.entity.v0.feed.FieldsType fieldType = feedCluster.getImport().getSource().getFields();
+            FieldIncludeExclude excludeFields = fieldType.getExcludes();
+            if ((excludeFields != null) && (excludeFields.getFields().size() > 0)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, String clusterName,
                                                                               Date instanceTime)
         throws FalconException {
@@ -813,5 +995,4 @@ public final class FeedHelper {
         }
         return  retentionFrequency;
     }
-
 }
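
A short sketch of how the new import accessors above might be used, assuming a feed
entity with an import policy on a cluster named "primaryCluster" (the cluster name is
illustrative):

    org.apache.falcon.entity.v0.feed.Cluster feedCluster =
            FeedHelper.getCluster(feed, "primaryCluster");
    if (FeedHelper.isImportEnabled(feedCluster)) {
        // Datasource entity name and source table referenced by the import policy.
        String dsName = FeedHelper.getImportDatasourceName(feedCluster);
        String table = FeedHelper.getImportDataSourceTableName(feedCluster);
        // Snapshot merge starts from the latest time; incremental merge starts
        // from the cluster validity start date.
        Date firstInstance = FeedHelper.getImportInitalInstance(feedCluster);
    }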

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
new file mode 100644
index 0000000..e58b1e9
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity.parser;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.DatasourceHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.datasource.ACL;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.datasource.Interfacetype;
+import org.apache.falcon.util.HdfsClassLoader;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Parser for DataSource entity definition.
+ */
+
+public class DatasourceEntityParser extends EntityParser<Datasource> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DatasourceEntityParser.class);
+
+    public DatasourceEntityParser() {
+        super(EntityType.DATASOURCE);
+    }
+
+    @Override
+    public void validate(Datasource db) throws FalconException {
+        ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            ClassLoader hdfsClassLoader = HdfsClassLoader.load(db.getName(), db.getDriver().getJars());
+            Thread.currentThread().setContextClassLoader(hdfsClassLoader);
+            validateInterface(db, Interfacetype.READONLY, hdfsClassLoader);
+            validateInterface(db, Interfacetype.WRITE, hdfsClassLoader);
+            validateACL(db);
+        } catch(IOException io) {
+            throw new ValidationException("Unable to copy driver jars to local dir: "
+                    + Arrays.toString(db.getDriver().getJars().toArray()));
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousClassLoader);
+        }
+    }
+
+    private static void validateInterface(Datasource db, Interfacetype interfacetype, ClassLoader hdfsClassLoader)
+        throws ValidationException {
+        String endpoint = null;
+        try {
+            endpoint = DatasourceHelper.getReadOnlyEndpoint(db);
+            if (StringUtils.isNotBlank(endpoint)) {
+                LOG.info("Validating {} endpoint {} connection.", interfacetype.value(), endpoint);
+                Properties userPasswdInfo = DatasourceHelper.fetchReadPasswordInfo(db);
+                validateConnection(hdfsClassLoader, db.getDriver().getClazz(), endpoint, userPasswdInfo);
+            }
+        } catch(FalconException fe) {
+            throw new ValidationException(String.format(
+                    "Cannot validate '%s' interface '%s' of datasource entity '%s' due to '%s'",
+                    interfacetype, endpoint, db.getName(), fe.getMessage()));
+        }
+    }
+
+    private static void validateConnection(ClassLoader hdfsClassLoader, String driverClass,
+                                    String connectUrl, Properties userPasswdInfo)
+        throws FalconException {
+        try {
+            java.sql.Driver driver = (java.sql.Driver) hdfsClassLoader.loadClass(driverClass).newInstance();
+            LOG.info("Validating connection URL: {} using driver: {}", connectUrl, driver.getClass().toString());
+            Connection con = driver.connect(connectUrl, userPasswdInfo);
+            if (con == null) {
+                throw new FalconException("DriverManager.getConnection() return "
+                       + "null for URL : " + connectUrl);
+            }
+        } catch (Exception ex) {
+            LOG.error("Exception while validating connection : ", ex);
+            throw new FalconException(ex);
+        }
+    }
+
+    /**
+     * Validate ACL if authorization is enabled.
+     *
+     * @param  db database entity
+     * @throws ValidationException if the ACL is missing or the user is not authorized
+     */
+    private void validateACL(Datasource db) throws ValidationException {
+        if (isAuthorizationDisabled) {
+            return;
+        }
+
+        // Validate that the entity owner is the logged-in, authenticated user when authorization is enabled
+        final ACL dbACL = db.getACL();
+        if (dbACL == null) {
+            throw new ValidationException("Datasource ACL cannot be empty for:  " + db.getName());
+        }
+
+        validateACLOwnerAndGroup(dbACL);
+
+        try {
+            authorize(db.getName(), dbACL);
+        } catch (AuthorizationException e) {
+            throw new ValidationException(e);
+        }
+    }
+}
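
For illustration, a minimal sketch of driving the new parser, assuming the
EntityParserFactory wiring added below and the parseAndValidate(InputStream) entry point
the other entity parsers expose; the resource path matches the test XML added by this commit:

    InputStream xml = DatasourceEntityParser.class
            .getResourceAsStream("/config/datasource/datasource-0.1.xml");
    DatasourceEntityParser parser =
            (DatasourceEntityParser) EntityParserFactory.getParser(EntityType.DATASOURCE);
    Datasource datasource = parser.parseAndValidate(xml);   // runs validate(db) above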

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
index 5a33201..b497770 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
@@ -45,6 +45,8 @@ public final class EntityParserFactory {
             return new FeedEntityParser();
         case CLUSTER:
             return new ClusterEntityParser();
+        case DATASOURCE:
+            return new DatasourceEntityParser();
         default:
             throw new IllegalArgumentException("Unhandled entity type: " + entityType);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index c5cfdd2..c70f18d 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -33,11 +33,14 @@ import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.ACL;
+import org.apache.falcon.entity.v0.feed.Extract;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.MergeType;
 import org.apache.falcon.entity.v0.feed.Properties;
 import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.Sla;
@@ -55,8 +58,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.TimeZone;
 
@@ -95,6 +99,12 @@ public class FeedEntityParser extends EntityParser<Feed> {
                     cluster.getName());
             validateClusterHasRegistry(feed, cluster);
             validateFeedCutOffPeriod(feed, cluster);
+            if (FeedHelper.isImportEnabled(cluster)) {
+                validateEntityExists(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster));
+                validateFeedExtractionType(feed, cluster);
+                validateFeedImportArgs(cluster);
+                validateFeedImportFieldExcludes(cluster);
+            }
         }
 
         validateFeedStorage(feed);
@@ -553,4 +563,54 @@ public class FeedEntityParser extends EntityParser<Feed> {
 
         }
     }
+
+    /**
+     * Validates the extraction and merge type combination. Only
+     * ExtractionType = FULL with MergeType = SNAPSHOT is currently supported;
+     * any other extraction type (e.g. INCREMENTAL with MergeType = APPEND) is rejected below.
+     *
+     * @param feed Feed entity
+     * @param cluster Cluster referenced in the Feed definition
+     * @throws FalconException if the combination is not supported
+     */
+    private void validateFeedExtractionType(Feed feed, Cluster cluster) throws FalconException {
+        Extract extract = cluster.getImport().getSource().getExtract();
+
+        if (ExtractMethod.FULL == extract.getType()) {
+            if ((MergeType.SNAPSHOT != extract.getMergepolicy())
+                    || (extract.getDeltacolumn() != null)) {
+                throw new ValidationException(String.format("Feed %s is using the FULL "
+                        + "extract method but specifies either a superfluous "
+                        + "deltacolumn or a mergepolicy other than SNAPSHOT", feed.getName()));
+            }
+        } else {
+            throw new ValidationException(String.format("Feed %s is using unsupported "
+                    + "extraction mechanism %s", feed.getName(), extract.getType().value()));
+        }
+    }
+
+    /**
+     * Validate import arguments supplied on the feed cluster.
+     * @param feedCluster Cluster referenced in the feed
+     * @throws FalconException if the argument combination is invalid
+     */
+    private void validateFeedImportArgs(Cluster feedCluster) throws FalconException {
+        Map<String, String> args = FeedHelper.getImportArguments(feedCluster);
+        int numMappers = 1;
+        if (args.containsKey("--num-mappers")) {
+            numMappers = Integer.parseInt(args.get("--num-mappers"));
+        }
+        if ((numMappers > 1) && (!args.containsKey("--split-by"))) {
+            throw new ValidationException("Feed import expects a "
+                    + "--split-by column when --num-mappers > 1");
+        }
+    }
+
+    private void validateFeedImportFieldExcludes(Cluster feedCluster) throws FalconException {
+        if (FeedHelper.isFieldExcludes(feedCluster)) {
+            throw new ValidationException("Field excludes are currently not supported "
+                    + "in the feed import policy");
+        }
+    }
 }
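
The --num-mappers/--split-by check above encodes a Sqoop-style constraint: a parallel
import needs a column to partition the input on. A standalone sketch of the rule, using a
plain Map in place of what FeedHelper.getImportArguments(cluster) would return:

    Map<String, String> args = new HashMap<String, String>();
    args.put("--num-mappers", "4");      // request 4 parallel mappers
    // args.put("--split-by", "id");     // without this, validation must fail
    int numMappers = args.containsKey("--num-mappers")
            ? Integer.parseInt(args.get("--num-mappers")) : 1;
    if (numMappers > 1 && !args.containsKey("--split-by")) {
        throw new IllegalArgumentException("--split-by column is required when --num-mappers > 1");
    }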

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 4dd1c68..9c7a932 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -59,7 +59,7 @@ import java.util.concurrent.TimeUnit;
 public final class ConfigurationStore implements FalconService {
 
     private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[] {
-        EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, };
+        EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, EntityType.DATASOURCE, };
     public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[] { EntityType.PROCESS, EntityType.FEED,
         EntityType.CLUSTER, };
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
index bd4c6cf..e4d9385 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.entity.v0;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
@@ -189,6 +190,16 @@ public final class EntityGraph implements ConfigurationChangeListener {
             Set<Node> clusterEdges = nodeEdges.get(clusterNode);
             feedEdges.add(clusterNode);
             clusterEdges.add(feedNode);
+
+            if (FeedHelper.isImportEnabled(cluster)) {
+                Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster));
+                if (!nodeEdges.containsKey(dbNode)) {
+                    nodeEdges.put(dbNode, new HashSet<Node>());
+                }
+                Set<Node> dbEdges = nodeEdges.get(dbNode);
+                feedEdges.add(dbNode);
+                dbEdges.add(feedNode);
+            }
         }
         return nodeEdges;
     }
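
With these edges in place, a datasource becomes a first-class dependency of any feed that
imports from it. A hedged sketch of observing that, assuming EntityGraph.getDependents(Entity)
behaves here as it does for clusters and feeds (as used by EntityIntegrityChecker below):

    Set<Entity> dependents = EntityGraph.get().getDependents(datasourceEntity);
    // expected to contain each Feed whose cluster declares an import from this datasource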

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
index bd32852..4c7e913 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
@@ -46,6 +46,9 @@ public final class EntityIntegrityChecker {
         case FEED:
             return filter(deps, EntityType.PROCESS);
 
+        case DATASOURCE:
+            return filter(deps, EntityType.FEED);
+
         default:
             return null;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 8c3876c..25bbf0c 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -20,10 +20,12 @@ package org.apache.falcon.metadata;
 
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Input;
@@ -64,6 +66,10 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         case FEED:
             addFeedEntity((Feed) entity);
             break;
+        case DATASOURCE:
+            addDatasourceEntity((Datasource) entity);
+            break;
         default:
             throw new IllegalArgumentException("Invalid EntityType " + entityType);
         }
@@ -91,8 +97,25 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
                 addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
             }
         }
+
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
+            if (FeedHelper.isImportEnabled(feedCluster)) {
+                addRelationToDatasource(feedVertex, FeedHelper.getImportDatasourceName(feedCluster),
+                        RelationshipLabel.DATASOURCE_IMPORT_EDGE);
+            }
+        }
+    }
+
+    public void addDatasourceEntity(Datasource dsEntity) {
+        LOG.info("Adding datasource entity: {}", dsEntity.getName());
+        Vertex dsVertex = addVertex(dsEntity.getName(), RelationshipType.DATASOURCE_ENTITY);
+
+        addUserRelation(dsVertex);
+        addColoRelation(dsEntity.getColo(), dsVertex);
+        addDataClassification(dsEntity.getTags(), dsVertex);
     }
 
     public void updateEntity(Entity oldEntity, Entity newEntity) {
         EntityType entityType = oldEntity.getEntityType();
         switch (entityType) {
@@ -177,6 +200,16 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         addEdge(fromVertex, clusterVertex, edgeLabel.getName());
     }
 
+    public void addRelationToDatasource(Vertex fromVertex, String datasourceName, RelationshipLabel edgeLabel) {
+        Vertex datasourceVertex = findVertex(datasourceName, RelationshipType.DATASOURCE_ENTITY);
+        if (datasourceVertex == null) { // the datasource entity must exist before feeds can reference it
+            LOG.error("Illegal State: Datasource entity vertex must exist for {}", datasourceName);
+            throw new IllegalStateException("Datasource entity vertex must exist: " + datasourceName);
+        }
+
+        addEdge(fromVertex, datasourceVertex, edgeLabel.getName());
+    }
+
     public void addInputFeeds(Inputs inputs, Vertex processVertex) {
         if (inputs == null) {
             return;

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index f485764..b709857 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -267,6 +267,39 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         }
     }
 
+
+    public void addImportedInstance(WorkflowExecutionContext context) throws FalconException {
+
+        String feedName = context.getOutputFeedNames();
+        String feedInstanceDataPath = context.getOutputFeedInstancePaths();
+        String datasourceName = context.getDatasourceName();
+        String sourceClusterName = context.getSrcClusterName();
+
+        LOG.info("Computing import feed instance for : name= {} path= {}, in cluster: {} "
+                       +  "from datasource: {}", feedName,
+                feedInstanceDataPath, sourceClusterName, datasourceName);
+        String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName,
+                feedInstanceDataPath, context.getNominalTimeAsISO8601());
+        Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
+
+        LOG.info("Vertex exists? name={}, type={}, v={}",
+                feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+        if (feedInstanceVertex == null) { // the instance was not generated by Falcon, so no vertex exists yet
+            LOG.info("{} instance vertex {} does not exist, adding it",
+                    RelationshipType.FEED_INSTANCE, feedInstanceName);
+            feedInstanceVertex = addFeedInstance( // add a new feed instance vertex
+                    feedInstanceName, context, feedName, sourceClusterName);
+        }
+        addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY,
+                RelationshipLabel.DATASOURCE_IMPORT_EDGE, context.getTimeStampAsISO8601());
+        addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY,
+                RelationshipLabel.FEED_CLUSTER_EDGE, context.getTimeStampAsISO8601());
+    }
+
+    public String getImportInstanceName(WorkflowExecutionContext context) {
+        return context.getEntityName() + "/" + context.getNominalTimeAsISO8601();
+    }
+
     private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
                                  WorkflowExecutionContext context, String feedName,
                                  String feedInstanceDataPath) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 56fbde0..cf2b651 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -287,8 +287,11 @@ public class MetadataMappingService
         case DELETE:
             onFeedInstanceEvicted(context);
             break;
+        case IMPORT:
+            onFeedInstanceImported(context);
+            break;
         default:
-            throw new IllegalArgumentException("Invalid EntityOperation" + entityOperation);
+            throw new IllegalArgumentException("Invalid EntityOperation - " + entityOperation);
         }
     }
 
@@ -328,4 +331,8 @@ public class MetadataMappingService
         LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601());
         instanceGraphBuilder.addEvictedInstance(context);
     }
+
+    private void onFeedInstanceImported(WorkflowExecutionContext context) throws FalconException {
+        LOG.info("Adding imported feed instance: {}", context.getNominalTimeAsISO8601());
+        instanceGraphBuilder.addImportedInstance(context);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index 5b312da..6d4bf46 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -28,6 +28,7 @@ public enum RelationshipLabel {
     PROCESS_CLUSTER_EDGE("runs-on"),
     FEED_PROCESS_EDGE("input"),
     PROCESS_FEED_EDGE("output"),
+    DATASOURCE_IMPORT_EDGE("import"),
 
     // instance edge labels
     INSTANCE_ENTITY_EDGE("instance-of"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java b/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java
new file mode 100644
index 0000000..3f9091f
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Helper class loader that fetches jars from an HDFS location and loads them into the JVM.
+ */
+public class HdfsClassLoader extends URLClassLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsClassLoader.class);
+    private static Map<String, HdfsClassLoader>  classLoaderCache = new ConcurrentHashMap<String, HdfsClassLoader>();
+    private static final Object LOCK = new Object();
+
+    public static ClassLoader load(final String name, final List<String> jarHdfsPath) throws IOException {
+        LOG.info("ClassLoader cache size = {}", classLoaderCache.size());
+        if (classLoaderCache.containsKey(name)) {
+            return classLoaderCache.get(name);
+        }
+
+        synchronized (LOCK) {
+            // Re-check under the lock so two threads racing on the same name build the loader only once.
+            HdfsClassLoader cached = classLoaderCache.get(name);
+            if (cached != null) {
+                return cached;
+            }
+            LOG.info("Copying jar files from HDFS to local dir");
+            final URL[] urls = copyHdfsJarFilesToTempDir(name, jarHdfsPath);
+            final ClassLoader parentClassLoader = HdfsClassLoader.class.getClassLoader();
+            LOG.info("Creating a new HdfsClassLoader for name = {} with parent = {} using classpath = {}",
+                    name, parentClassLoader.toString(), Arrays.toString(jarHdfsPath.toArray()));
+            HdfsClassLoader hdfsClassLoader = java.security.AccessController.doPrivileged(
+                    new java.security.PrivilegedAction<HdfsClassLoader>() {
+                        @Override
+                        public HdfsClassLoader run() {
+                            return new HdfsClassLoader(name, urls, parentClassLoader);
+                        }
+                    }
+            );
+            classLoaderCache.put(name, hdfsClassLoader);
+            return hdfsClassLoader;
+        }
+    }
+
+    private final ClassLoader realParent;
+
+    public HdfsClassLoader(String name, URL[] urls, ClassLoader parentClassLoader) {
+        // Pass null as the parent so that delegation is handled explicitly through realParent below.
+        super(urls, null);
+        this.realParent = parentClassLoader;
+    }
+
+    @Override
+    protected Class<?> loadClass(String name, boolean resolve)
+        throws ClassNotFoundException {
+
+        // Load through the parent class loader first and then fallback to this class loader.
+        try {
+            return realParent.loadClass(name);
+        } catch (Throwable t) {
+            return super.loadClass(name, resolve);
+        }
+    }
+
+    @Override
+    public URL getResource(String name) {
+        // This is the same as the jdk's getResource except the parent
+        // is taken from the realParent member instead of the parent member.
+        URL url = realParent.getResource(name);
+        if (url == null) {
+            url = findResource(name);
+        }
+        return url;
+    }
+
+    private static URL[] copyHdfsJarFilesToTempDir(String databaseName, List<String> jars) throws IOException {
+        List<URL> urls = new ArrayList<URL>();
+
+        final Configuration conf = new Configuration();
+        Path localPath = createTempDir(databaseName, conf);
+
+        for (String jar : jars) {
+            Path jarPath = new Path(jar);
+            final FileSystem fs = jarPath.getFileSystem(conf);
+            if (fs.isFile(jarPath) && jarPath.getName().endsWith(".jar")) {
+                LOG.info("Copying jarFile = " + jarPath);
+                fs.copyToLocalFile(jarPath, localPath);
+            }
+        }
+        urls.addAll(getJarsInPath(localPath.toUri().toURL()));
+
+        return urls.toArray(new URL[urls.size()]);
+    }
+
+    private static Path createTempDir(String databaseName, Configuration conf) throws IOException {
+        String tmpDir = System.getProperty("java.io.tmpdir");
+        // Check the raw property; once prefixed with "file://" the value could never be blank.
+        String tmpBaseDir = StringUtils.isBlank(tmpDir)
+                ? "file:///tmp" : String.format("file://%s", tmpDir);
+        Path localPath = new Path(tmpBaseDir, databaseName);
+        localPath.getFileSystem(conf).mkdirs(localPath);
+        return localPath;
+    }
+
+    private static List<URL> getJarsInPath(URL fileURL) throws MalformedURLException {
+        List<URL> urls = new ArrayList<URL>();
+
+        File file = new File(fileURL.getPath());
+        if (file.isDirectory()) {
+            File[] jarFiles = file.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File file) {
+                    return file.isFile() && file.getName().endsWith(".jar");
+                }
+            });
+
+            if (jarFiles != null) { // listFiles returns null on I/O error
+                for (File jarFile : jarFiles) {
+                    urls.add(jarFile.toURI().toURL());
+                }
+            }
+
+            if (!fileURL.toString().endsWith("/")) {
+                fileURL = new URL(fileURL.toString() + "/");
+            }
+        }
+
+        urls.add(fileURL);
+        return urls;
+    }
+}
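
A minimal sketch of the intended usage pattern, mirroring DatasourceEntityParser.validate()
above; the HDFS jar path and driver class here are hypothetical:

    List<String> jars = Arrays.asList("hdfs://nn:8020/apps/falcon/libs/mysql-connector.jar");
    ClassLoader previous = Thread.currentThread().getContextClassLoader();
    try {
        ClassLoader loader = HdfsClassLoader.load("mysql-db", jars);
        Thread.currentThread().setContextClassLoader(loader);
        // Resolve the JDBC driver through the jars staged from HDFS.
        java.sql.Driver driver = (java.sql.Driver) loader.loadClass("com.mysql.jdbc.Driver").newInstance();
    } catch (Exception e) {
        throw new RuntimeException("Could not load driver from HDFS jars", e);
    } finally {
        Thread.currentThread().setContextClassLoader(previous);
    }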

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index ac7140c..915e8c2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -38,6 +38,7 @@ public enum WorkflowExecutionArgs {
     // Exactly same as the above. Introduced to ensure compatibility between messages produced by POST-PROCESSING and
     // the values in conf.
     DATA_OPERATION("falconDataOperation", "operation like generate, delete, replicate", false),
+    DATASOURCE_NAME("datasource", "name of the datasource", false),
 
     // who
     WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 9bfc51b..899165b 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 
 /**
  * Captures the workflow execution context.
  */
@@ -74,7 +75,7 @@ public class WorkflowExecutionContext {
      * Entity operations supported.
      */
     public enum EntityOperations {
-        GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD
+        GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD, IMPORT
     }
 
     public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = {
@@ -299,9 +300,12 @@ public class WorkflowExecutionContext {
     }
 
     public long getExecutionCompletionTime() {
         return creationTime;
     }
 
+    public String getDatasourceName() {
+        return getValue(WorkflowExecutionArgs.DATASOURCE_NAME);
+    }
+
     public long getWorkflowStartTime() {
         return Long.parseLong(getValue(WorkflowExecutionArgs.WF_START_TIME));
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index aab9cee..a6d607b 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -58,6 +58,7 @@ public class AbstractTestBase {
     protected static final String FEED3_XML = "/config/feed/feed-0.3.xml";
     protected static final String FEED4_XML = "/config/feed/feed-0.4.xml";
     protected static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
+    protected static final String DATASOURCE_XML = "/config/datasource/datasource-0.1.xml";
     protected EmbeddedCluster dfsCluster;
     protected Configuration conf = new Configuration();
     private ConfigurationStore store;

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java b/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java
index 640e87d..5a4d6ec 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java
@@ -38,6 +38,7 @@ public class EntityTypeTest {
         Assert.assertTrue(EntityType.PROCESS.isSchedulable());
         Assert.assertTrue(EntityType.FEED.isSchedulable());
         Assert.assertFalse(EntityType.CLUSTER.isSchedulable());
+        Assert.assertFalse(EntityType.DATASOURCE.isSchedulable());
     }
 
     @Test
@@ -48,6 +49,8 @@ public class EntityTypeTest {
         Assert.assertEquals(EntityType.CLUSTER, EntityType.getEnum("cluSTER"));
         Assert.assertEquals(EntityType.PROCESS, EntityType.getEnum("process"));
         Assert.assertEquals(EntityType.PROCESS, EntityType.getEnum("pRocess"));
+        Assert.assertEquals(EntityType.DATASOURCE, EntityType.getEnum("datasource"));
+        Assert.assertEquals(EntityType.DATASOURCE, EntityType.getEnum("dataSource"));
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)

