atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [2/2] incubator-atlas git commit: ATLAS-835 Falcon Integration with Atlas (sowmyaramesh via shwethags)
Date Mon, 20 Jun 2016 06:54:45 GMT
ATLAS-835 Falcon Integration with Atlas (sowmyaramesh via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/e30ab3d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/e30ab3d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/e30ab3d8

Branch: refs/heads/master
Commit: e30ab3d8d78cfe3dae70babff9d3c6bcf9065f20
Parents: 436a524
Author: Shwetha GS <sshivalingamurthy@hortonworks.com>
Authored: Mon Jun 20 12:21:55 2016 +0530
Committer: Shwetha GS <sshivalingamurthy@hortonworks.com>
Committed: Mon Jun 20 12:21:55 2016 +0530

----------------------------------------------------------------------
 .../org/apache/atlas/falcon/Util/EventUtil.java |  64 +++
 .../atlas/falcon/bridge/FalconBridge.java       | 401 +++++++++++++++++++
 .../apache/atlas/falcon/event/FalconEvent.java  |  72 ++++
 .../apache/atlas/falcon/hook/FalconHook.java    | 246 ++++--------
 .../falcon/model/FalconDataModelGenerator.java  | 142 +++++--
 .../atlas/falcon/model/FalconDataTypes.java     |  18 +-
 .../falcon/publisher/FalconEventPublisher.java  |  41 ++
 .../atlas/falcon/service/AtlasService.java      | 141 +++++++
 .../org/apache/falcon/atlas/Util/EventUtil.java |  68 ----
 .../apache/falcon/atlas/event/FalconEvent.java  |  65 ---
 .../atlas/publisher/FalconEventPublisher.java   |  41 --
 .../falcon/atlas/service/AtlasService.java      | 115 ------
 .../apache/atlas/falcon/hook/FalconHookIT.java  | 242 +++++++----
 .../src/test/resources/feed-replication.xml     |  43 ++
 .../org/apache/atlas/hive/hook/HiveHook.java    |  51 +--
 .../org/apache/atlas/hive/hook/HiveHookIT.java  |   3 -
 distro/src/conf/atlas-log4j.xml                 |  12 +-
 docs/src/site/twiki/Bridge-Falcon.twiki         |  17 +-
 .../java/org/apache/atlas/hook/AtlasHook.java   |   5 +-
 .../org/apache/atlas/hook/AtlasHookTest.java    |  18 +-
 release-log.txt                                 |   1 +
 .../main/resources/atlas-application.properties |   4 +
 typesystem/src/main/resources/atlas-log4j.xml   |   4 +-
 .../apache/atlas/web/filters/AuditFilter.java   |   4 +-
 24 files changed, 1191 insertions(+), 627 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
new file mode 100644
index 0000000..c1ccd05
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.Util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Falcon event util
+ */
+public final class EventUtil {
+
+    private EventUtil() {}
+
+
+    public static Map<String, String> convertKeyValueStringToMap(final String keyValueString) {
+        if (StringUtils.isBlank(keyValueString)) {
+            return null;
+        }
+
+        Map<String, String> keyValueMap = new HashMap<>();
+
+        String[] tags = keyValueString.split(",");
+        for (String tag : tags) {
+            int index = tag.indexOf("=");
+            String tagKey = tag.substring(0, index);
+            String tagValue = tag.substring(index + 1, tag.length());
+            keyValueMap.put(tagKey, tagValue);
+        }
+        return keyValueMap;
+    }
+
+    public static UserGroupInformation getUgi() throws FalconException {
+        UserGroupInformation ugi;
+        try {
+            ugi = CurrentUser.getAuthenticatedUGI();
+        } catch (IOException ioe) {
+            throw new FalconException(ioe);
+        }
+        return ugi;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
new file mode 100644
index 0000000..1621d95
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.bridge;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.falcon.model.FalconDataModelGenerator;
+import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.fs.model.FSDataTypes;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.atlas.falcon.Util.EventUtil;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.FileSystemStorage;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Bridge Utility to register Falcon entities metadata to Atlas.
+ */
+public class FalconBridge {
+    private static final Logger LOG = LoggerFactory.getLogger(FalconBridge.class);
+
+    /**
+     * Creates cluster entity
+     *
+     * @param cluster ClusterEntity
+     * @return cluster instance reference
+     */
+    public static Referenceable createClusterEntity(final org.apache.falcon.entity.v0.cluster.Cluster cluster,
+                                                    final String user,
+                                                    final Date timestamp) throws Exception {
+        LOG.info("Creating cluster Entity : {}", cluster.getName());
+
+        Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
+
+        clusterRef.set(FalconDataModelGenerator.NAME, cluster.getName());
+        clusterRef.set("description", cluster.getDescription());
+        clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, cluster.getName());
+
+        clusterRef.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+        clusterRef.set(FalconDataModelGenerator.COLO, cluster.getColo());
+
+        clusterRef.set(FalconDataModelGenerator.USER, user);
+
+        if (StringUtils.isNotEmpty(cluster.getTags())) {
+            clusterRef.set(FalconDataModelGenerator.TAGS,
+                    EventUtil.convertKeyValueStringToMap(cluster.getTags()));
+        }
+
+        return clusterRef;
+    }
+
+    private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable,
+                                                  String user, Date timestamp) throws Exception {
+        LOG.info("Creating feed dataset: {}", feed.getName());
+
+        Referenceable datasetReferenceable = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
+        datasetReferenceable.set(FalconDataModelGenerator.NAME, feed.getName());
+        String feedQualifiedName =
+                getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(FalconDataModelGenerator.NAME));
+        datasetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
+        datasetReferenceable.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+
+        datasetReferenceable.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable);
+        datasetReferenceable.set(FalconDataModelGenerator.USER, user);
+
+        if (StringUtils.isNotEmpty(feed.getTags())) {
+            datasetReferenceable.set(FalconDataModelGenerator.TAGS,
+                    EventUtil.convertKeyValueStringToMap(feed.getTags()));
+        }
+
+        if (feed.getGroups() != null) {
+            datasetReferenceable.set(FalconDataModelGenerator.GROUPS, feed.getGroups());
+        }
+
+        return datasetReferenceable;
+    }
+
+    public static List<Referenceable> createFeedCreationEntity(Feed feed, ConfigurationStore falconStore, String user,
+                                                               Date timestamp) throws Exception {
+        LOG.info("Creating feed : {}", feed.getName());
+
+        List<Referenceable> entities = new ArrayList<>();
+
+        if (feed.getClusters() != null) {
+            List<Referenceable> replicationInputs = new ArrayList<>();
+            List<Referenceable> replicationOutputs = new ArrayList<>();
+
+            for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
+                org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER,
+                        feedCluster.getName());
+
+                // set cluster
+                Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+                entities.add(clusterReferenceable);
+
+                // input as hive_table or hdfs_path, output as falcon_feed dataset
+                List<Referenceable> inputs = new ArrayList<>();
+                List<Referenceable> inputReferenceables = getInputEntities(cluster, feed);
+                if (inputReferenceables != null) {
+                    entities.addAll(inputReferenceables);
+                    inputs.add(inputReferenceables.get(inputReferenceables.size() - 1));
+                }
+
+                List<Referenceable> outputs = new ArrayList<>();
+                Referenceable feedEntity = createFeedEntity(feed, clusterReferenceable, user, timestamp);
+                if (feedEntity != null) {
+                    entities.add(feedEntity);
+                    outputs.add(feedEntity);
+                }
+
+                if (!inputs.isEmpty() || !outputs.isEmpty()) {
+                    Referenceable feedCreateEntity = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName());
+                    String feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName());
+
+                    feedCreateEntity.set(FalconDataModelGenerator.NAME, feed.getName());
+                    feedCreateEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
+
+                    feedCreateEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+                    if (!inputs.isEmpty()) {
+                        feedCreateEntity.set(FalconDataModelGenerator.INPUTS, inputs);
+                    }
+                    if (!outputs.isEmpty()) {
+                        feedCreateEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
+                    }
+
+                    feedCreateEntity.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable);
+                    feedCreateEntity.set(FalconDataModelGenerator.USER, user);
+                    entities.add(feedCreateEntity);
+                }
+
+                if (ClusterType.SOURCE == feedCluster.getType()) {
+                    replicationInputs.add(feedEntity);
+                } else if (ClusterType.TARGET == feedCluster.getType()) {
+                    replicationOutputs.add(feedEntity);
+                }
+            }
+
+            if (!replicationInputs.isEmpty() && !replicationInputs.isEmpty()) {
+                Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes
+                        .FALCON_FEED_REPLICATION.getName());
+
+                feedReplicationEntity.set(FalconDataModelGenerator.NAME, feed.getName());
+                feedReplicationEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feed.getName());
+
+                feedReplicationEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+                feedReplicationEntity.set(FalconDataModelGenerator.INPUTS, replicationInputs);
+                feedReplicationEntity.set(FalconDataModelGenerator.OUTPUTS, replicationOutputs);
+                feedReplicationEntity.set(FalconDataModelGenerator.USER, user);
+                entities.add(feedReplicationEntity);
+            }
+
+        }
+        return entities;
+    }
+
+    /**
+     * +     * Creates process entity
+     * +     *
+     * +     * @param process process entity
+     * +     * @param falconStore config store
+     * +     * @param user falcon user
+     * +     * @param timestamp timestamp of entity
+     * +     * @return process instance reference
+     * +
+     */
+    public static List<Referenceable> createProcessEntity(org.apache.falcon.entity.v0.process.Process process,
+                                                          ConfigurationStore falconStore, String user,
+                                                          Date timestamp) throws Exception {
+        LOG.info("Creating process Entity : {}", process.getName());
+
+        // The requirement is for each cluster, create a process entity with name
+        // clustername.processname
+        List<Referenceable> entities = new ArrayList<>();
+
+        if (process.getClusters() != null) {
+
+            for (Cluster processCluster : process.getClusters().getClusters()) {
+                org.apache.falcon.entity.v0.cluster.Cluster cluster =
+                        falconStore.get(EntityType.CLUSTER, processCluster.getName());
+                Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+                entities.add(clusterReferenceable);
+
+                List<Referenceable> inputs = new ArrayList<>();
+                if (process.getInputs() != null) {
+                    for (Input input : process.getInputs().getInputs()) {
+                        Referenceable inputReferenceable = getFeedDataSetReference(getFeedQualifiedName(input.getFeed(),
+                                cluster.getName()), clusterReferenceable);
+                        entities.add(inputReferenceable);
+                        inputs.add(inputReferenceable);
+                    }
+                }
+
+                List<Referenceable> outputs = new ArrayList<>();
+                if (process.getOutputs() != null) {
+                    for (Output output : process.getOutputs().getOutputs()) {
+                        Referenceable outputReferenceable = getFeedDataSetReference(getFeedQualifiedName(output.getFeed(),
+                                cluster.getName()), clusterReferenceable);
+                        entities.add(outputReferenceable);
+                        outputs.add(outputReferenceable);
+                    }
+                }
+
+                if (!inputs.isEmpty() || !outputs.isEmpty()) {
+
+                    Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS.getName());
+                    processEntity.set(FalconDataModelGenerator.NAME, process.getName());
+                    processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                            getProcessQualifiedName(process.getName(), cluster.getName()));
+                    processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+
+                    if (!inputs.isEmpty()) {
+                        processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
+                    }
+                    if (!outputs.isEmpty()) {
+                        processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
+                    }
+
+                    // set cluster
+                    processEntity.set(FalconDataModelGenerator.RUNSON, clusterReferenceable);
+
+                    // Set user
+                    processEntity.set(FalconDataModelGenerator.USER, user);
+
+                    if (StringUtils.isNotEmpty(process.getTags())) {
+                        processEntity.set(FalconDataModelGenerator.TAGS,
+                                EventUtil.convertKeyValueStringToMap(process.getTags()));
+                    }
+
+                    if (process.getPipelines() != null) {
+                        processEntity.set(FalconDataModelGenerator.PIPELINES, process.getPipelines());
+                    }
+
+                    processEntity.set(FalconDataModelGenerator.WFPROPERTIES,
+                            getProcessEntityWFProperties(process.getWorkflow(),
+                                    process.getName()));
+
+                    entities.add(processEntity);
+                }
+
+            }
+        }
+        return entities;
+    }
+
+    private static List<Referenceable> getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster,
+                                                        Feed feed) throws Exception {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+
+        final CatalogTable table = getTable(feedCluster, feed);
+        if (table != null) {
+            CatalogStorage storage = new CatalogStorage(cluster, table);
+            return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
+                    storage.getTable().toLowerCase());
+        } else {
+            List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
+            Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
+            final String pathUri = normalize(dataLocation.getPath());
+            LOG.info("Registering DFS Path {} ", pathUri);
+            return fillHDFSDataSet(pathUri, cluster.getName());
+        }
+    }
+
+    private static CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
+        // check if table is overridden in cluster
+        if (cluster.getTable() != null) {
+            return cluster.getTable();
+        }
+
+        return feed.getTable();
+    }
+
+    private static List<Referenceable> fillHDFSDataSet(final String pathUri, final String clusterName) {
+        List<Referenceable> entities = new ArrayList<>();
+        Referenceable ref = new Referenceable(FSDataTypes.HDFS_PATH().toString());
+        ref.set("path", pathUri);
+        //        Path path = new Path(pathUri);
+        //        ref.set("name", path.getName());
+        //TODO - Fix after ATLAS-542 to shorter Name
+        ref.set(FalconDataModelGenerator.NAME, pathUri);
+        ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
+        ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+        entities.add(ref);
+        return entities;
+    }
+
+    private static Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
+            throws Exception {
+        Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
+        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+        dbRef.set(HiveDataModelGenerator.NAME, dbName);
+        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
+        return dbRef;
+    }
+
+    private static List<Referenceable> createHiveTableInstance(String clusterName, String dbName,
+                                                               String tableName) throws Exception {
+        List<Referenceable> entities = new ArrayList<>();
+        Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
+        entities.add(dbRef);
+
+        Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+        tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
+        tableRef.set(HiveDataModelGenerator.NAME, tableName.toLowerCase());
+        tableRef.set(HiveDataModelGenerator.DB, dbRef);
+        entities.add(tableRef);
+
+        return entities;
+    }
+
+    private static Referenceable getClusterEntityReference(final String clusterName,
+                                                           final String colo) {
+        LOG.info("Getting reference for entity {}", clusterName);
+        Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
+        clusterRef.set(FalconDataModelGenerator.NAME, String.format("%s", clusterName));
+        clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName);
+        clusterRef.set(FalconDataModelGenerator.COLO, colo);
+        return clusterRef;
+    }
+
+
+    private static Referenceable getFeedDataSetReference(final String feedDatasetName,
+                                                         Referenceable clusterReference) {
+        LOG.info("Getting reference for entity {}", feedDatasetName);
+        Referenceable feedDatasetRef = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
+        feedDatasetRef.set(FalconDataModelGenerator.NAME, feedDatasetName);
+        feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedDatasetName);
+        feedDatasetRef.set(FalconDataModelGenerator.STOREDIN, clusterReference);
+        return feedDatasetRef;
+    }
+
+    private static Map<String, String> getProcessEntityWFProperties(final Workflow workflow,
+                                                                    final String processName) {
+        Map<String, String> wfProperties = new HashMap<>();
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
+                ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
+                workflow.getVersion());
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
+                workflow.getEngine().value());
+
+        return wfProperties;
+    }
+
+    public static String getFeedQualifiedName(final String feedName, final String clusterName) {
+        return String.format("%s@%s", feedName, clusterName);
+    }
+
+    public static String getProcessQualifiedName(final String processName, final String clusterName) {
+        return String.format("%s@%s", processName, clusterName);
+    }
+
+    public static String normalize(final String str) {
+        if (StringUtils.isBlank(str)) {
+            return null;
+        }
+        return str.toLowerCase().trim();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
new file mode 100644
index 0000000..37df6da
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.event;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.Date;
+
+/**
+ * Falcon event to interface with Atlas Service.
+ */
+public class FalconEvent {
+    protected String user;
+    protected UserGroupInformation ugi;
+    protected OPERATION operation;
+    protected long timestamp;
+    protected Entity entity;
+
+    public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOperation, long timestamp, Entity entity) {
+        this.user = doAsUser;
+        this.ugi = ugi;
+        this.operation = falconOperation;
+        this.timestamp = timestamp;
+        this.entity = entity;
+    }
+
+    public enum OPERATION {
+        ADD_CLUSTER,
+        UPDATE_CLUSTER,
+        ADD_FEED,
+        UPDATE_FEED,
+        ADD_PROCESS,
+        UPDATE_PROCESS,
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public UserGroupInformation getUgi() {
+        return ugi;
+    }
+
+    public OPERATION getOperation() {
+        return operation;
+    }
+
+    public Date getTimestamp() {
+        return new Date(timestamp);
+    }
+
+    public Entity getEntity() {
+        return entity;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index f27a8b0..95f255e 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,32 +21,17 @@ package org.apache.atlas.falcon.hook;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.falcon.model.FalconDataModelGenerator;
-import org.apache.atlas.falcon.model.FalconDataTypes;
-import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
-import org.apache.atlas.hive.model.HiveDataModelGenerator;
-import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.falcon.bridge.FalconBridge;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.atlas.Util.EventUtil;
-import org.apache.falcon.atlas.event.FalconEvent;
-import org.apache.falcon.atlas.publisher.FalconEventPublisher;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.FeedHelper;
+import org.apache.atlas.falcon.event.FalconEvent;
+import org.apache.atlas.falcon.publisher.FalconEventPublisher;
 import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Cluster;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.security.CurrentUser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,6 +71,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
 
     private static ConfigurationStore STORE;
 
+    private enum Operation {
+        ADD,
+        UPDATE
+    }
+
     static {
         try {
             // initialize the async facility to process hook calls. We don't
@@ -115,12 +105,14 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
             });
 
             STORE = ConfigurationStore.get();
+
+            Injector injector = Guice.createInjector(new NotificationModule());
+            notifInterface = injector.getInstance(NotificationInterface.class);
+
         } catch (Exception e) {
-            LOG.info("Caught exception initializing the falcon hook.", e);
+            LOG.error("Caught exception initializing the falcon hook.", e);
         }
 
-        Injector injector = Guice.createInjector(new NotificationModule());
-        notifInterface = injector.getInstance(NotificationInterface.class);
 
         LOG.info("Created Atlas Hook for Falcon");
     }
@@ -128,166 +120,92 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
     @Override
     public void publish(final Data data) throws Exception {
         final FalconEvent event = data.getEvent();
-        if (sync) {
-            fireAndForget(event);
-        } else {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        fireAndForget(event);
-                    } catch (Throwable e) {
-                        LOG.info("Atlas hook failed", e);
+        try {
+            if (sync) {
+                fireAndForget(event);
+            } else {
+                executor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            fireAndForget(event);
+                        } catch (Throwable e) {
+                            LOG.info("Atlas hook failed", e);
+                        }
                     }
-                }
-            });
+                });
+            }
+        } catch (Throwable t) {
+            LOG.warn("Error in processing data {}", data);
         }
     }
 
