falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject falcon git commit: FALCON-1949 Update Data source Entity
Date Tue, 07 Jun 2016 00:58:40 GMT
Repository: falcon
Updated Branches:
  refs/heads/master e6526d829 -> 2b84dd56d


FALCON-1949 Update Data source Entity

+ Updates data source entity in the config store
+ Rewrites the corresponding IMPORT and EXPORT bundles, setting an end date on the old bundles and a start date on the new bundles.
+ UT and IT
+ Manual end-to-end testing.

Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>

Reviewers: Balu Vellanki <balu@apache.org>, Venkat Ranganathan <venkat@hortonworks.com>, Ying Zheng <yzheng@hortonworks.com>

Closes #157 from vramachan/FALCON-1949.UpdateDataSource


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2b84dd56
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2b84dd56
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2b84dd56

Branch: refs/heads/master
Commit: 2b84dd56d291299f22ba872c1b0814cdad5692fe
Parents: e6526d8
Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>
Authored: Mon Jun 6 17:58:44 2016 -0700
Committer: bvellanki <bvellanki@hortonworks.com>
Committed: Mon Jun 6 17:58:44 2016 -0700

----------------------------------------------------------------------
 client/src/main/resources/feed-0.1.xsd          |   1 +
 .../apache/falcon/entity/DatasourceHelper.java  | 185 ++++++++++++++++++-
 .../org/apache/falcon/entity/FeedHelper.java    |   2 -
 .../entity/parser/DatasourceEntityParser.java   |   4 +
 .../falcon/entity/store/ConfigurationStore.java |   5 +
 .../apache/falcon/entity/v0/EntityGraph.java    |   1 -
 .../EntityRelationshipGraphBuilder.java         |  34 +++-
 .../falcon/metadata/RelationshipLabel.java      |   1 +
 .../org/apache/falcon/update/UpdateHelper.java  |  34 ++++
 .../apache/falcon/entity/AbstractTestBase.java  |  20 ++
 .../entity/store/ConfigurationStoreTest.java    |  51 ++++-
 .../apache/falcon/update/UpdateHelperTest.java  |  43 +++++
 .../falcon/resource/AbstractEntityManager.java  | 122 ++++++++++--
 .../falcon/lifecycle/DatasourceUpdateIT.java    |  95 ++++++++++
 .../org/apache/falcon/resource/TestContext.java |   1 +
 .../src/test/resources/datasource-template5.xml |  46 +++++
 16 files changed, 615 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 3488233..cbc97b9 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -489,6 +489,7 @@
         </xs:sequence>
         <xs:attribute type="non-empty-string" name="name" use="required"/>
         <xs:attribute type="non-empty-string" name="tableName" use="required"/>
+        <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
     </xs:complexType>
     <xs:complexType name="extract">
         <xs:sequence>

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
index 51ce898..e035bb5 100644
--- a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
@@ -27,10 +27,12 @@ import org.apache.falcon.entity.v0.datasource.Credential;
 import org.apache.falcon.entity.v0.datasource.Credentialtype;
 import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.datasource.DatasourceType;
+import org.apache.falcon.entity.v0.datasource.Driver;
 import org.apache.falcon.entity.v0.datasource.Interface;
 import org.apache.falcon.entity.v0.datasource.Interfaces;
 import org.apache.falcon.entity.v0.datasource.Interfacetype;
 import org.apache.falcon.entity.v0.datasource.PasswordAliasType;
+import org.apache.falcon.entity.v0.datasource.Property;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -46,6 +48,8 @@ import java.io.InputStream;
 import java.io.StringWriter;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * DataSource entity helper methods.
