falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject falcon git commit: FALCON-1085 Support Cluster entity updates in Falcon Server
Date Tue, 10 May 2016 20:27:42 GMT
Repository: falcon
Updated Branches:
  refs/heads/master bb6032b2c -> f3ff8b27f


FALCON-1085 Support Cluster entity updates in Falcon Server

Added basic documentation; https://issues.apache.org/jira/browse/FALCON-1937 will contain the detailed documentation.

Author: bvellanki <bvellanki@hortonworks.com>

Reviewers: "Venkat Ranganathan <venkat@hortonworks.com>, yzheng-hortonworks <yzheng@hortonworks.com>, peeyush b <pbishnoi@hortonworks.com>"

Closes #127 from bvellanki/FALCON-1085


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f3ff8b27
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f3ff8b27
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f3ff8b27

Branch: refs/heads/master
Commit: f3ff8b27f0a77d802306f0fc9ffdff51ae6c7486
Parents: bb6032b
Author: bvellanki <bvellanki@hortonworks.com>
Authored: Tue May 10 13:27:33 2016 -0700
Committer: bvellanki <bvellanki@hortonworks.com>
Committed: Tue May 10 13:27:33 2016 -0700

----------------------------------------------------------------------
 .../org/apache/falcon/cli/FalconEntityCLI.java  |   9 +-
 .../org/apache/falcon/FalconCLIConstants.java   |   1 +
 .../org/apache/falcon/client/FalconClient.java  |   9 ++
 client/src/main/resources/cluster-0.1.xsd       |   1 +
 client/src/main/resources/datasource-0.1.xsd    |   3 +-
 client/src/main/resources/feed-0.1.xsd          |   2 +
 client/src/main/resources/process-0.1.xsd       |   2 +
 .../org/apache/falcon/entity/ClusterHelper.java |  50 ++++++-
 .../falcon/entity/ColoClusterRelation.java      |   3 +-
 .../org/apache/falcon/entity/EntityUtil.java    |  69 +++++++++
 .../falcon/entity/parser/FeedEntityParser.java  |   8 ++
 .../entity/parser/ProcessEntityParser.java      |   8 ++
 .../falcon/entity/store/ConfigurationStore.java |  18 ++-
 .../EntityRelationshipGraphBuilder.java         |  28 +++-
 .../org/apache/falcon/update/UpdateHelper.java  |  37 +++++
 .../falcon/entity/ColoClusterRelationTest.java  |  20 +++
 .../apache/falcon/entity/EntityUtilTest.java    |  21 +++
 .../entity/parser/ClusterEntityParserTest.java  |   3 +
 .../parser/DatasourceEntityParserTest.java      |   1 +
 .../entity/parser/FeedEntityParserTest.java     |  10 +-
 .../entity/parser/ProcessEntityParserTest.java  |  12 ++
 .../entity/store/ConfigurationStoreTest.java    |  31 +++++
 .../metadata/MetadataMappingServiceTest.java    |  23 +++
 .../apache/falcon/update/UpdateHelperTest.java  |  64 +++++++++
 .../resources/config/process/process-0.1.xml    |   2 +-
 .../src/site/twiki/falconcli/UpdateEntity.twiki |   7 +-
 docs/src/site/twiki/restapi/EntityUpdate.twiki  |   4 +-
 .../falcon/resource/AbstractEntityManager.java  |  95 ++++++++++++-
 .../proxy/SchedulableEntityManagerProxy.java    |  86 +++++++++---
 .../org/apache/falcon/unit/TestFalconUnit.java  |   6 +-
 .../falcon/resource/ConfigSyncService.java      |  16 +++
 .../resource/SchedulableEntityManager.java      |  33 ++++-
 .../falcon/cli/FalconClusterUpdateCLIIT.java    | 139 +++++++++++++++++++
 .../apache/falcon/cli/FalconSafemodeCLIIT.java  |   3 -
 .../org/apache/falcon/resource/TestContext.java |   1 +
 .../test/resources/cluster-updated-template.xml |  42 ++++++
 36 files changed, 817 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
index 37a6992..78b2225 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
@@ -66,6 +66,8 @@ public class FalconEntityCLI extends FalconCLI {
                 "Submits an entity xml to Falcon");
         Option update = new Option(FalconCLIConstants.UPDATE_OPT, false,
                 "Updates an existing entity xml");