+    @Override
+    protected String getNumberOfRetriesPropertyKey() {
+        return HOOK_NUM_RETRIES;
+    }
+
     private void fireAndForget(FalconEvent event) throws Exception {
         LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
+        List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
 
-        notifyEntities(getAuthenticatedUser(), createEntities(event));
-    }
+        Operation op = getOperation(event.getOperation());
+        String user = getUser(event.getUser());
+        LOG.info("fireAndForget user:{}, ugi: {}", user, event.getUgi());
+        switch (op) {
+        case ADD:
+            messages.add(new HookNotification.EntityCreateRequest(user, createEntities(event, user)));
+            break;
 
-    private String getAuthenticatedUser() {
-        String user = null;
-        try {
-            user = CurrentUser.getAuthenticatedUser();
-        } catch (IllegalArgumentException e) {
-            LOG.warn("Failed to get user from CurrentUser.getAuthenticatedUser");
         }
-        return getUser(user, null);
+        notifyEntities(messages);
     }
 
-    private List<Referenceable> createEntities(FalconEvent event) throws Exception {
-        switch (event.getOperation()) {
-            case ADD_PROCESS:
-                return createProcessInstance((Process) event.getEntity(), event.getUser(), event.getTimestamp());
-        }
-
-        return null;
-    }
-
-    /**
-     +     * Creates process entity
-     +     *
-     +     * @param event process entity event
-     +     * @return process instance reference
-     +     */
-    public List<Referenceable> createProcessInstance(Process process, String user, long timestamp) throws Exception {
-        LOG.info("Creating process Instance : {}", process.getName());
-
-        // The requirement is for each cluster, create a process entity with name
-        // clustername.processname
+    private List<Referenceable> createEntities(FalconEvent event, String user) throws Exception {
         List<Referenceable> entities = new ArrayList<>();
 
-        if (process.getClusters() != null) {
-
-            for (Cluster processCluster : process.getClusters().getClusters()) {
-                org.apache.falcon.entity.v0.cluster.Cluster cluster = STORE.get(EntityType.CLUSTER, processCluster.getName());
-
-                List<Referenceable> inputs = new ArrayList<>();
-                if (process.getInputs() != null) {
-                    for (Input input : process.getInputs().getInputs()) {
-                        List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed());
-                        if (clusterInputs != null) {
-                            entities.addAll(clusterInputs);
-                            inputs.add(clusterInputs.get(clusterInputs.size() - 1));
-                        }
-                    }
-                }
-
-                List<Referenceable> outputs = new ArrayList<>();
-                if (process.getOutputs() != null) {
-                    for (Output output : process.getOutputs().getOutputs()) {
-                        List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed());
-                        if (clusterOutputs != null) {
-                            entities.addAll(clusterOutputs);
-                            outputs.add(clusterOutputs.get(clusterOutputs.size() - 1));
-                        }
-                    }
-                }
-
-                if (!inputs.isEmpty() || !outputs.isEmpty()) {
-                    Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
-                    processEntity.set(FalconDataModelGenerator.NAME, String.format("%s", process.getName(),
-                            cluster.getName()));
-                    processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s@%s", process.getName(),
-                        cluster.getName()));
-                    processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
-                    if (!inputs.isEmpty()) {
-                        processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
-                    }
-                    if (!outputs.isEmpty()) {
-                        processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
-                    }
-                    processEntity.set(FalconDataModelGenerator.USER, user);
-
-                    if (StringUtils.isNotEmpty(process.getTags())) {
-                        processEntity.set(FalconDataModelGenerator.TAGS,
-                                EventUtil.convertKeyValueStringToMap(process.getTags()));
-                    }
-                    entities.add(processEntity);
-                }
-
-            }
+        switch (event.getOperation()) {
+        case ADD_CLUSTER:
+            entities.add(FalconBridge
+                    .createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity(), user,
+                            event.getTimestamp()));
+            break;
+
+        case ADD_PROCESS:
+            entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE,
+                    user, event.getTimestamp()));
+            break;
+
+        case ADD_FEED:
+            entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE,
+                    user, event.getTimestamp()));
+            break;
+
+        case UPDATE_CLUSTER:
+        case UPDATE_FEED:
+        case UPDATE_PROCESS:
+        default:
+            LOG.info("Falcon operation {} is not valid or supported", event.getOperation());
         }
 
         return entities;
     }
 