@@ -70,11 +74,11 @@ public final class DatasourceHelper {
     }
 
     public static String getReadOnlyEndpoint(Datasource datasource) {
-        return getInterface(datasource, Interfacetype.READONLY);
+        return getInterfaceEndpoint(datasource, Interfacetype.READONLY);
     }
 
     public static String getWriteEndpoint(Datasource datasource) {
-        return getInterface(datasource, Interfacetype.WRITE);
+        return getInterfaceEndpoint(datasource, Interfacetype.WRITE);
     }
 
     /**
@@ -148,16 +152,158 @@ public final class DatasourceHelper {
     }
 
     /**
+     * checks if two datasource interfaces are same.
+     *
+     * @param oldEntity old datasource entity
+     * @param newEntity new datasource entity
+     * @param ifacetype type of interface
+     * @return true if same else false
+     */
+    public static boolean isSameInterface(Datasource oldEntity, Datasource newEntity, Interfacetype ifacetype) {
+        LOG.debug("Verifying if Interfaces match for Datasource {} : Old - {}, New - {}", oldEntity, newEntity);
+        Interface oIface = getInterface(oldEntity, ifacetype);
+        Interface nIface = getInterface(newEntity, ifacetype);
+        if ((oIface == null) && (nIface == null)) {
+            return true;
+        }
+        if ((oIface == null) || (nIface == null)) {
+            return false;
+        }
+
+        return (StringUtils.equals(oIface.getEndpoint(), nIface.getEndpoint())
+            && isSameDriverClazz(oIface.getDriver(), nIface.getDriver())
+            && isSameCredentials(oIface.getCredential(), nIface.getCredential()));
+
+    }
+
+    /**
+     * check if datasource driver is same.
+     * @param oldEntity
+     * @param newEntity
+     * @return true if same or false
+     */
+
+    public static boolean isSameDriverClazz(Driver oldEntity, Driver newEntity) {
+        if ((oldEntity == null) && (newEntity == null)) {
+            return true;
+        }
+        if ((oldEntity == null) || (newEntity == null)) {
+            return false;
+        }
+        return StringUtils.equals(oldEntity.getClazz(), newEntity.getClazz());
+    }
+
+    /**
+     * checks if data source properties are same.
+     * @param oldEntity
+     * @param newEntity
+     * @return true if same else false
+     */
+
+    public static boolean isSameProperties(Datasource oldEntity, Datasource newEntity) {
+        Map<String, String> oldProps = getDatasourceProperties(oldEntity);
+        Map<String, String> newProps = getDatasourceProperties(newEntity);
+        return oldProps.equals(newProps);
+    }
+
+    /**
+     * checks if data source credentials are same.
+     * @param oCred
+     * @param nCred
+     * @return true true
+     */
+    public static boolean isSameCredentials(Credential oCred, Credential nCred) {
+        if ((oCred == null) && (nCred == null)) {
+            return true;
+        }
+        if ((oCred == null) || (nCred == null)) {
+            return true;
+        }
+        if (StringUtils.equals(oCred.getUserName(), nCred.getUserName())) {
+            if (oCred.getType() == nCred.getType()) {
+                if (oCred.getType() == Credentialtype.PASSWORD_TEXT) {
+                    return StringUtils.equals(oCred.getPasswordText(), nCred.getPasswordText());
+                } else if (oCred.getType() == Credentialtype.PASSWORD_FILE) {
+                    return StringUtils.equals(oCred.getPasswordFile(), nCred.getPasswordFile());
+                } else if (oCred.getType() == Credentialtype.PASSWORD_ALIAS) {
+                    return (StringUtils.equals(oCred.getPasswordAlias().getAlias(),
+                            nCred.getPasswordAlias().getAlias())
+                            && StringUtils.equals(oCred.getPasswordAlias().getProviderPath(),
+                                    nCred.getPasswordAlias().getProviderPath()));
+                }
+            } else {
+                return false;
+            }
+        }
+        return false;
+    }
+
+    public static Credential getCredential(Datasource db) {
+        return getCredential(db, null);
+    }
+
+    public static Credential getCredential(Datasource db, Interfacetype interfaceType) {
+        if (interfaceType == null) {
+            return db.getInterfaces().getCredential();
+        } else {
+            for(Interface iface : db.getInterfaces().getInterfaces()) {
+                if (iface.getType() == interfaceType) {
+                    return iface.getCredential();
+                }
+            }
+        }
+        return null;
+    }
+
+    public static void validateCredential(Credential cred) throws FalconException {
+        if (cred == null) {
+            return;
+        }
+        switch (cred.getType()) {
+        case PASSWORD_TEXT:
+            if (StringUtils.isBlank(cred.getUserName()) || StringUtils.isBlank(cred.getPasswordText())) {
+                throw new FalconException(String.format("Credential type '%s' missing tags '%s' or '%s'",
+                    cred.getType().value(), "userName", "passwordText"));
+            }
+            break;
+        case PASSWORD_FILE:
+            if (StringUtils.isBlank(cred.getUserName()) || StringUtils.isBlank(cred.getPasswordFile())) {
+                throw new FalconException(String.format("Credential type '%s' missing tags '%s' or '%s'",
+                    cred.getType().value(), "userName", "passwordFile"));
+            }
+            break;
+        case PASSWORD_ALIAS:
+            if (StringUtils.isBlank(cred.getUserName()) || (cred.getPasswordAlias() == null)
+                || StringUtils.isBlank(cred.getPasswordAlias().getAlias())
+                || StringUtils.isBlank(cred.getPasswordAlias().getProviderPath())) {
+                throw new FalconException(String.format("Credential type '%s' missing tags '%s' or '%s' or %s'",
+                    cred.getType().value(), "userName", "alias", "providerPath"));
+            }
+            break;
+        default:
+            throw new FalconException(String.format("Unknown Credential type '%s'", cred.getType().value()));
+        }
+    }
+
+    /**
      * Return the Interface endpoint for the interface type specified in the argument.
      *
      * @param db
      * @param type - can be read-only or write
      * @return
      */