+        Option updateClusterDependents = new Option(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT, false,
+                "Updates dependent entities of a cluster in workflow engine");
         Option schedule = new Option(FalconCLIConstants.SCHEDULE_OPT, false,
                 "Schedules a submited entity in Falcon");
         Option suspend = new Option(FalconCLIConstants.SUSPEND_OPT, false,
@@ -96,6 +98,7 @@ public class FalconEntityCLI extends FalconCLI {
         OptionGroup group = new OptionGroup();
         group.addOption(submit);
         group.addOption(update);
+        group.addOption(updateClusterDependents);
         group.addOption(schedule);
         group.addOption(suspend);
         group.addOption(resume);
@@ -217,7 +220,8 @@ public class FalconEntityCLI extends FalconCLI {
         }
 
         EntityType entityTypeEnum = null;
-        if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
+        if (optionsList.contains(FalconCLIConstants.LIST_OPT)
+                || optionsList.contains(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT)) {
             if (entityType == null) {
                 entityType = "";
             }
@@ -255,6 +259,9 @@ public class FalconEntityCLI extends FalconCLI {
             validateColo(optionsList);
             validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT);
             result = client.update(entityType, entityName, filePath, skipDryRun, doAsUser).getMessage();
+        }  else if (optionsList.contains(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT)) {
+            validateNotEmpty(cluster, FalconCLIConstants.CLUSTER_OPT);
+            result = client.updateClusterDependents(cluster, skipDryRun, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
index 1db5cfe..31ead63 100644
--- a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
+++ b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
@@ -47,6 +47,7 @@ public final class FalconCLIConstants {
     public static final String VERSION_OPT = "version";
     public static final String SUBMIT_OPT = "submit";
     public static final String UPDATE_OPT = "update";
+    public static final String UPDATE_CLUSTER_DEPENDENTS_OPT = "updateClusterDependents";
     public static final String DELETE_OPT = "delete";
     public static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule";
     public static final String VALIDATE_OPT = "validate";

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 7a48973..1014d64 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -236,6 +236,7 @@ public class FalconClient extends AbstractFalconClient {
         VALIDATE("api/entities/validate/", HttpMethod.POST, MediaType.TEXT_XML),
         SUBMIT("api/entities/submit/", HttpMethod.POST, MediaType.TEXT_XML),
         UPDATE("api/entities/update/", HttpMethod.POST, MediaType.TEXT_XML),
+        UPDATEDEPENDENTS("api/entities/updateClusterDependents/", HttpMethod.POST, MediaType.TEXT_XML),
         SUBMITANDSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML),
         SCHEDULE("api/entities/schedule/", HttpMethod.POST, MediaType.TEXT_XML),
         SUSPEND("api/entities/suspend/", HttpMethod.POST, MediaType.TEXT_XML),
@@ -430,6 +431,14 @@ public class FalconClient extends AbstractFalconClient {
         return getResponse(APIResult.class, clientResponse);
     }
 
+    public APIResult updateClusterDependents(String clusterName, Boolean skipDryRun,
+                                             String doAsUser) throws FalconCLIException {
+        ClientResponse clientResponse = new ResourceBuilder().path(Entities.UPDATEDEPENDENTS.path, clusterName)
+                .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser)
+                .call(Entities.UPDATEDEPENDENTS);
+        return getResponse(APIResult.class, clientResponse);
+    }
+
     @Override
     public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun,
                                        String doAsUser, String properties) throws FalconCLIException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/cluster-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/cluster-0.1.xsd b/client/src/main/resources/cluster-0.1.xsd
index 0e0ada8..03e9f84 100644
--- a/client/src/main/resources/cluster-0.1.xsd
+++ b/client/src/main/resources/cluster-0.1.xsd
@@ -75,6 +75,7 @@
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="xs:string" name="description"/>
         <xs:attribute type="xs:string" name="colo" use="required"/>
+        <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
     </xs:complexType>
     <xs:complexType name="locations">
         <xs:annotation>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/datasource-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/datasource-0.1.xsd b/client/src/main/resources/datasource-0.1.xsd
index 1202ba1..ef78239 100644
--- a/client/src/main/resources/datasource-0.1.xsd
+++ b/client/src/main/resources/datasource-0.1.xsd
@@ -76,6 +76,7 @@
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="xs:string"  name="colo" use="required"/>
         <xs:attribute type="xs:string"  name="description"/>
+        <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
         <xs:attribute type="datasource-type"  name="type" use="required">
             <xs:annotation>
                 <xs:documentation>
@@ -263,7 +264,7 @@
     <xs:complexType name="ACL">
         <xs:annotation>
             <xs:documentation>
-                Access control list for this cluster.
+                Access control list for this Entity.
                 owner is the Owner of this entity.
                 group is the one which has access to read - not used at this time.
                 permission is not enforced at this time

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 77e8663..3488233 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -129,6 +129,7 @@
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="xs:string" name="description"/>
+        <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
     </xs:complexType>
     <xs:complexType name="cluster">
         <xs:annotation>
@@ -168,6 +169,7 @@
         <xs:attribute type="cluster-type" name="type" use="optional"/>
         <xs:attribute type="xs:string" name="partition" use="optional"/>
         <xs:attribute type="frequency-type" name="delay" use="optional" />
+        <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
     </xs:complexType>
     <xs:complexType name="partitions">
         <xs:annotation>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 456ebf9..0d01e33 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -188,6 +188,7 @@
             <xs:element type="ACL" name="ACL" minOccurs="0"/>
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
+        <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
     </xs:complexType>
 
     <xs:simpleType name="IDENTIFIER">
@@ -219,6 +220,7 @@
             <xs:element type="validity" name="validity"/>
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
+        <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
     </xs:complexType>
 
     <xs:complexType name="validity">

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 9d79742..aff4405 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -32,6 +32,8 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
@@ -45,7 +47,7 @@ public final class ClusterHelper {
     public static final String WORKINGDIR = "working";
     public static final String NO_USER_BROKER_URL = "NA";
     public static final String EMPTY_DIR_NAME = "EMPTY_DIR_DONT_DELETE";
-
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterHelper.class);
 
     private ClusterHelper() {
     }
@@ -123,6 +125,9 @@ public final class ClusterHelper {
     }
 
     public static Interface getInterface(Cluster cluster, Interfacetype type) {
+        if (cluster.getInterfaces() == null) {
+            return null;
+        }
         for (Interface interf : cluster.getInterfaces().getInterfaces()) {
             if (interf.getType() == type) {
                 return interf;
@@ -143,6 +148,9 @@ public final class ClusterHelper {
 
 
     public static Location getLocation(Cluster cluster, ClusterLocationType clusterLocationType) {
+        if (cluster.getLocations() == null) {
+            return null;
+        }
         for (Location loc : cluster.getLocations().getLocations()) {
             if (loc.getName().equals(clusterLocationType)) {
                 return loc;
@@ -211,4 +219,44 @@ public final class ClusterHelper {
         return getStorageUrl(cluster) + getLocation(cluster, ClusterLocationType.STAGING).getPath()
                 + "/" + EMPTY_DIR_NAME;
     }
+
+    public static boolean matchInterface(final Cluster oldEntity, final Cluster newEntity,
+                                         final Interfacetype interfaceType) {
+        Interface oldInterface = getInterface(oldEntity, interfaceType);
+        Interface newInterface = getInterface(newEntity, interfaceType);
+        String oldEndpoint = (oldInterface == null) ? null : oldInterface.getEndpoint();
+        String newEndpoint = (newInterface == null) ? null : newInterface.getEndpoint();
+        LOG.debug("Verifying if Interfaces match for cluster {} : Old - {}, New - {}",
+                interfaceType.name(), oldEndpoint, newEndpoint);
+        return StringUtils.isBlank(oldEndpoint) && StringUtils.isBlank(newEndpoint)
+                || StringUtils.isNotBlank(oldEndpoint) && oldEndpoint.equalsIgnoreCase(newEndpoint);
+    }
+
+    public static boolean matchLocations(final Cluster oldEntity, final Cluster newEntity,
+                                         final ClusterLocationType locationType) {
+        Location oldLocation = getLocation(oldEntity, locationType);
+        Location newLocation = getLocation(newEntity, locationType);
+        String oldLocationPath = (oldLocation == null) ? null : oldLocation.getPath();
+        String newLocationPath = (newLocation == null) ? null : newLocation.getPath();
+        LOG.debug("Verifying if Locations match {} : Old - {}, New - {}",
+                locationType.name(), oldLocationPath, newLocationPath);
+        return  StringUtils.isBlank(oldLocationPath) && StringUtils.isBlank(newLocationPath)
+                || StringUtils.isNotBlank(oldLocationPath) && oldLocationPath.equalsIgnoreCase(newLocationPath);
+    }
+
+    public static boolean matchProperties(final Cluster oldEntity, final Cluster newEntity) {
+        Map<String, String> oldProps = getClusterProperties(oldEntity);
+        Map<String, String> newProps = getClusterProperties(newEntity);
+        return oldProps.equals(newProps);
+    }
+
+    private static Map<String, String> getClusterProperties(final Cluster cluster) {
+        Map<String, String> returnProps = new HashMap<String, String>();
+        if (cluster.getProperties() != null) {
+            for (Property prop : cluster.getProperties().getProperties()) {
+                returnProps.put(prop.getName(), prop.getValue());
+            }
+        }
+        return returnProps;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
index e4ca91b..a141e43 100644
--- a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
+++ b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
@@ -79,7 +79,8 @@ public final class ColoClusterRelation implements ConfigurationChangeListener {
         if (oldEntity.getEntityType() != EntityType.CLUSTER) {
             return;
         }
-        throw new FalconException("change shouldn't be supported on cluster!");
+        onRemove(oldEntity);
+        onAdd(newEntity);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index b181ece..51172f2 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -35,6 +35,7 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.datasource.DatasourceType;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.ClusterType;
@@ -130,6 +131,7 @@ public final class EntityUtil {
     public enum ENTITY_OPERATION {
         SUBMIT,
         UPDATE,
+        UPDATE_CLUSTER_DEPENDENTS,
         SCHEDULE,
         SUBMIT_AND_SCHEDULE,
         DELETE,
@@ -706,6 +708,40 @@ public final class EntityUtil {
         }
     }
 
+    public static Integer getVersion(final Entity entity) throws FalconException {
+        switch (entity.getEntityType()) {
+        case FEED:
+            return ((Feed)entity).getVersion();
+        case PROCESS:
+            return ((Process)entity).getVersion();
+        case CLUSTER:
+            return ((Cluster)entity).getVersion();
+        case DATASOURCE:
+            return ((Datasource)entity).getVersion();
+        default:
+            throw new FalconException("Invalid entity type:" + entity.getEntityType());
+        }
+    }
+
+    public static void setVersion(Entity entity, final Integer version) throws FalconException {
+        switch (entity.getEntityType()) {
+        case FEED:
+            ((Feed)entity).setVersion(version);
+            break;
+        case PROCESS:
+            ((Process)entity).setVersion(version);
+            break;
+        case CLUSTER:
+            ((Cluster)entity).setVersion(version);
+            break;
+        case DATASOURCE:
+            ((Datasource)entity).setVersion(version);
+            break;
+        default:
+            throw new FalconException("Invalid entity type:" + entity.getEntityType());
+        }
+    }
+
     //Staging path that stores scheduler configs like oozie coord/bundle xmls, parent workflow xml
     //Each entity update creates a new staging path
     //Base staging path is the base path for all staging dirs
@@ -1123,4 +1159,37 @@ public final class EntityUtil {
         return instancePath;
     }
 
+    /**
+     * Returns true if entity is dependent on cluster, else returns false.
+     * @param entity
+     * @param clusterName
+     * @return
+     */
+    public static boolean isEntityDependentOnCluster(Entity entity, String clusterName) {
+        switch (entity.getEntityType()) {
+        case CLUSTER:
+            return entity.getName().equalsIgnoreCase(clusterName);
+
+        case FEED:
+            Feed feed = (Feed) entity;
+            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+                if (cluster.getName().equalsIgnoreCase(clusterName)) {
+                    return true;
+                }
+            }
+            break;
+
+        case PROCESS:
+            Process process = (Process) entity;
+            for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+                if (cluster.getName().equalsIgnoreCase(clusterName)) {
+                    return true;
+                }
+            }
+            break;
+        default:
+        }
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 2a9a852..28fdaf8 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -97,6 +97,14 @@ public class FeedEntityParser extends EntityParser<Feed> {
                 cluster.getValidity().setEnd(DateUtil.NEVER);
             }
 
+            // set Cluster version
+            int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion();
+            if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) {
+                throw new ValidationException("Feed should not set cluster to a version that does not exist");
+            } else {
+                cluster.setVersion(clusterVersion);
+            }
+
             validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(),
                     cluster.getName());
             validateClusterHasRegistry(feed, cluster);

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 16fd8b3..8edec5b 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -91,6 +91,14 @@ public class ProcessEntityParser extends EntityParser<Process> {
                 cluster.getValidity().setEnd(DateUtil.NEVER);
             }
 
+            // set Cluster version
+            int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion();
+            if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) {
+                throw new ValidationException("Process should not set cluster to a version that does not exist");
+            } else {
+                cluster.setVersion(clusterVersion);
+            }
+
             validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
             validateHDFSPaths(process, clusterName);
             validateProperties(process);

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index bdcd1af..7f2b172 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -20,12 +20,15 @@ package org.apache.falcon.entity.store;
 
 import org.apache.commons.codec.CharEncoding;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.service.ConfigurationChangeListener;
 import org.apache.falcon.service.FalconService;
+import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.FileStatus;
@@ -242,9 +245,10 @@ public final class ConfigurationStore implements FalconService {
     private synchronized void updateInternal(EntityType type, Entity entity) throws FalconException {
         try {
             if (get(type, entity.getName()) != null) {
-                persist(type, entity);
                 ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
                 Entity oldEntity = entityMap.get(entity.getName());
+                updateVersion(oldEntity, entity);
+                persist(type, entity);
                 onChange(oldEntity, entity);
                 entityMap.put(entity.getName(), entity);
             } else {
@@ -256,6 +260,18 @@ public final class ConfigurationStore implements FalconService {
         AUDIT.info(type + "/" + entity.getName() + " is replaced into config store");
     }
 
+    private void updateVersion(Entity oldentity, Entity newEntity) throws FalconException {
+        // increase version number for cluster only if dependent feeds/process needs to be updated.
+        if (oldentity.getEntityType().equals(EntityType.CLUSTER)) {
+            if (UpdateHelper.isClusterEntityUpdated((Cluster) oldentity, (Cluster) newEntity)) {
+                EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity) + 1);
+            }
+        } else if (!EntityUtil.equals(oldentity, newEntity)) {
+            // Increase version for other entities if they actually changed.
+            EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity));
+        }
+    }
+
     public synchronized void update(EntityType type, Entity entity) throws FalconException {
         if (updatesInProgress.get() == entity) {
             try {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 25bbf0c..e6851df 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -120,7 +120,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         EntityType entityType = oldEntity.getEntityType();
         switch (entityType) {
         case CLUSTER:
-            // a cluster cannot be updated
+            updateClusterEntity((Cluster) oldEntity, (Cluster) newEntity);
             break;
         case PROCESS:
             updateProcessEntity((Process) oldEntity, (Process) newEntity);
@@ -133,7 +133,33 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         }
     }
 
+    private void updateClusterEntity(Cluster oldCluster, Cluster newCluster) {
+        LOG.info("Updating Cluster entity: {}", newCluster.getName());
+        Vertex clusterEntityVertex = findVertex(oldCluster.getName(), RelationshipType.CLUSTER_ENTITY);
+        if (clusterEntityVertex == null) {
+            LOG.error("Illegal State: Cluster entity vertex must exist for {}", oldCluster.getName());
+            throw new IllegalStateException(oldCluster.getName() + " entity vertex must exist.");
+        }
+        updateColoEdge(oldCluster.getColo(), newCluster.getColo(), clusterEntityVertex);
+        updateDataClassification(oldCluster.getTags(), newCluster.getTags(), clusterEntityVertex);
+    }
+
+    private void updateColoEdge(String oldColo, String newColo, Vertex clusterEntityVertex) {
+        if (areSame(oldColo, newColo)) {
+            return;
+        }
 
+        Vertex oldColoVertex = findVertex(oldColo, RelationshipType.COLO);
+        if (oldColoVertex != null) {
+            removeEdge(clusterEntityVertex, oldColoVertex, RelationshipLabel.CLUSTER_COLO.getName());
+        }
+        Vertex newColoVertex = findVertex(newColo, RelationshipType.COLO);
+        if (newColoVertex == null) {
+            newColoVertex = addVertex(newColo, RelationshipType.COLO);
+        }
+
+        addEdge(clusterEntityVertex, newColoVertex, RelationshipLabel.CLUSTER_COLO.getName());
+    }
 
     public void updateFeedEntity(Feed oldFeed, Feed newFeed) {
         LOG.info("Updating feed entity: {}", newFeed.getName());

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index 6603bc6..ae88a01 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -20,12 +20,15 @@ package org.apache.falcon.update;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
@@ -71,6 +74,10 @@ public final class UpdateHelper {
         case PROCESS:
             return !EntityUtil.equals(oldView, newView, PROCESS_FIELDS);
 
+        case CLUSTER:
+            return isClusterEntityUpdated((org.apache.falcon.entity.v0.cluster.Cluster) oldEntity,
+                    (org.apache.falcon.entity.v0.cluster.Cluster) newEntity);
+
         default:
         }
         throw new IllegalArgumentException("Unhandled entity type " + oldEntity.getEntityType());
@@ -129,4 +136,34 @@ public final class UpdateHelper {
             throw new FalconException("Don't know what to do. Unexpected scenario");
         }
     }
+
+    /**
+     * Checks whether a cluster update calls for updating the bundle/coord of dependent entities.
+     * Colo, interfaces, locations and properties are compared. By design the name should not be
+     * updated, and description, tags and ACL changes need no bundle/coord update.
+     *
+     * @param oldEntity cluster definition before the update
+     * @param newEntity cluster definition after the update
+     * @return true if the colo differs or ClusterHelper reports an interface, location or property mismatch
+     */
+    public static boolean isClusterEntityUpdated(final org.apache.falcon.entity.v0.cluster.Cluster oldEntity,
+                                                        final org.apache.falcon.entity.v0.cluster.Cluster newEntity) {
+        // Null-safe comparison: a cluster object without a colo must not cause an NPE here.
+        if (!StringUtils.equals(oldEntity.getColo(), newEntity.getColo())) {
+            return true;
+        }
+
+        for (Interfacetype interfacetype : Interfacetype.values()) {
+            if (!ClusterHelper.matchInterface(oldEntity, newEntity, interfacetype)) {
+                return true;
+            }
+        }
+
+        for (ClusterLocationType locationType : ClusterLocationType.values()) {
+            if (!ClusterHelper.matchLocations(oldEntity, newEntity, locationType)) {
+                return true;
+            }
+        }
+
+        return !ClusterHelper.matchProperties(oldEntity, newEntity);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java b/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java
index 0d6e754..2abcece 100644
--- a/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java
@@ -69,5 +69,25 @@ public class ColoClusterRelationTest extends AbstractTestBase {
         clusters = relation.getClusters("colo1");
         Assert.assertNotNull(clusters);
         Assert.assertEquals(0, clusters.size());
+
+        Cluster updatedCluster3 = new Cluster();
+        updatedCluster3.setName(cluster3.getName());
+        updatedCluster3.setColo("colo3");
+        try {
+            getStore().initiateUpdate(updatedCluster3);
+            getStore().update(EntityType.CLUSTER, updatedCluster3);
+        } finally {
+            getStore().cleanupUpdateInit();
+        }
+
+        relation = ColoClusterRelation.get();
+        clusters = relation.getClusters("colo3");
+        Assert.assertNotNull(clusters);
+        Assert.assertEquals(1, clusters.size());
+        Assert.assertTrue(clusters.contains(updatedCluster3.getName()));
+
+        clusters = relation.getClusters("colo2");
+        Assert.assertNotNull(clusters);
+        Assert.assertEquals(0, clusters.size());
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
index c87449c..766b2fa 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
@@ -450,4 +450,25 @@ public class EntityUtilTest extends AbstractTestBase {
         // Ensure latest is returned.
         Assert.assertEquals(EntityUtil.getLatestStagingPath(cluster, process).getName(), md5 + "_1436357052992");
     }
+
+    /**
+     * Checks EntityUtil.isEntityDependentOnCluster for a cluster, a feed and a process entity, first
+     * with cluster names expected to match and then with "fakeCluster", which is expected not to.
+     */
+    @Test
+    public void testIsClusterUsedByEntity() throws Exception {
+        org.apache.falcon.entity.v0.cluster.Cluster parsedCluster =
+                (org.apache.falcon.entity.v0.cluster.Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(
+                        getClass().getResourceAsStream(CLUSTER_XML));
+        Feed parsedFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
+                getClass().getResourceAsStream(FEED_XML));
+        Process parsedProcess = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(
+                getClass().getResourceAsStream(PROCESS_XML));
+
+        // Names expected to be reported as dependencies.
+        Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(parsedCluster, "testCluster"));
+        Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(parsedFeed, "testCluster"));
+        Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(parsedFeed, "backupCluster"));
+        Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(parsedProcess, "testCluster"));
+
+        // A name none of the entities is expected to reference.
+        final String unknownCluster = "fakeCluster";
+        Assert.assertFalse(EntityUtil.isEntityDependentOnCluster(parsedCluster, unknownCluster));
+        Assert.assertFalse(EntityUtil.isEntityDependentOnCluster(parsedFeed, unknownCluster));
+        Assert.assertFalse(EntityUtil.isEntityDependentOnCluster(parsedProcess, unknownCluster));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index c45909f..4b4b657 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -175,6 +175,9 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         // Good set of properties, should work
         clusterEntityParser.validateProperties(cluster);
 
+        // validate version
+        Assert.assertEquals(cluster.getVersion(), 0);
+
         // add duplicate property, should throw validation exception.
         Property property1 = new Property();
         property1.setName("field1");

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
index 3893917..6ade9c9 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
@@ -76,6 +76,7 @@ public class DatasourceEntityParserTest extends AbstractTestBase {
         Assert.assertEquals("test-hsql-db", databaseEntity.getName());
         Assert.assertEquals("hsql", databaseEntity.getType().value());
         Assert.assertEquals("org.hsqldb.jdbcDriver", databaseEntity.getDriver().getClazz());
+        Assert.assertEquals(datasource.getVersion(), 0);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index ceec3c4..c642fb8 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -35,11 +35,11 @@ import org.apache.falcon.entity.v0.feed.ActionType;
 import org.apache.falcon.entity.v0.feed.Argument;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.ExtractMethod;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.feed.MergeType;
-import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Partition;
 import org.apache.falcon.entity.v0.feed.Partitions;
 import org.apache.falcon.entity.v0.feed.Property;
@@ -87,11 +87,13 @@ public class FeedEntityParserTest extends AbstractTestBase {
         Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass()
                 .getResourceAsStream(CLUSTER_XML));
         cluster.setName("testCluster");
+        cluster.setVersion(0);
         store.publish(EntityType.CLUSTER, cluster);
 
         cluster = (Cluster) unmarshaller.unmarshal(this.getClass()
                 .getResourceAsStream(CLUSTER_XML));
         cluster.setName("backupCluster");
+        cluster.setVersion(1);
         store.publish(EntityType.CLUSTER, cluster);
 
         LifecyclePolicyMap.get().init();
@@ -123,11 +125,14 @@ public class FeedEntityParserTest extends AbstractTestBase {
         assertEquals(feed.getSla().getSlaHigh().toString(), "hours(3)");
         assertEquals(feed.getSla().getSlaLow().toString(), "hours(2)");
         assertEquals(feed.getGroups(), "online,bi");
+        Assert.assertEquals(feed.getVersion(), 0);
 
         assertEquals(feed.getClusters().getClusters().get(0).getName(),
                 "testCluster");
         assertEquals(feed.getClusters().getClusters().get(0).getSla().getSlaLow().toString(), "hours(3)");
         assertEquals(feed.getClusters().getClusters().get(0).getSla().getSlaHigh().toString(), "hours(4)");
+        assertEquals(feed.getClusters().getClusters().get(0).getVersion(), 0);
+        assertEquals(feed.getClusters().getClusters().get(1).getVersion(), 1);
 
         assertEquals(feed.getClusters().getClusters().get(0).getType(),
                 ClusterType.SOURCE);
@@ -633,6 +638,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
         Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass()
                 .getResourceAsStream(("/config/cluster/cluster-no-registry.xml")));
         cluster.setName("badTestCluster");
+        cluster.setVersion(0);
         ConfigurationStore.get().publish(EntityType.CLUSTER, cluster);
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 7159966..64f62a5 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -121,6 +121,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()), "2011-11-02T00:00Z");
         Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()), "2091-12-30T00:00Z");
         Assert.assertEquals(process.getTimezone().getID(), "UTC");
+        Assert.assertEquals(processCluster.getVersion(), 0);
 
         Assert.assertEquals(process.getSla().getShouldStartIn().toString(), "hours(2)");
         Assert.assertEquals(process.getSla().getShouldEndIn().toString(), "hours(4)");
@@ -386,6 +387,17 @@ public class ProcessEntityParserTest extends AbstractTestBase {
     }
 
     @Test
+    public void testValidateVersion() throws Exception {
+        // The parsed sample process is expected to report version 0.
+        Process process = parser.parse(this.getClass().getResourceAsStream(PROCESS_XML));
+        Assert.assertEquals(process.getVersion(), 0);
+
+        // An explicitly assigned version must still be reported after validate() has run.
+        final int assignedVersion = 10;
+        process.setVersion(assignedVersion);
+        parser.validate(process);
+        Assert.assertEquals(process.getVersion(), assignedVersion);
+    }
+
+    @Test
     public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception {
         InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
index fa3d3f4..8056e80 100644
--- a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
@@ -19,8 +19,10 @@
 package org.apache.falcon.entity.store;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.service.ConfigurationChangeListener;
 import org.apache.falcon.util.StartupProperties;
@@ -96,6 +98,7 @@ public class ConfigurationStoreTest {
         store.publish(EntityType.PROCESS, process);
         Process p = store.get(EntityType.PROCESS, "hello");
         Assert.assertEquals(p, process);
+        Assert.assertEquals(p.getVersion(), 0);
 
         store.registerListener(listener);
         process.setName("world");
@@ -109,6 +112,34 @@ public class ConfigurationStoreTest {
     }
 
     @Test
+    public void testUpdate() throws Exception {
+        Cluster cluster1 = createClusterObj();
+        store.publish(EntityType.CLUSTER, cluster1);
+        Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 0);
+
+        // A description-only change is expected to leave the stored version at 0.
+        Cluster cluster2 = createClusterObj();
+        cluster2.setDescription("new Desc");
+        updateInStore(cluster2);
+        Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 0);
+
+        // A colo change is expected to bump the stored version to 1.
+        Cluster cluster3 = createClusterObj();
+        cluster3.setColo("newColo");
+        updateInStore(cluster3);
+        Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 1);
+    }
+
+    /**
+     * Runs initiateUpdate and update for the given cluster and always calls cleanupUpdateInit
+     * afterwards, so a failing update cannot leave the store without its update cleanup.
+     *
+     * @param cluster updated cluster definition to write to the store
+     * @throws Exception if initiating or applying the update fails
+     */
+    private void updateInStore(Cluster cluster) throws Exception {
+        try {
+            store.initiateUpdate(cluster);
+            store.update(EntityType.CLUSTER, cluster);
+        } finally {
+            store.cleanupUpdateInit();
+        }
+    }
+
+    /** Builds a cluster entity with only its name ("cluster1") and colo ("colo1") set. */
+    private Cluster createClusterObj() {
+        final Cluster minimalCluster = new Cluster();
+        minimalCluster.setColo("colo1");
+        minimalCluster.setName("cluster1");
+        return minimalCluster;
+    }
+
+    @Test
     public void testGet() throws Exception {
         Process p = store.get(EntityType.PROCESS, "notfound");
         Assert.assertNull(p);

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 29f933d..228f522 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -606,6 +606,29 @@ public class MetadataMappingServiceTest {
         verifyLineageGraphForJobCounters(context);
     }
 
+    /**
+     * Updates the cluster's colo and appends a tag through the config store, then checks the graph:
+     * two additional vertices, one additional edge, and a CLUSTER_COLO edge to the new colo.
+     */
+    @Test(dependsOnMethods = "testOnFeedEntityChange")
+    public void testOnClusterEntityChange() throws Exception {
+        final long verticesBefore = getVerticesCount(service.getGraph());
+        final long edgesBefore = getEdgesCount(service.getGraph());
+
+        final String updatedColo = "clusterUpdateColo";
+        final String updatedTags = clusterEntity.getTags() + ",clusterUpdateTagKey=clusterUpdateTagVal";
+        final Cluster updatedCluster =
+                EntityBuilderTestUtil.buildCluster(clusterEntity.getName(), updatedColo, updatedTags);
+
+        try {
+            configStore.initiateUpdate(updatedCluster);
+            configStore.update(EntityType.CLUSTER, updatedCluster);
+        } finally {
+            configStore.cleanupUpdateInit();
+        }
+
+        // Expected growth: +1 vertex for the new tag and +1 vertex for the new colo ...
+        Assert.assertEquals(getVerticesCount(service.getGraph()), verticesBefore + 2);
+        // ... and +1 edge for the new tag.
+        Assert.assertEquals(getEdgesCount(service.getGraph()), edgesBefore + 1);
+        final Vertex clusterVertex = getEntityVertex(updatedCluster.getName(), RelationshipType.CLUSTER_ENTITY);
+        verifyVertexForEdge(clusterVertex, Direction.OUT, RelationshipLabel.CLUSTER_COLO.getName(),
+                updatedColo, RelationshipType.COLO.getName());
+    }
+
     private void verifyUpdatedEdges(Process newProcess) {
         Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index 3e48e26..52b7103 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -21,6 +21,7 @@ package org.apache.falcon.update;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.parser.EntityParserFactory;
@@ -30,7 +31,11 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.ACL;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
@@ -50,6 +55,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import javax.xml.bind.Unmarshaller;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -301,6 +307,64 @@ public class UpdateHelperTest extends AbstractTestBase {
         Assert.assertTrue(UpdateHelper.isEntityUpdated(newProcess, newerProcess, cluster, procPath));
     }
 
+    /**
+     * Verifies which cluster changes UpdateHelper.isClusterEntityUpdated reports: tag, ACL and
+     * description changes must not be reported, while colo, interface, property and location changes must.
+     */
+    @Test
+    public void testIsClusterEntityUpdated() throws Exception {
+        Unmarshaller unmarshaller = EntityType.CLUSTER.getUnmarshaller();
+
+        String cluster = "testCluster";
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+        Cluster newClusterEntity = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
+        // Assert before the first dereference so a failed unmarshal shows up as an assertion failure.
+        Assert.assertNotNull(newClusterEntity);
+        newClusterEntity.setName(cluster);
+
+        // Tags, ACL, description update should not update bundle/workflow for dependent entities
+        ACL acl = new ACL();
+        acl.setOwner("Test");
+        acl.setGroup("testGroup");
+        acl.setPermission("*");
+        newClusterEntity.setACL(acl);
+        newClusterEntity.setDescription("New Description");
+        newClusterEntity.setTags("test=val,test2=val2");
+        Assert.assertFalse(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+
+        // Changing colo should trigger update
+        newClusterEntity.setColo("NewColoValue");
+        Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+
+        // Updating an interface should trigger update bundle/workflow for dependent entities
+        // NOTE(review): the colo change above is still in effect here, so this assertion would also
+        // pass on the colo difference alone; consider resetting newClusterEntity first.
+        Interface writeInterface = ClusterHelper.getInterface(newClusterEntity, Interfacetype.WRITE);
+        Assert.assertNotNull(writeInterface);
+        newClusterEntity.getInterfaces().getInterfaces().remove(writeInterface);
+        writeInterface.setEndpoint("hdfs://test.host.name:8020");
+        writeInterface.setType(Interfacetype.WRITE);
+        writeInterface.setVersion("2.2.0");
+        newClusterEntity.getInterfaces().getInterfaces().add(writeInterface);
+        Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+
+        // Updating a property should trigger update bundle/workflow for dependent entities
+        newClusterEntity = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
+        Assert.assertNotNull(newClusterEntity);
+        newClusterEntity.setName(cluster);
+        org.apache.falcon.entity.v0.cluster.Property property = new org.apache.falcon.entity.v0.cluster.Property();
+        property.setName("testName");
+        property.setValue("testValue");
+        newClusterEntity.getProperties().getProperties().add(property);
+        Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+
+        // Updating a location should trigger update bundle/workflow for dependent entities
+        newClusterEntity = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
+        Assert.assertNotNull(newClusterEntity);
+        newClusterEntity.setName(cluster);
+        org.apache.falcon.entity.v0.cluster.Location stagingLocation =
+                ClusterHelper.getLocation(newClusterEntity, ClusterLocationType.STAGING);
+        Assert.assertNotNull(stagingLocation);
+        // Remove from the locations list (not the interfaces list) so the re-added entry is not a duplicate.
+        newClusterEntity.getLocations().getLocations().remove(stagingLocation);
+        stagingLocation.setPath("/test/path/here");
+        newClusterEntity.getLocations().getLocations().add(stagingLocation);
+        Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+    }
+
     private static Location getLocation(Feed feed, LocationType type, String cluster) {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster);
         if (feedCluster.getLocations() != null) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-0.1.xml b/common/src/test/resources/config/process/process-0.1.xml
index 039208c..4ce7ad1 100644
--- a/common/src/test/resources/config/process/process-0.1.xml
+++ b/common/src/test/resources/config/process/process-0.1.xml
@@ -16,7 +16,7 @@
   See the License for the specific language governing permissions and
   limitations under the License.
   -->
-<process name="sample" xmlns="uri:falcon:process:0.1">
+<process name="sample" version="0" xmlns="uri:falcon:process:0.1">
     <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
     <pipelines>testPipeline,dataReplication_Pipeline</pipelines>
     <clusters>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/docs/src/site/twiki/falconcli/UpdateEntity.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/falconcli/UpdateEntity.twiki b/docs/src/site/twiki/falconcli/UpdateEntity.twiki
index 5d49a76..146a60f 100644
--- a/docs/src/site/twiki/falconcli/UpdateEntity.twiki
+++ b/docs/src/site/twiki/falconcli/UpdateEntity.twiki
@@ -2,12 +2,15 @@
 
 [[CommonCLI][Common CLI Options]]
 
-Update operation allows an already submitted/scheduled entity to be updated and put it into the archive.Archive path is defined in startup.properties in variable "config.store.uri". Cluster and datasource updates are currently not allowed.
+Update operation allows an already submitted/scheduled entity to be updated and put it into the archive. Archive path is defined in startup.properties in variable "config.store.uri". Datasource updates are currently not allowed.
 
 Usage:
-$FALCON_HOME/bin/falcon entity  -type [feed|process] -name <<name>> -update -file <<path_to_file>>
+$FALCON_HOME/bin/falcon entity  -type [cluster|feed|process] -name <<name>> -update -file <<path_to_file>>
 
 Optional Arg : -skipDryRun. When this argument is specified, Falcon skips oozie dryrun.
 
 Example:
 $FALCON_HOME/bin/falcon entity -type process -name hourly-reports-generator -update -file /process/definition.xml
+
+Note: When a cluster entity is updated, the dependent feed and process bundle+coordinators are updated in the
+workflow engine. Hence, only a Falcon superuser who has the ability to impersonate other users can update a cluster entity.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/docs/src/site/twiki/restapi/EntityUpdate.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityUpdate.twiki b/docs/src/site/twiki/restapi/EntityUpdate.twiki
index 46b01fc..cbf33db 100644
--- a/docs/src/site/twiki/restapi/EntityUpdate.twiki
+++ b/docs/src/site/twiki/restapi/EntityUpdate.twiki
@@ -8,8 +8,8 @@
 Updates the submitted entity.
 
 ---++ Parameters
-   * :entity-type can be feed or process.
-   * :entity-name is name of the feed or process.
+   * :entity-type can be cluster, feed or process.
+   * :entity-name is the name of the cluster, feed or process.
    * skipDryRun : Optional query param, Falcon skips oozie dryrun when value is set to true.
    * doAs <optional query param> allows the current user to impersonate the user passed in doAs when interacting with the Falcon system.
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index b319dd1..1f6be41 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -39,10 +39,14 @@ import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityIntegrityChecker;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.resource.APIResult.Status;
 import org.apache.falcon.resource.EntityList.EntityElement;
 import org.apache.falcon.resource.metadata.AbstractMetadataResource;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.DefaultAuthorizationProvider;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
@@ -50,6 +54,7 @@ import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -333,8 +338,8 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
             obtainEntityLocks(oldEntity, "update", tokenList);
 
             StringBuilder result = new StringBuilder("Updated successfully");
-            //Update in workflow engine
-            if (!DeploymentUtil.isPrism()) {
+            //Update in workflow engine if entity is not a cluster (cluster entity is not scheduled)
+            if (!DeploymentUtil.isPrism() && !entityType.equals(EntityType.CLUSTER)) {
                 Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
                 Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity);
                 newClusters.retainAll(oldClusters); //common clusters for update
@@ -359,6 +364,80 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
         }
     }
 
+    /**
+     * Updates scheduled dependent entities of a cluster.
+     *
+     * @param clusterName   Name of cluster
+     * @param colo colo
+     * @param skipDryRun Skip dry run during update if set to true
+     * @return APIResult
+     */
+    public APIResult updateClusterDependents(String clusterName, String colo, Boolean skipDryRun) {
+        checkColo(colo);
+        try {
+            Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
+            verifySafemodeOperation(cluster, EntityUtil.ENTITY_OPERATION.UPDATE_CLUSTER_DEPENDENTS);
+            int clusterVersion = cluster.getVersion();
+            StringBuilder result = new StringBuilder("Updating entities dependent on cluster \n");
+            // get dependent entities. check if cluster version changed. if yes, update dependent entities
+            Pair<String, EntityType>[] dependentEntities = EntityIntegrityChecker.referencedBy(cluster);
+            if (dependentEntities == null) {
+                // nothing to update
+                return new APIResult(APIResult.Status.SUCCEEDED, "Cluster "
+                        + clusterName + " has no dependent entities");
+            }
+            for (Pair<String, EntityType> depEntity : dependentEntities) {
+                Entity entity = EntityUtil.getEntity(depEntity.second, depEntity.first);
+                switch (entity.getEntityType()) {
+                case FEED:
+                    Clusters feedClusters = ((Feed)entity).getClusters();
+                    List<org.apache.falcon.entity.v0.feed.Cluster> updatedFeedClusters =
+                            new ArrayList<org.apache.falcon.entity.v0.feed.Cluster>();
+                    if (feedClusters != null) {
+                        for(org.apache.falcon.entity.v0.feed.Cluster feedCluster : feedClusters.getClusters()) {
+                            if (feedCluster.getName().equals(clusterName)
+                                    && feedCluster.getVersion() != clusterVersion) {
+                                // update feed cluster entity
+                                feedCluster.setVersion(clusterVersion);
+                            }
+                            updatedFeedClusters.add(feedCluster);
+                        }
+                        ((Feed)entity).getClusters().getClusters().clear();
+                        ((Feed)entity).getClusters().getClusters().addAll(updatedFeedClusters);
+                        result.append(update(entity, entity.getEntityType().name(),
+                                entity.getName(), skipDryRun).getMessage());
+                    }
+                    break;
+                case PROCESS:
+                    org.apache.falcon.entity.v0.process.Clusters processClusters = ((Process)entity).getClusters();
+                    List<org.apache.falcon.entity.v0.process.Cluster> updatedProcClusters =
+                            new ArrayList<org.apache.falcon.entity.v0.process.Cluster>();
+                    if (processClusters != null) {
+                        for(org.apache.falcon.entity.v0.process.Cluster procCluster : processClusters.getClusters()) {
+                            if (procCluster.getName().equals(clusterName)
+                                    && procCluster.getVersion() != clusterVersion) {
+                                // update process cluster entity
+                                procCluster.setVersion(clusterVersion);
+                            }
+                            updatedProcClusters.add(procCluster);
+                        }
+                        ((Process)entity).getClusters().getClusters().clear();
+                        ((Process)entity).getClusters().getClusters().addAll(updatedProcClusters);
+                        result.append(update(entity, entity.getEntityType().name(),
+                                entity.getName(), skipDryRun).getMessage());
+                    }
+                    break;
+                default:
+                    break;
+                }
+            }
+            return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
+        } catch (FalconException e) {
+            LOG.error("Update failed", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
     private void obtainEntityLocks(Entity entity, String command, List<Entity> tokenList)
         throws FalconException {
         //first obtain lock for the entity for which update is issued.
@@ -397,14 +476,19 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
 
     }
 
-    private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException {
+    private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException, IOException {
         if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity)) {
             throw new FalconException(
                     oldEntity.toShortString() + " can't be updated with " + newEntity.toShortString());
         }
 
         if (oldEntity.getEntityType() == EntityType.CLUSTER) {
-            throw new FalconException("Update not supported for clusters");
+            final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI();
+            DefaultAuthorizationProvider authorizationProvider = new DefaultAuthorizationProvider();
+            if (!authorizationProvider.isSuperUser(authenticatedUGI)) {
+                throw new FalconException("Permission denied : "
+                        + "Cluster entity update can only be performed by superuser.");
+            }
         }
 
         String[] props = oldEntity.getEntityType().getImmutableProperties();
@@ -455,7 +539,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
             if (entity.getEntityType().equals(EntityType.CLUSTER)) {
                 return;
             } else {
-                LOG.error("Entity operation {} is not allowed on non-cluster entities during safemode",
+                LOG.error("Entity operation {} is only allowed on cluster entities during safemode",
                         operation.name());
                 throw FalconWebException.newAPIException("Entity operation " + operation.name()
                         + " is only allowed on cluster entities during safemode");
@@ -470,6 +554,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
                 return;
             }
         case SCHEDULE:
+        case UPDATE_CLUSTER_DEPENDENTS:
         case SUBMIT_AND_SCHEDULE:
         case DELETE:
         case RESUME:

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 168f18e..53a9de1 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -18,25 +18,6 @@
 
 package org.apache.falcon.resource.proxy;
 
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconRuntimException;
@@ -58,6 +39,24 @@ import org.apache.falcon.resource.channel.Channel;
 import org.apache.falcon.resource.channel.ChannelFactory;
 import org.apache.falcon.util.DeploymentUtil;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * A proxy implementation of the schedulable entity operations.
  */
@@ -380,6 +379,55 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     }
 
     /**
+     * Updates the dependent entities of a cluster in workflow engine.
+     * @param clusterName Name of cluster.
+     * @param ignore colo.
+     * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true.
+     * @return Consolidated result of the dependent entity updates across colos.
+     */
+    @POST
+    @Path("updateClusterDependents/{clusterName}")
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    @Monitored(event = "updateClusterDependents")
+    @Override
+    public APIResult updateClusterDependents(
+            @Dimension("entityName") @PathParam("clusterName") final String clusterName,
+            @Dimension("colo") @QueryParam("colo") String ignore,
+            @QueryParam("skipDryRun") final Boolean skipDryRun) {
+
+        final Set<String> allColos = getApplicableColos("cluster", clusterName);
+        Map<String, APIResult> results = new HashMap<String, APIResult>();
+        boolean result = true;
+
+        if (!allColos.isEmpty()) {
+            results.put(FALCON_TAG + "/updateClusterDependents", new EntityProxy("cluster", clusterName) {
+                @Override
+                protected Set<String> getColosToApply() {
+                    return allColos;
+                }
+
+                @Override
+                protected APIResult doExecute(String colo) throws FalconException {
+                    return getConfigSyncChannel(colo).invoke("updateClusterDependents", clusterName,
+                            colo, skipDryRun);
+                }
+            }.execute());
+        }
+
+        for (APIResult apiResult : results.values()) {
+            if (apiResult.getStatus() != APIResult.Status.SUCCEEDED) {
+                result = false;
+            }
+        }
+        // update only if all are updated
+        if (!embeddedMode && result) {
+            results.put(PRISM_TAG, super.updateClusterDependents(clusterName, currentColo, skipDryRun));
+        }
+
+        return consolidateResult(results, APIResult.class);
+    }
+
+    /**
      * Force updates the entity.
      * @param type Valid options are feed or process.
      * @param entityName Name of the entity.

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 9b1ff2a..56dcf87 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -195,7 +195,6 @@ public class TestFalconUnit extends FalconUnitTestBase {
         Process process = getEntity(EntityType.PROCESS, PROCESS_NAME);
         setDummyProperty(process);
         String processXml = process.toString();
-
         File file = new File("target/newprocess.xml");
         file.createNewFile();
         FileWriter fw = new FileWriter(file.getAbsoluteFile());
@@ -208,9 +207,8 @@ public class TestFalconUnit extends FalconUnitTestBase {
         result = falconUnitClient.touch(EntityType.PROCESS.name(), PROCESS_NAME, null, true, null);
         assertStatus(result);
 
-        process = getEntity(EntityType.PROCESS,
-                PROCESS_NAME);
-        Assert.assertEquals(process.toString(), processXml);
+        Process process2 = getEntity(EntityType.PROCESS, PROCESS_NAME);
+        Assert.assertEquals(process2.toString(), process.toString());
         file.delete();
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
index aa15dcc..7b32bd5 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
@@ -87,4 +87,20 @@ public class ConfigSyncService extends AbstractEntityManager {
             throw FalconWebException.newAPIException(throwable);
         }
     }
+
+    @POST
+    @Path("updateClusterDependents/{clusterName}")
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Monitored(event = "updateClusterDependents")
+    @Override
+    public APIResult updateClusterDependents(
+                            @Dimension("entityName") @PathParam("clusterName") final String clusterName,
+                            @Dimension("colo") @QueryParam("colo") String colo,
+                            @QueryParam("skipDryRun") final Boolean skipDryRun) {
+        try {
+            return super.updateClusterDependents(clusterName, colo, skipDryRun);
+        } catch (Throwable throwable) {
+            throw FalconWebException.newAPIException(throwable);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index e97adff..657ef9e 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -18,8 +18,14 @@
 
 package org.apache.falcon.resource;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconWebException;
+import org.apache.falcon.monitors.Dimension;
+import org.apache.falcon.monitors.Monitored;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -27,16 +33,10 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
-import javax.ws.rs.DELETE;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconWebException;
-import org.apache.falcon.monitors.Dimension;
-import org.apache.falcon.monitors.Monitored;
-
 /**
  * Entity management operations as REST API for feed and process.
  */
@@ -79,6 +79,7 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
         throw FalconWebException.newAPIException("delete on server is not"
                 + " supported.Please run your operation on Prism.", Response.Status.FORBIDDEN);
     }
+
     /**
      * Updates the submitted entity.
      * @param request Servlet Request
@@ -103,6 +104,26 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
     }
 
     /**
+     * Updates the dependent entities of a cluster in workflow engine.
+     * @param clusterName Name of cluster.
+     * @param ignore colo.
+     * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true.
+     * @return Never returns normally; always fails with FORBIDDEN as this must be run on Prism.
+     */
+    @POST
+    @Path("updateClusterDependents/{clusterName}")
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    @Monitored(event = "updateClusterDependents")
+    @Override
+    public APIResult updateClusterDependents(
+            @Dimension("entityName") @PathParam("clusterName") final String clusterName,
+            @Dimension("colo") @QueryParam("colo") String ignore,
+            @QueryParam("skipDryRun") final Boolean skipDryRun) {
+        throw FalconWebException.newAPIException("update on server is not"
+                + " supported.Please run your operation on Prism.", Response.Status.FORBIDDEN);
+    }
+
+    /**
      * Submits and schedules an entity.
      * @param request Servlet Request
      * @param type Valid options are feed or process.

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
new file mode 100644
index 0000000..f5efa37
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.falcon.resource.TestContext;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Map;
+
+/**
+ * Test for Falcon CLI cluster entity update and updateClusterDependents commands.
+ */
+@Test(groups = {"exhaustive"})
+public class FalconClusterUpdateCLIIT {
+    private InMemoryWriter stream = new InMemoryWriter(System.out);
+    private TestContext context = new TestContext();
+    private Map<String, String> overlay;
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        context.prepare();
+        FalconCLI.OUT.set(stream);
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        clearSafemode();
+        context.deleteEntitiesFromStore();
+    }
+
+
+    public void testUpdateClusterCommands() throws Exception {
+
+        FalconCLI.OUT.set(stream);
+
+        String filePath;
+        overlay = context.getUniqueOverlay();
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), 0);
+        context.setCluster(overlay.get("cluster"));
+        Assert.assertEquals(stream.buffer.toString().trim(),
+                "falcon/default/Submit successful (cluster) " + context.getClusterName());
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type process -file " + filePath), 0);
+
+
+        // Update cluster here and test that it works
+
+        initSafemode();
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -update -type cluster -file "
+                + filePath + " -name " + overlay.get("cluster")), 0);
+        clearSafemode();
+
+        // Try to update dependent entities
+        Assert.assertEquals(executeWithURL("entity -updateClusterDependents -cluster "
+                + overlay.get("cluster") + " -skipDryRun "), 0);
+
+        // try to update cluster with wrong name, it should fail.
+        initSafemode();
+        overlay = context.getUniqueOverlay();
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -update -type cluster -file "
+                + filePath + " -name " + overlay.get("cluster")), -1);
+        clearSafemode();
+    }
+
+
+    private void initSafemode() throws Exception {
+        // Set safemode
+        Assert.assertEquals(new FalconCLI().run(("admin -setsafemode true -url "
+                + TestContext.BASE_URL).split("\\s")), 0);
+    }
+
+    private void clearSafemode() throws Exception {
+        Assert.assertEquals(new FalconCLI().run(("admin -setsafemode false -url "
+                + TestContext.BASE_URL).split("\\s")), 0);
+    }
+
+    private int executeWithURL(String command) throws Exception {
+        FalconCLI.OUT.get().print("COMMAND IS "+command + " -url " + TestContext.BASE_URL + "\n");
+        return new FalconCLI()
+                .run((command + " -url " + TestContext.BASE_URL).split("\\s+"));
+    }
+
+    private static class InMemoryWriter extends PrintStream {
+
+        private StringBuffer buffer = new StringBuffer();
+
+        public InMemoryWriter(OutputStream out) {
+            super(out);
+        }
+
+        @Override
+        public void println(String x) {
+            clear();
+            buffer.append(x);
+            super.println(x);
+        }
+
+        @SuppressWarnings("UnusedDeclaration")
+        public String getBuffer() {
+            return buffer.toString();
+        }
+
+        public void clear() {
+            buffer.delete(0, buffer.length());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
index f640a69..d2b62b2 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
@@ -21,7 +21,6 @@ package org.apache.falcon.cli;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.resource.TestContext;
 import org.apache.falcon.util.FalconTestUtil;
-import org.apache.falcon.util.StartupProperties;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -84,8 +83,6 @@ public class FalconSafemodeCLIIT {
     private void clearSafemode() throws Exception {
         Assert.assertEquals(new FalconCLI().run(("admin -setsafemode false -url "
                 + TestContext.BASE_URL).split("\\s")), 0);
-        Assert.assertEquals(StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"),
-                "false");
     }
 
     public void testEntityCommandsNotAllowedInSafeMode() throws Exception {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index f84559f..3cf5c18 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -95,6 +95,7 @@ public class TestContext extends AbstractTestContext {
     public static final String DATASOURCE_TEMPLATE3 = "/datasource-template3.xml";
     public static final String DATASOURCE_TEMPLATE4 = "/datasource-template4.xml";
     public static final String CLUSTER_TEMPLATE = "/cluster-template.xml";
+    public static final String CLUSTER_UPDATED_TEMPLATE = "/cluster-updated-template.xml";
     public static final String PIG_PROCESS_TEMPLATE = "/pig-process-template.xml";
 
     public static final String BASE_URL = "https://localhost:41443/falcon-webapp";

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/resources/cluster-updated-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/cluster-updated-template.xml b/webapp/src/test/resources/cluster-updated-template.xml
new file mode 100644
index 0000000..f94e897
--- /dev/null
+++ b/webapp/src/test/resources/cluster-updated-template.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<cluster colo="##colo##" description="updated cluster" name="##cluster##" xmlns="uri:falcon:cluster:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="jail://global:00"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="jail://global:00"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:41021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:41000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.4.3"/>
+        <interface type="registry" endpoint="thrift://localhost:49083"
+                   version="0.11.0"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="test1" value="value1"/>
+    </properties>
+</cluster>


Mime
View raw message