-    private List<Referenceable> getInputOutputEntity(org.apache.falcon.entity.v0.cluster.Cluster cluster, String feedName) throws Exception {
-        Feed feed = STORE.get(EntityType.FEED, feedName);
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+    private static Operation getOperation(final FalconEvent.OPERATION op) throws Exception {
+        switch (op) {
+        case ADD_CLUSTER:
+        case ADD_FEED:
+        case ADD_PROCESS:
+            return Operation.ADD;
 
-        final CatalogTable table = getTable(feedCluster, feed);
-        if (table != null) {
-            CatalogStorage storage = new CatalogStorage(cluster, table);
-            return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
-                    storage.getTable().toLowerCase());
-        }
-
-        return null;
-    }
+        case UPDATE_CLUSTER:
+        case UPDATE_FEED:
+        case UPDATE_PROCESS:
+            return Operation.UPDATE;
 
-    private CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
-        // check if table is overridden in cluster
-        if (cluster.getTable() != null) {
-            return cluster.getTable();
+        default:
+            throw new Exception("Falcon operation " + op + " is not valid or supported");
         }
-
-        return feed.getTable();
-    }
-
-    private Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
-            throws Exception {
-        Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
-        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
-        dbRef.set(HiveDataModelGenerator.NAME, dbName);
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
-        return dbRef;
-    }
-
-    private List<Referenceable> createHiveTableInstance(String clusterName, String dbName, String tableName) throws Exception {
-        List<Referenceable> entities = new ArrayList<>();
-        Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
-        entities.add(dbRef);
-
-        Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-        tableRef.set(HiveDataModelGenerator.NAME,
-                tableName);
-        tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
-        tableRef.set(HiveDataModelGenerator.DB, dbRef);
-        entities.add(tableRef);
-
-        return entities;
-    }
-
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
index 397dea4..81cd5e0 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,7 +20,6 @@ package org.apache.atlas.falcon.model;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.addons.ModelDefinitionDump;
@@ -53,48 +52,46 @@ public class FalconDataModelGenerator {
     private static final Logger LOG = LoggerFactory.getLogger(FalconDataModelGenerator.class);
 
     private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions;
-    private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap;
-    private final Map<String, StructTypeDefinition> structTypeDefinitionMap;
 
     public static final String NAME = "name";
     public static final String TIMESTAMP = "timestamp";
-    public static final String USER = "owned-by";
-    public static final String TAGS = "tag-classification";
+    public static final String COLO = "colo";
+    public static final String USER = "owner";
+    public static final String TAGS = "tags";
+    public static final String GROUPS = "groups";
+    public static final String PIPELINES = "pipelines";
+    public static final String WFPROPERTIES = "workflow-properties";
+    public static final String RUNSON = "runs-on";
+    public static final String STOREDIN = "stored-in";
 
     // multiple inputs and outputs for process
     public static final String INPUTS = "inputs";
     public static final String OUTPUTS = "outputs";
 
-
     public FalconDataModelGenerator() {
         classTypeDefinitions = new HashMap<>();
-        enumTypeDefinitionMap = new HashMap<>();
-        structTypeDefinitionMap = new HashMap<>();
     }
 
     public void createDataModel() throws AtlasException {
         LOG.info("Generating the Falcon Data Model");
-        createProcessEntityClass();
 
+        // classes
+        createClusterEntityClass();
+        createProcessEntityClass();
+        createFeedEntityClass();
+        createFeedDatasetClass();
+        createReplicationFeedEntityClass();
     }
 
     private TypesDef getTypesDef() {
-        return TypesUtil.getTypesDef(getEnumTypeDefinitions(), getStructTypeDefinitions(), getTraitTypeDefinitions(),
-                getClassTypeDefinitions());
+        return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
+                getTraitTypeDefinitions(), getClassTypeDefinitions());
     }
 
     public String getDataModelAsJSON() {
         return TypesSerialization.toJson(getTypesDef());
     }
 