-    private static String getInterface(Datasource db, Interfacetype type) {
+    private static String getInterfaceEndpoint(Datasource db, Interfacetype type) {
+        if (getInterface(db, type) != null) {
+            return getInterface(db, type).getEndpoint();
+        } else {
+            return null;
+        }
+    }
+
+    private static Interface getInterface(Datasource db, Interfacetype type) {
         for(Interface ifs : db.getInterfaces().getInterfaces()) {
             if (ifs.getType() == type) {
-                return ifs.getEndpoint();
+                return ifs;
             }
         }
         return null;
@@ -172,6 +318,12 @@ public final class DatasourceHelper {
         }
     }
 
+    /**
+     * fetch password from the corresponding store.
+     * @param c
+     * @return actual password
+     * @throws FalconException
+     */
     private static String fetchPasswordInfoFromCredentialStore(final PasswordAliasType c) throws FalconException {
         try {
             final String credPath = c.getProviderPath();
@@ -216,6 +368,15 @@ public final class DatasourceHelper {
             throw new FalconException(msg, ioe);
         }
     }
+
+    /**
+     * fetch the password from file.
+     *
+     * @param passwordFilePath
+     * @return
+     * @throws FalconException
+     */
+
     private static String fetchPasswordInfoFromFile(String passwordFilePath) throws FalconException {
         try {
             Path path = new Path(passwordFilePath);
@@ -245,4 +406,20 @@ public final class DatasourceHelper {
             throw new FalconException(ioe);
         }
     }
+
+
+    /*
+     returns data store properties
+     */
+
+    private static Map<String, String> getDatasourceProperties(final Datasource datasource) {
+        Map<String, String> returnProps = new HashMap<String, String>();
+        if (datasource.getProperties() != null) {
+            for (Property prop : datasource.getProperties().getProperties()) {
+                returnProps.put(prop.getName(), prop.getValue());
+            }
+        }
+        return returnProps;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index cca2d8b..ee6837e 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -874,8 +874,6 @@ public final class FeedHelper {
         }
     }
 
-
-
     /**
      * Returns Datasource table name.
      *

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
index 998f952..358fbce 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
@@ -51,6 +51,10 @@ public class DatasourceEntityParser extends EntityParser<Datasource> {
     public void validate(Datasource db) throws FalconException {
         try {
             ClassLoader hdfsClassLoader = HdfsClassLoader.load(db.getName(), db.getDriver().getJars());
+            DatasourceHelper.validateCredential(DatasourceHelper.getCredential(db));
+            DatasourceHelper.validateCredential(DatasourceHelper.getCredential(db, Interfacetype.READONLY));
+            DatasourceHelper.validateCredential(DatasourceHelper.getCredential(db, Interfacetype.WRITE));
+
             validateInterface(db, Interfacetype.READONLY, hdfsClassLoader);
             validateInterface(db, Interfacetype.WRITE, hdfsClassLoader);
             validateACL(db);

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 7f2b172..debf106 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.service.ConfigurationChangeListener;
 import org.apache.falcon.service.FalconService;
@@ -266,6 +267,10 @@ public final class ConfigurationStore implements FalconService {
             if (UpdateHelper.isClusterEntityUpdated((Cluster) oldentity, (Cluster) newEntity)) {
                 EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity) + 1);
             }
+        } else if (oldentity.getEntityType().equals(EntityType.DATASOURCE)) {
+            if (UpdateHelper.isDatasourceEntityUpdated((Datasource) oldentity, (Datasource) newEntity)) {
+                EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity) + 1);
+            }
         } else if (!EntityUtil.equals(oldentity, newEntity)) {
             // Increase version for other entities if they actually changed.
             EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity));

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
index acb570e..b5f6788 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
@@ -200,7 +200,6 @@ public final class EntityGraph implements ConfigurationChangeListener {
                 feedEdges.add(dbNode);
                 dbEdges.add(feedNode);
             }
-
             if (FeedHelper.isExportEnabled(cluster)) {
                 Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getExportDatasourceName(cluster));
                 if (!nodeEdges.containsKey(dbNode)) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index e6851df..af0a8b9 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -80,7 +80,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY);
 
         addUserRelation(clusterVertex);
-        addColoRelation(clusterEntity.getColo(), clusterVertex);
+        addColoRelation(clusterEntity.getColo(), clusterVertex, RelationshipLabel.CLUSTER_COLO);
         addDataClassification(clusterEntity.getTags(), clusterVertex);
     }
 
@@ -111,7 +111,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         Vertex dsVertex = addVertex(dsEntity.getName(), RelationshipType.DATASOURCE_ENTITY);
 
         addUserRelation(dsVertex);
-        addColoRelation(dsEntity.getColo(), dsVertex);
+        addColoRelation(dsEntity.getColo(), dsVertex, RelationshipLabel.DATASOURCE_COLO);
         addDataClassification(dsEntity.getTags(), dsVertex);
     }
 
@@ -128,11 +128,27 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         case FEED:
             updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
             break;
+        case DATASOURCE:
+            updateDatasourceEntity((Datasource) oldEntity, (Datasource) newEntity);
+            break;
         default:
             throw new IllegalArgumentException("Invalid EntityType " + entityType);
         }
     }
 
+    private void updateDatasourceEntity(Datasource oldDatasource, Datasource newDatasouce) {
+        LOG.info("Updating Datasource entity: {}", newDatasouce.getName());
+
+        Vertex dsEntityVertex = findVertex(oldDatasource.getName(), RelationshipType.DATASOURCE_ENTITY);
+        if (dsEntityVertex == null) {
+            LOG.error("Illegal State: Datasource entity vertex must exist for {}", oldDatasource.getName());
+            throw new IllegalStateException(oldDatasource.getName() + " entity vertex must exist.");
+        }
+        updateColoEdge(oldDatasource.getColo(), newDatasouce.getColo(), dsEntityVertex,
+                RelationshipLabel.DATASOURCE_COLO);
+        updateDataClassification(oldDatasource.getTags(), newDatasouce.getTags(), dsEntityVertex);
+    }
+
     private void updateClusterEntity(Cluster oldCluster, Cluster newCluster) {
         LOG.info("Updating Cluster entity: {}", newCluster.getName());
         Vertex clusterEntityVertex = findVertex(oldCluster.getName(), RelationshipType.CLUSTER_ENTITY);
@@ -140,25 +156,27 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
             LOG.error("Illegal State: Cluster entity vertex must exist for {}", oldCluster.getName());
             throw new IllegalStateException(oldCluster.getName() + " entity vertex must exist.");
         }
-        updateColoEdge(oldCluster.getColo(), newCluster.getColo(), clusterEntityVertex);
+        updateColoEdge(oldCluster.getColo(), newCluster.getColo(), clusterEntityVertex,
+                RelationshipLabel.CLUSTER_COLO);
         updateDataClassification(oldCluster.getTags(), newCluster.getTags(), clusterEntityVertex);
     }
 
-    private void updateColoEdge(String oldColo, String newColo, Vertex clusterEntityVertex) {
+    private void updateColoEdge(String oldColo, String newColo, Vertex clusterEntityVertex,
+                                RelationshipLabel relLabel) {
         if (areSame(oldColo, newColo)) {
             return;
         }
 
         Vertex oldColoVertex = findVertex(oldColo, RelationshipType.COLO);
         if (oldColoVertex != null) {
-            removeEdge(clusterEntityVertex, oldColoVertex, RelationshipLabel.CLUSTER_COLO.getName());
+            removeEdge(clusterEntityVertex, oldColoVertex, relLabel.getName());
         }
         Vertex newColoVertex = findVertex(newColo, RelationshipType.COLO);
         if (newColoVertex == null) {
             newColoVertex = addVertex(newColo, RelationshipType.COLO);
         }
 
-        addEdge(clusterEntityVertex, newColoVertex, RelationshipLabel.CLUSTER_COLO.getName());
+        addEdge(clusterEntityVertex, newColoVertex, relLabel.getName());
     }
 
     public void updateFeedEntity(Feed oldFeed, Feed newFeed) {
@@ -211,9 +229,9 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         updateProcessOutputs(oldProcess.getOutputs(), newProcess.getOutputs(), processEntityVertex);
     }
 
-    public void addColoRelation(String colo, Vertex fromVertex) {
+    public void addColoRelation(String colo, Vertex fromVertex, RelationshipLabel relLabel) {
         Vertex coloVertex = addVertex(colo, RelationshipType.COLO);
-        addEdge(fromVertex, coloVertex, RelationshipLabel.CLUSTER_COLO.getName());
+        addEdge(fromVertex, coloVertex, relLabel.getName());
     }
 
     public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index a146957..e8ea0e4 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -39,6 +39,7 @@ public enum RelationshipLabel {
     USER("owned-by"),
     GROUPS("grouped-as"),
     PIPELINES("pipeline"),
+    DATASOURCE_COLO("datasource-colo"),
 
     // replication labels
     FEED_CLUSTER_REPLICATED_EDGE("replicated-to"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index ae88a01..266319f 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -21,6 +21,7 @@ package org.apache.falcon.update;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.DatasourceHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
@@ -29,6 +30,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
@@ -166,4 +168,36 @@ public final class UpdateHelper {
 
         return false;
     }
+
+    public static boolean isDatasourceEntityUpdated(final Datasource oldEntity, final Datasource newEntity)
+        throws FalconException {
+        // ignore changes : colo, acl, description, tags
+        // can't change   : name, data source entity type
+
+        // major change that trigger bundle rewrite
+        // driver class name change but not driver jar as it is automatically picked up from share lib
+
+        if (!DatasourceHelper.isSameDriverClazz(oldEntity.getDriver(), newEntity.getDriver())) {
+            return true;
+        }
+
+        // interface endpoint, credential or driver update will trigger a bundle rewrite
+        for(org.apache.falcon.entity.v0.datasource.Interfacetype ifacetype
+                : org.apache.falcon.entity.v0.datasource.Interfacetype.values()) {
+            if (!DatasourceHelper.isSameInterface(oldEntity, newEntity, ifacetype)) {
+                return true;
+            }
+        }
+        // check default credential too
+        if (!DatasourceHelper.isSameCredentials(oldEntity.getInterfaces().getCredential(),
+                newEntity.getInterfaces().getCredential())) {
+            return true;
+        }
+
+        // any change in the properties will trigger a bundle rewrite
+        if (!DatasourceHelper.isSameProperties(oldEntity, newEntity)) {
+            return true;
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 3745955..3817056 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -136,6 +137,13 @@ public class AbstractTestBase {
 
             store.publish(type, process);
             break;
+        case DATASOURCE:
+            Datasource datasource = (Datasource) unmarshaller.unmarshal(this.getClass().getResource(DATASOURCE_XML));
+            datasource.setName(name);
+            datasource.setVersion(0);
+            decorateACL(proxyUser, defaultGroupName, datasource);
+            store.publish(type, datasource);
+            break;
         default:
         }
     }
@@ -158,6 +166,18 @@ public class AbstractTestBase {
         cluster.setACL(clusterACL);
     }
 
+    private void decorateACL(String proxyUser, String defaultGroupName, Datasource datasource) {
+        if (datasource.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.datasource.ACL dsACL =
+                new org.apache.falcon.entity.v0.datasource.ACL();
+        dsACL.setOwner(proxyUser);
+        dsACL.setGroup(defaultGroupName);
+        datasource.setACL(dsACL);
+    }
+
     private void decorateACL(String proxyUser, String defaultGroupName, Feed feed) {
         if (feed.getACL() != null) {
             return;

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
index 8056e80..0febca5 100644
--- a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
@@ -23,6 +23,13 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.datasource.Credential;
+import org.apache.falcon.entity.v0.datasource.Credentialtype;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.datasource.Interfaces;
+import org.apache.falcon.entity.v0.datasource.Interface;
+import org.apache.falcon.entity.v0.datasource.Interfacetype;
+import org.apache.falcon.entity.v0.datasource.DatasourceType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.service.ConfigurationChangeListener;
 import org.apache.falcon.util.StartupProperties;
@@ -112,7 +119,7 @@ public class ConfigurationStoreTest {
     }
 
     @Test
-    public void testUpdate() throws Exception {
+    public void testClusterUpdate() throws Exception {
         Cluster cluster1 = createClusterObj();
         store.publish(EntityType.CLUSTER, cluster1);
         Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 0);
@@ -132,6 +139,28 @@ public class ConfigurationStoreTest {
         Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 1);
     }
 
+    @Test
+    public void testDatasourceUpdate() throws Exception {
+        Datasource ds1 = createDatasourceObj();
+        store.publish(EntityType.DATASOURCE, ds1);
+        Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.DATASOURCE, "mysql-db")).intValue(), 0);
+
+        Datasource ds3 = createDatasourceObj();
+        ds3.setDescription("changed the description");
+        store.initiateUpdate(ds3);
+        store.update(EntityType.DATASOURCE, ds3);
+        store.cleanupUpdateInit();
+        Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.DATASOURCE, "mysql-db")).intValue(), 0);
+
+        Datasource ds2 = createDatasourceObj();
+        ds2.getInterfaces().getInterfaces().get(0).setEndpoint("jdbc:mysql://host-2/test");
+        store.initiateUpdate(ds2);
+        store.update(EntityType.DATASOURCE, ds2);
+        store.cleanupUpdateInit();
+        Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.DATASOURCE, "mysql-db")).intValue(), 1);
+    }
+
+
     private Cluster createClusterObj() {
         Cluster cluster = new Cluster();
         cluster.setName("cluster1");
@@ -139,6 +168,26 @@ public class ConfigurationStoreTest {
         return cluster;
     }
 
+    private Datasource createDatasourceObj() {
+        Datasource datasource = new Datasource();
+        datasource.setName("mysql-db");
+        datasource.setColo("colo1");
+        datasource.setDescription("mysql database");
+        datasource.setType(DatasourceType.MYSQL);
+        Interface readInterface = new Interface();
+        readInterface.setType(Interfacetype.READONLY);
+        readInterface.setEndpoint("jdbc:mysql://host-1/test");
+        Credential cred = new Credential();
+        cred.setUserName("test");
+        cred.setType(Credentialtype.PASSWORD_TEXT);
+        cred.setPasswordText("password");
+        readInterface.setCredential(cred);
+        datasource.setInterfaces(new Interfaces());
+        datasource.getInterfaces().getInterfaces().add(readInterface);
+        datasource.setVersion(0);
+        return datasource;
+    }
+
     @Test
     public void testGet() throws Exception {
         Process p = store.get(EntityType.PROCESS, "notfound");

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index 52b7103..826686f 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -28,6 +28,8 @@ import org.apache.falcon.entity.parser.EntityParserFactory;
 import org.apache.falcon.entity.parser.FeedEntityParser;
 import org.apache.falcon.entity.parser.ProcessEntityParser;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.datasource.Credential;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -55,6 +57,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 import java.io.IOException;
 import java.io.InputStream;
@@ -87,6 +90,8 @@ public class UpdateHelperTest extends AbstractTestBase {
         storeEntity(EntityType.FEED, "impressionFeed");
         storeEntity(EntityType.FEED, "imp-click-join1");
         storeEntity(EntityType.FEED, "imp-click-join2");
+        storeEntity(EntityType.DATASOURCE, "datasource1");
+        storeEntity(EntityType.DATASOURCE, "datasource2");
     }
 
     private void prepare(Process process) throws IOException, FalconException {
@@ -365,6 +370,44 @@ public class UpdateHelperTest extends AbstractTestBase {
         Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
     }
 
    /**
     * Verifies which datasource changes UpdateHelper treats as material.
     * NOTE: the assertions mutate {@code newDatasourceEntity} cumulatively, so the
     * order of the sections below matters; the credential check re-unmarshals a
     * fresh copy to isolate it from the earlier endpoint change.
     */
    @Test
    public void testIsDatasourceEntityUpdated() throws Exception {
        Unmarshaller unmarshaller = EntityType.DATASOURCE.getUnmarshaller();

        // Baseline: the stored entity vs. a freshly unmarshalled copy of the same fixture.
        String datasource = "datasource1";
        Datasource datasourceEntity = ConfigurationStore.get().get(EntityType.DATASOURCE, datasource);
        Datasource newDatasourceEntity = getNewDatasource(unmarshaller, datasource);
        Assert.assertNotNull(newDatasourceEntity);

        // Tags, ACL, description, colo update should not update bundle/workflow for dependent entities
        org.apache.falcon.entity.v0.datasource.ACL acl = new org.apache.falcon.entity.v0.datasource.ACL();
        acl.setOwner("Test");
        acl.setGroup("testGroup");
        acl.setPermission("*");
        newDatasourceEntity.setACL(acl);
        newDatasourceEntity.setDescription("New Description");
        newDatasourceEntity.setTags("test=val,test2=val2");
        newDatasourceEntity.setColo("newColo2");
        Assert.assertFalse(UpdateHelper.isDatasourceEntityUpdated(datasourceEntity, newDatasourceEntity));

        // Changing read or write endpoint should trigger rewrite
        newDatasourceEntity.getInterfaces().getInterfaces().get(0).setEndpoint("jdbc:hsqldb:localhost2/db1");
        Assert.assertTrue(UpdateHelper.isDatasourceEntityUpdated(datasourceEntity, newDatasourceEntity));

        // change credential type or value should trigger
        newDatasourceEntity = getNewDatasource(unmarshaller, datasource);
        Credential cred = newDatasourceEntity.getInterfaces().getInterfaces().get(0).getCredential();
        cred.setPasswordText("blah");
        Assert.assertTrue(UpdateHelper.isDatasourceEntityUpdated(datasourceEntity, newDatasourceEntity));
    }
+
+    private Datasource getNewDatasource(Unmarshaller unmarshaller, String datasource) throws JAXBException {
+        Datasource newDatasourceEntity = (Datasource) unmarshaller.unmarshal(this.getClass()
+                .getResource(DATASOURCE_XML));
+        newDatasourceEntity.setName(datasource);
+        return newDatasourceEntity;
+    }
+
     private static Location getLocation(Feed feed, LocationType type, String cluster) {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster);
         if (feedCluster.getLocations() != null) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 5fa345d..6c9237b 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -39,7 +39,9 @@ import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityIntegrityChecker;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.resource.APIResult.Status;
@@ -334,26 +336,41 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
 
             validateUpdate(oldEntity, newEntity);
             configStore.initiateUpdate(newEntity);
-
             obtainEntityLocks(oldEntity, "update", tokenList);
 
             StringBuilder result = new StringBuilder("Updated successfully");
-            //Update in workflow engine if entity is not a cluster (cluster entity is not scheduled)
-            if (!DeploymentUtil.isPrism() && !entityType.equals(EntityType.CLUSTER)) {
-                Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
-                Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity);
-                newClusters.retainAll(oldClusters); //common clusters for update
-                oldClusters.removeAll(newClusters); //deleted clusters
-
-                for (String cluster : newClusters) {
-                    result.append(getWorkflowEngine(oldEntity).update(oldEntity, newEntity, cluster, skipDryRun));
+            switch(entityType) {
+            case CLUSTER:
+                configStore.update(entityType, newEntity);
+                break;
+            case DATASOURCE:
+                configStore.update(entityType, newEntity);
                // Always check whether dependent feeds are already upgraded, and upgrade them if needed
+                if (entityType.equals(EntityType.DATASOURCE)) {
+                    ConfigurationStore.get().cleanupUpdateInit();
+                    releaseEntityLocks(entityName, tokenList);
+                    updateDatasourceDependents(entityName, skipDryRun);
                 }
-                for (String cluster : oldClusters) {
-                    getWorkflowEngine(oldEntity).delete(oldEntity, cluster);
+                break;
+            case FEED:
+            case PROCESS:
+                if (!DeploymentUtil.isPrism()) {
+                    Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
+                    Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity);
+                    newClusters.retainAll(oldClusters); //common clusters for update
+                    oldClusters.removeAll(newClusters); //deleted clusters
+                    for (String cluster : newClusters) {
+                        result.append(getWorkflowEngine(oldEntity).update(oldEntity, newEntity, cluster, skipDryRun));
+                    }
+                    for (String cluster : oldClusters) {
+                        getWorkflowEngine(oldEntity).delete(oldEntity, cluster);
+                    }
                 }
+                configStore.update(entityType, newEntity);
+                break;
+            default:
+                throw FalconWebException.newAPIException("Unknown entity type in update : " + entityType);
             }
-
-            configStore.update(entityType, newEntity);
             return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
         } catch (Throwable e) {
             LOG.error("Update failed", e);
@@ -365,6 +382,83 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
     }
 
     /**
+     * check if the data source entity dependent feeds are upgraded or not by checking against the data source entity
+     * version and upgrade feeds accordingly.
+     *
+     * @param datasourceName Name of the data source entity
+     * @param skipDryRun Skip dry run during update if set to true
+     * @return APIResult
+     *
+     */
+
+    public APIResult updateDatasourceDependents(String datasourceName, Boolean skipDryRun) {
+        try {
+            Datasource datasource = EntityUtil.getEntity(EntityType.DATASOURCE, datasourceName);
+            StringBuilder result = new StringBuilder(String.format("Updating feed entities "
+                    + "dependent on datasource : %s ", datasource.getName()));
+
+            // get data source dependent entities and check the version referenced is same
+            Pair<String, EntityType>[] dependentEntities = EntityIntegrityChecker.referencedBy(datasource);
+            if (dependentEntities == null) {
+                return new APIResult(APIResult.Status.SUCCEEDED, String.format("Datasource %s has "
+                        + "no dependent entities", datasourceName));
+            }
+            for (Pair<String, EntityType> depEntity : dependentEntities) {
+                Entity entity = EntityUtil.getEntity(depEntity.second, depEntity.first);
+                if (entity.getEntityType() != EntityType.FEED) {
+                    throw FalconWebException.newAPIException("Datasource dependents should be FEEDS, but"
+                        + "encountered type : " + entity.getEntityType());
+                }
+                Feed newFeed = (Feed) entity.copy();
+                for (org.apache.falcon.entity.v0.feed.Cluster feedCluster
+                        : newFeed.getClusters().getClusters()) {
+                    if (feedCluster.getType() == ClusterType.SOURCE) {
+                        boolean updatedFeed = isUpdateFeedDatasourceVersion(feedCluster, datasource, newFeed);
+                        if (updatedFeed) {
+                            // rewrite the dependent feed and update it on the store
+                            result.append(getWorkflowEngine(entity).update(entity, newFeed,
+                                    feedCluster.getName(), skipDryRun));
+                            updateEntityInConfigStore(entity, newFeed);
+                        }
+                    }
+                }
+            }
+            return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
+        } catch (FalconException e) {
+            LOG.error("Update failed", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    private boolean isUpdateFeedDatasourceVersion(org.apache.falcon.entity.v0.feed.Cluster feedCluster,
+        Datasource datasource, Feed feed) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Datasource updateFeedImp = incFeedDatasourceVersion(datasource,
+                feed, feedCluster.getImport() != null ? feedCluster.getImport().getSource() : null);
+        org.apache.falcon.entity.v0.feed.Datasource updateFeedExp = incFeedDatasourceVersion(datasource,
+                feed, feedCluster.getExport() != null ? feedCluster.getExport().getTarget() : null);
+        return ((updateFeedImp != null) || (updateFeedExp != null));
+    }
+
+    private  org.apache.falcon.entity.v0.feed.Datasource  incFeedDatasourceVersion(Datasource datasource,
+        Feed feed, org.apache.falcon.entity.v0.feed.Datasource depDatasource) throws FalconException {
+        if ((depDatasource != null) && (datasource.getName().equals(depDatasource.getName()))) {
+            if (depDatasource.getVersion() < datasource.getVersion()) {
+                LOG.info(String.format("Updating since Feed '%s' referenced datasource '%s' "
+                        + "version '%d' < datasource entity version in store '%d'", feed.getName(),
+                        depDatasource.getName(), depDatasource.getVersion(), datasource.getVersion()));
+                depDatasource.setVersion(depDatasource.getVersion()+1);
+                return depDatasource;
+            } else if (depDatasource.getVersion() > datasource.getVersion()) {
+                throw new FalconException(String.format("Feed '%s' datasource '%s' version '%d' > datasource "
+                        + "entity version in store '%d'", feed.getName(), depDatasource.getName(),
+                        depDatasource.getVersion(), datasource.getVersion()));
+            }
+        }
+        return null;
+    }
+
+
+    /**
      * Updates scheduled dependent entities of a cluster.
      *
      * @param clusterName   Name of cluster

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/webapp/src/test/java/org/apache/falcon/lifecycle/DatasourceUpdateIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/DatasourceUpdateIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/DatasourceUpdateIT.java
new file mode 100644
index 0000000..9a11751
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/DatasourceUpdateIT.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.HsqldbTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * Integration test for Datasource Update.
+ */
+
+public class DatasourceUpdateIT {
+    public static final Logger LOG =  LoggerFactory.getLogger(DatasourceUpdateIT.class);
+
+    private static final String DATASOURCE_NAME_KEY = "datasourcename";
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        HsqldbTestUtils.start();
+        HsqldbTestUtils.createSqoopUser("sqoop_user", "sqoop");
+        HsqldbTestUtils.createSqoopUser("sqoop_user2", "sqoop");
+        HsqldbTestUtils.changeSAPassword("sqoop");
+        HsqldbTestUtils.createAndPopulateCustomerTable();
+
+        TestContext.cleanupStore();
+        TestContext.prepare();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        HsqldbTestUtils.tearDown();
+        FileUtils.deleteDirectory(new File("../localhost/"));
+        FileUtils.deleteDirectory(new File("localhost"));
+    }
+
+    @Test
+    public void testHSql() throws Exception {
+        Assert.assertEquals(4, HsqldbTestUtils.getNumberOfRows());
+    }
+
+    public void testDatasourceUpdate() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        LOG.info("entity -submit -type cluster -file " + filePath);
+        Assert.assertEquals(TestContext.executeWithURL("entity -submit -type cluster -file " + filePath), 0);
+
+        // Make a new datasource name into the overlay so that DATASOURCE_TEMPLATE1 and FEED_TEMPLATE3
+        // are populated  with the same datasource name
+        String dsName = "datasource-test-1";
+        overlay.put(DATASOURCE_NAME_KEY, dsName);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE1, overlay);
+        LOG.info("Submit datatsource entity {} via entity -submit -type datasource -file {}", dsName, filePath);
+        Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
+        LOG.info("Submit feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName, filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        // instantiate another datasource with same name but different end point
+        overlay.put(DATASOURCE_NAME_KEY, dsName);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE5, overlay);
+        LOG.info("update datasource {} via -update -type datasource -file {}", dsName, filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -update -type datasource -file " + filePath));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 5412608..5173032 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -95,6 +95,7 @@ public class TestContext extends AbstractTestContext {
     public static final String DATASOURCE_TEMPLATE2 = "/datasource-template2.xml";
     public static final String DATASOURCE_TEMPLATE3 = "/datasource-template3.xml";
     public static final String DATASOURCE_TEMPLATE4 = "/datasource-template4.xml";
+    public static final String DATASOURCE_TEMPLATE5 = "/datasource-template5.xml";
     public static final String CLUSTER_TEMPLATE = "/cluster-template.xml";
     public static final String CLUSTER_UPDATED_TEMPLATE = "/cluster-updated-template.xml";
     public static final String PIG_PROCESS_TEMPLATE = "/pig-process-template.xml";

http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/webapp/src/test/resources/datasource-template5.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/datasource-template5.xml b/webapp/src/test/resources/datasource-template5.xml
new file mode 100644
index 0000000..f5414b8
--- /dev/null
+++ b/webapp/src/test/resources/datasource-template5.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
<!-- HSQL datasource template used by DatasourceUpdateIT to update an already-submitted
     datasource of the same name. NOTE(review): the readonly interface authenticates as
     sqoop_user2 - presumably this differs from datasource-template1 so the update is
     material; confirm against datasource-template1.xml. -->
<datasource colo="##colo##" description="" type="hsql" name="##datasourcename##" xmlns="uri:falcon:datasource:0.1">
    <interfaces>
        <interface type="readonly" endpoint="jdbc:hsqldb:hsql://localhost/db1">
            <credential type="password-text">
                <userName>sqoop_user2</userName>
                <passwordText>sqoop</passwordText>
            </credential>
        </interface>

        <interface type="write" endpoint="jdbc:hsqldb:hsql://localhost/db1">
            <credential type="password-text">
                <userName>sqoop_user</userName>
                <passwordText>sqoop</passwordText>
            </credential>
        </interface>

        <!-- Default credential applied when an interface does not specify its own. -->
        <credential type="password-text">
            <userName>sqoop_user</userName>
            <passwordText>sqoop</passwordText>
        </credential>
    </interfaces>

    <driver>
        <clazz>org.hsqldb.jdbcDriver</clazz>
        <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
    </driver>
</datasource>
\ No newline at end of file


Mime
View raw message