-    private ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() {
-        return ImmutableList.copyOf(enumTypeDefinitionMap.values());
-    }
-
-    private ImmutableList<StructTypeDefinition> getStructTypeDefinitions() {
-        return ImmutableList.copyOf(structTypeDefinitionMap.values());
-    }
-
     private ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() {
         return ImmutableList.copyOf(classTypeDefinitions.values());
     }
@@ -103,24 +100,103 @@ public class FalconDataModelGenerator {
         return ImmutableList.of();
     }
 
+    private void createClusterEntityClass() throws AtlasException {
+        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
+                new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false,
+                        null),
+                new AttributeDefinition(COLO, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
+                        null),
+                new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
+                        null),
+                // map of tags
+                new AttributeDefinition(TAGS,
+                        DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
+                        Multiplicity.OPTIONAL, false, null),};
 
-    private void createProcessEntityClass() throws AtlasException {
+        HierarchicalTypeDefinition<ClassType> definition =
+                new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_CLUSTER.getName(), null,
+                        ImmutableSet.of(AtlasClient.INFRASTRUCTURE_SUPER_TYPE), attributeDefinitions);
+        classTypeDefinitions.put(FalconDataTypes.FALCON_CLUSTER.getName(), definition);
+        LOG.debug("Created definition for {}", FalconDataTypes.FALCON_CLUSTER.getName());
+    }
+
+    private void createFeedEntityClass() throws AtlasException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-                new AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false,
+                new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false,
                         null),
+                new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED,
+                        false, null),
                 new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
+                        null)};
+
+        HierarchicalTypeDefinition<ClassType> definition =
+                new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED_CREATION.getName(), null,
+                        ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
+        classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_CREATION.getName(), definition);
+        LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_CREATION.getName());
+    }
+
+    private void createFeedDatasetClass() throws AtlasException {
+        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
+                new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false,
+                        null),
+                new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED,
+                        false, null),
+                new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
                         null),
+                new AttributeDefinition(GROUPS, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
                 // map of tags
-                new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
+                new AttributeDefinition(TAGS,
+                        DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
                         Multiplicity.OPTIONAL, false, null),};
 
         HierarchicalTypeDefinition<ClassType> definition =
-                new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), null,
-                    ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
-        classTypeDefinitions.put(FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), definition);
-        LOG.debug("Created definition for {}", FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
+                new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED.getName(), null,
+                        ImmutableSet.of(AtlasClient.DATA_SET_SUPER_TYPE), attributeDefinitions);
+        classTypeDefinitions.put(FalconDataTypes.FALCON_FEED.getName(), definition);
+        LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED.getName());
+    }
+
+
+    private void createReplicationFeedEntityClass() throws AtlasException {
+        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
+                new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false,
+                        null),
+                new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
+                        null)};
+
+        HierarchicalTypeDefinition<ClassType> definition =
+                new HierarchicalTypeDefinition<>(ClassType.class,
+                        FalconDataTypes.FALCON_FEED_REPLICATION.getName(), null,
+                        ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
+        classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), definition);
+        LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_REPLICATION.getName());
     }
 
+    private void createProcessEntityClass() throws AtlasException {
+        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
+                new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false,
+                        null),
+                new AttributeDefinition(RUNSON, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED,
+                        false, null),
+                new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
+                        null),
+                // map of tags
+                new AttributeDefinition(TAGS,
+                        DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
+                        Multiplicity.OPTIONAL, false, null),
+                new AttributeDefinition(PIPELINES, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
+                // wf properties
+                new AttributeDefinition(WFPROPERTIES,
+                        DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
+                        Multiplicity.OPTIONAL, false, null),};
+
+        HierarchicalTypeDefinition<ClassType> definition =
+                new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_PROCESS.getName(), null,
+                        ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
+        classTypeDefinitions.put(FalconDataTypes.FALCON_PROCESS.getName(), definition);
+        LOG.debug("Created definition for {}", FalconDataTypes.FALCON_PROCESS.getName());
+    }
 
 
     public String getModelAsJson() throws AtlasException {
@@ -145,11 +221,13 @@ public class FalconDataModelGenerator {
                     Arrays.toString(enumType.enumValues)));
         }
         for (StructTypeDefinition structType : typesDef.structTypesAsJavaList()) {
-            System.out.println(String.format("%s(%s) - attributes %s", structType.typeName, StructType.class.getSimpleName(),
-                    Arrays.toString(structType.attributeDefinitions)));
+            System.out.println(
+                    String.format("%s(%s) - attributes %s", structType.typeName, StructType.class.getSimpleName(),
+                            Arrays.toString(structType.attributeDefinitions)));
         }
         for (HierarchicalTypeDefinition<ClassType> classType : typesDef.classTypesAsJavaList()) {
-            System.out.println(String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName, ClassType.class.getSimpleName(),
+            System.out.println(String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName,
+                    ClassType.class.getSimpleName(),
                     StringUtils.join(classType.superTypes, ","), Arrays.toString(classType.attributeDefinitions)));
         }
         for (HierarchicalTypeDefinition<TraitType> traitType : typesDef.traitTypesAsJavaList()) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
index f1f350b..e36ff23 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
@@ -22,19 +22,15 @@ package org.apache.atlas.falcon.model;
  * Falcon Data Types for model and bridge.
  */
 public enum FalconDataTypes {
-
-
-    FALCON_PROCESS_ENTITY("falcon_process"),
-    ;
-
-    private final String name;
-
-    FalconDataTypes(java.lang.String name) {
-        this.name = name;
-    }
+    // Classes
+    FALCON_CLUSTER,
+    FALCON_FEED_CREATION,
+    FALCON_FEED,
+    FALCON_FEED_REPLICATION,
+    FALCON_PROCESS;
 
     public String getName() {
-        return name;
+        return name().toLowerCase();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
new file mode 100644
index 0000000..ea81226
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.publisher;
+
+
+import org.apache.atlas.falcon.event.FalconEvent;
+
+/**
+ * Falcon publisher for Atlas
+ */
+public interface FalconEventPublisher {
+    class Data {
+        private FalconEvent event;
+
+        public Data(FalconEvent event) {
+            this.event = event;
+        }
+
+        public FalconEvent getEvent() {
+            return event;
+        }
+    }
+
+    void publish(final Data data) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
new file mode 100644
index 0000000..c92bd43
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.service;
+
+import org.apache.atlas.falcon.Util.EventUtil;
+import org.apache.atlas.falcon.event.FalconEvent;
+import org.apache.atlas.falcon.hook.FalconHook;
+import org.apache.atlas.falcon.publisher.FalconEventPublisher;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.service.ConfigurationChangeListener;
+import org.apache.falcon.service.FalconService;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Atlas service to publish Falcon events
+ */
+public class AtlasService implements FalconService, ConfigurationChangeListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class);
+    private FalconEventPublisher publisher;
+
+    /**
+     * Constant for the service name.
+     */
+    public static final String SERVICE_NAME = AtlasService.class.getSimpleName();
+
+    @Override
+    public String getName() {
+        return SERVICE_NAME;
+    }
+
+    @Override
+    public void init() throws FalconException {
+        ConfigurationStore.get().registerListener(this);
+        publisher = new FalconHook();
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        ConfigurationStore.get().unregisterListener(this);
+    }
+
+    @Override
+    public void onAdd(Entity entity) throws FalconException {
+        try {
+            EntityType entityType = entity.getEntityType();
+            switch (entityType) {
+            case CLUSTER:
+                addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER);
+                break;
+
+            case PROCESS:
+                addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS);
+                break;
+
+            case FEED:
+                addEntity(entity, FalconEvent.OPERATION.ADD_FEED);
+                break;
+
+            default:
+                LOG.debug("Entity type not processed {}", entityType);
+            }
+        } catch(Throwable t) {
+            LOG.warn("Error handling entity {}", entity, t);
+        }
+    }
+
+    @Override
+    public void onRemove(Entity entity) throws FalconException {
+    }
+
+    @Override
+    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+        /**
+         * Skipping update for now - update uses full update currently and this might result in all attributes wiped for hive entities
+        EntityType entityType = newEntity.getEntityType();
+        switch (entityType) {
+        case CLUSTER:
+            addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER);
+            break;
+
+        case PROCESS:
+            addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
+            break;
+
+        case FEED:
+            FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ?
+                    FalconEvent.OPERATION.UPDATE_REPLICATION_FEED :
+                    FalconEvent.OPERATION.UPDATE_FEED;
+            addEntity(newEntity, operation);
+            break;
+
+        default:
+            LOG.debug("Entity type not processed {}", entityType);
+        }
+         **/
+    }
+
+    @Override
+    public void onReload(Entity entity) throws FalconException {
+        //Since there is no import script that can import existing falcon entities to atlas, adding on falcon service start
+        onAdd(entity);
+    }
+
+    private void addEntity(Entity entity, FalconEvent.OPERATION operation) throws FalconException {
+        LOG.info("Adding {} entity to Atlas: {}", entity.getEntityType().name(), entity.getName());
+
+        try {
+            String user = entity.getACL() != null ? entity.getACL().getOwner() :
+                    UserGroupInformation.getLoginUser().getShortUserName();
+            FalconEvent event =
+                    new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity);
+            FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
+            publisher.publish(data);
+        } catch (Exception ex) {
+            throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java
deleted file mode 100644
index 7f67407..0000000
--- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.atlas.Util;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Falcon event util
- */
-public final class EventUtil {
-    private static final Logger LOG = LoggerFactory.getLogger(EventUtil.class);
-
-    private EventUtil() {}
-
-
-    public static Map<String, String> convertKeyValueStringToMap(final String keyValueString) {
-        if (StringUtils.isBlank(keyValueString)) {
-            return null;
-        }
-
-        Map<String, String> keyValueMap = new HashMap<>();
-
-        String[] tags = keyValueString.split(",");
-        for (String tag : tags) {
-            int index = tag.indexOf("=");
-            String tagKey = tag.substring(0, index);
-            String tagValue = tag.substring(index + 1, tag.length());
-            keyValueMap.put(tagKey, tagValue);
-        }
-        return keyValueMap;
-    }
-
-
-    public static UserGroupInformation getUgi() throws FalconException {
-        UserGroupInformation ugi;
-        try {
-            ugi = CurrentUser.getAuthenticatedUGI();
-        } catch (IOException ioe) {
-            throw new FalconException(ioe);
-        }
-        return ugi;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java
deleted file mode 100644
index e587e73..0000000
--- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.atlas.event;
-
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Falcon event to interface with Atlas Service.
- */
-public class FalconEvent {
-    protected String user;
-    protected UserGroupInformation ugi;
-    protected OPERATION operation;
-    protected long timestamp;
-    protected Entity entity;
-
-    public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOperation, long timestamp, Entity entity) {
-        this.user = doAsUser;
-        this.ugi = ugi;
-        this.operation = falconOperation;
-        this.timestamp = timestamp;
-        this.entity = entity;
-    }
-
-    public enum OPERATION {
-        ADD_PROCESS, UPDATE_PROCESS
-    }
-
-    public String getUser() {
-        return user;
-    }
-
-    public UserGroupInformation getUgi() {
-        return ugi;
-    }
-
-    public OPERATION getOperation() {
-        return operation;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public Entity getEntity() {
-        return entity;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
deleted file mode 100644
index 8029be9..0000000
--- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.atlas.publisher;
-
-
-import org.apache.falcon.atlas.event.FalconEvent;
-
-/**
- * Falcon publisher for Atlas
- */
-public interface FalconEventPublisher {
-    class Data {
-        private FalconEvent event;
-
-        public Data(FalconEvent event) {
-            this.event = event;
-        }
-
-        public FalconEvent getEvent() {
-            return event;
-        }
-    }
-
-    void publish(final Data data) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e30ab3d8/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java
deleted file mode 100644
index 373846d..0000000
--- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.atlas.service;
-
-import org.apache.atlas.falcon.hook.FalconHook;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.atlas.Util.EventUtil;
-import org.apache.falcon.atlas.event.FalconEvent;
-import org.apache.falcon.atlas.publisher.FalconEventPublisher;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.service.ConfigurationChangeListener;
-import org.apache.falcon.service.FalconService;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Atlas service to publish Falcon events
- */
-public class AtlasService implements FalconService, ConfigurationChangeListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class);
-    private FalconEventPublisher publisher;
-
-    /**
-     * Constant for the service name.
-     */
-    public static final String SERVICE_NAME = AtlasService.class.getSimpleName();
-
-    @Override
-    public String getName() {
-        return SERVICE_NAME;
-    }
-
-    @Override
-    public void init() throws FalconException {
-        ConfigurationStore.get().registerListener(this);
-        publisher = new FalconHook();
-    }
-
-
-    @Override
-    public void destroy() throws FalconException {
-        ConfigurationStore.get().unregisterListener(this);
-    }
-
-    @Override
-    public void onAdd(Entity entity) throws FalconException {
-        EntityType entityType = entity.getEntityType();
-        switch (entityType) {
-            case PROCESS:
-                addProcessEntity((Process) entity, FalconEvent.OPERATION.ADD_PROCESS);
-                break;
-
-            default:
-                LOG.debug("Entity type not processed " + entityType);
-        }
-    }
-
-    @Override
-    public void onRemove(Entity entity) throws FalconException {
-    }
-
-    @Override
-    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
-        EntityType entityType = newEntity.getEntityType();
-        switch (entityType) {
-            case PROCESS:
-                addProcessEntity((Process) newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
-                break;
-
-            default:
-                LOG.debug("Entity type not processed " + entityType);
-        }
-    }
-
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        //Since there is no import script that can import existing falcon entities to atlas, adding on falcon service start
-        onAdd(entity);
-    }
-
-    private void addProcessEntity(Process entity, FalconEvent.OPERATION operation) throws FalconException {
-        LOG.info("Adding process entity to Atlas: {}", entity.getName());
-
-        try {
-            String user = entity.getACL() != null ? entity.getACL().getOwner() :
-                    UserGroupInformation.getLoginUser().getShortUserName();
-            FalconEvent event = new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity);
-            FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
-            publisher.publish(data);
-        } catch (Exception ex) {
-            throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex);
-        }
-    }
-}


Mime
View raw message