atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject incubator-atlas git commit: ATLAS-183 Add a Hook in Storm to post the topology metadata (svenkat, yhemanth via shwethags)
Date Mon, 18 Jan 2016 11:32:44 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master a46711c54 -> b77d7c7bc


ATLAS-183 Add a Hook in Storm to post the topology metadata (svenkat,yhemanth via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b77d7c7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b77d7c7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b77d7c7b

Branch: refs/heads/master
Commit: b77d7c7bc7cb11a047808c95b051eb2fbd2c813c
Parents: a46711c
Author: Shwetha GS <sshivalingamurthy@hortonworks.com>
Authored: Mon Jan 18 17:02:32 2016 +0530
Committer: Shwetha GS <sshivalingamurthy@hortonworks.com>
Committed: Mon Jan 18 17:02:32 2016 +0530

----------------------------------------------------------------------
 .../org/apache/atlas/hive/hook/HiveHook.java    |  48 +--
 addons/storm-bridge/pom.xml                     |  30 +-
 .../apache/atlas/storm/hook/StormAtlasHook.java | 427 +++++++++++++++++++
 .../atlas/storm/hook/StormTopologyUtil.java     | 237 ++++++++++
 .../atlas/storm/hook/StormAtlasHookIT.java      | 111 +++++
 .../atlas/storm/hook/StormAtlasHookTest.java    |  68 +++
 .../apache/atlas/storm/hook/StormTestUtil.java  |  71 +++
 .../java/org/apache/atlas/AtlasConstants.java   |  26 ++
 .../src/main/assemblies/standalone-package.xml  |   6 +
 .../java/org/apache/atlas/hook/AtlasHook.java   | 128 ++++++
 .../atlas/kafka/KafkaNotificationProvider.java  |   3 +-
 release-log.txt                                 |   1 +
 12 files changed, 1114 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 37a3169..2cc37c0 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -20,15 +20,11 @@ package org.apache.atlas.hive.hook;
 
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.commons.configuration.Configuration;
@@ -63,7 +59,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * AtlasHook sends lineage information to the Atlas server.
  */
-public class HiveHook implements ExecuteWithHookContext {
+public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
 
@@ -103,16 +99,13 @@ public class HiveHook implements ExecuteWithHookContext {
         public Long queryStartTime;
     }
 
-    @Inject
-    private static NotificationInterface notifInterface;
-
     private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
 
     private static final HiveConf hiveConf;
 
     static {
         try {
-            atlasProperties = ApplicationProperties.get(ApplicationProperties.CLIENT_PROPERTIES);
+            atlasProperties = ApplicationProperties.get();
 
             // initialize the async facility to process hook calls. We don't
             // want to do this inline since it adds plenty of overhead for the query.
@@ -142,15 +135,17 @@ public class HiveHook implements ExecuteWithHookContext {
             LOG.info("Attempting to send msg while shutdown in progress.", e);
         }
 
-        Injector injector = Guice.createInjector(new NotificationModule());
-        notifInterface = injector.getInstance(NotificationInterface.class);
-
         hiveConf = new HiveConf();
 
         LOG.info("Created Atlas Hook");
     }
 
     @Override
+    protected String getNumberOfRetriesPropertyKey() {
+        return HOOK_NUM_RETRIES;
+    }
+
+    @Override
     public void run(final HookContext hookContext) throws Exception {
         // clone to avoid concurrent access
         final HiveEvent event = new HiveEvent();
@@ -233,7 +228,7 @@ public class HiveHook implements ExecuteWithHookContext {
         default:
         }
 
-        notifyAtlas();
+        notifyEntities(messages);
     }
 
     private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
@@ -324,31 +319,6 @@ public class HiveHook implements ExecuteWithHookContext {
         }
     }
 
-    /**
-     * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
-     * De-duping of entities is done on server side depending on the unique attribute on the
-     */
-    private void notifyAtlas() {
-        int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-
-        LOG.debug("Notifying atlas with messages {}", messages);
-        int numRetries = 0;
-        while (true) {
-            try {
-                notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
-                break;
-            } catch(Exception e) {
-                numRetries++;
-                if(numRetries < maxRetries) {
-                    LOG.debug("Failed to notify atlas. Retrying", e);
-                } else {
-                    LOG.error("Failed to notify atlas after {} retries. Quitting", maxRetries, e);
-                    break;
-                }
-            }
-        }
-    }
-
     private String normalize(String str) {
         if (StringUtils.isEmpty(str)) {
             return null;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index 5ec291c2..7b0d366 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -31,6 +31,7 @@
 
     <properties>
         <storm.version>0.10.0.2.3.99.0-195</storm.version>
+        <hive.version>1.2.1</hive.version>
     </properties>
 
     <dependencies>
@@ -66,6 +67,24 @@
             <artifactId>hive-bridge</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${hbase.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+
         <!-- apache storm core dependencies -->
         <dependency>
             <groupId>org.apache.storm</groupId>
@@ -269,7 +288,16 @@
                                     <artifactId>paranamer</artifactId>
                                     <version>${paranamer.version}</version>
                                 </artifactItem>
-
+                                <artifactItem>
+                                    <groupId>org.apache.hive</groupId>
+                                    <artifactId>hive-exec</artifactId>
+                                    <version>${hive.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.hbase</groupId>
+                                    <artifactId>hbase-common</artifactId>
+                                    <version>${hbase.version}</version>
+                                </artifactItem>
                             </artifactItems>
                         </configuration>
                     </execution>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
new file mode 100644
index 0000000..490d95e
--- /dev/null
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.storm.hook;
+
+import backtype.storm.ISubmitterHook;
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.TopologyInfo;
+import backtype.storm.utils.Utils;
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.storm.model.StormDataModel;
+import org.apache.atlas.storm.model.StormDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.TypesSerialization;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * StormAtlasHook sends storm topology metadata information to Atlas
+ * via a Kafka Broker for durability.
+ * <p/>
+ * This is based on the assumption that the same topology name is used
+ * for the various lifecycle stages.
+ */
+public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
+
+    public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class);
+
+    private static final String CONF_PREFIX = "atlas.hook.storm.";
+    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+    // will be used for owner if Storm topology does not contain the owner instance
+    // possible if Storm is running in unsecure mode.
+    public static final String ANONYMOUS_OWNER = "anonymous";
+
+    public static final String HBASE_NAMESPACE_DEFAULT = "default";
+
+    private static volatile boolean typesRegistered = false;
+
+    /**
+     * Creates the hook through the no-argument {@link AtlasHook} constructor.
+     */
+    public StormAtlasHook() {
+        super();
+    }
+
+    /**
+     * Creates the hook with a caller-supplied Atlas client, which is handed to the
+     * {@link AtlasHook} constructor. Package-private; presumably meant for tests - TODO confirm.
+     *
+     * @param atlasClient client passed to the superclass constructor
+     */
+    StormAtlasHook(AtlasClient atlasClient) {
+        super(atlasClient);
+    }
+    /**
+     * Returns the configuration key for this hook's retry count,
+     * {@code atlas.hook.storm.numRetries}.
+     * NOTE(review): presumably read by {@link AtlasHook} when it retries notifications - confirm.
+     */
+    @Override
+    protected String getNumberOfRetriesPropertyKey() {
+        return HOOK_NUM_RETRIES;
+    }
+
+    /**
+     * Client-side callback that Storm fires when a topology is added.
+     * <p/>
+     * Registers the data model if that has not happened yet, builds the topology
+     * entity together with its data sets and its spout/bolt graph, and passes all
+     * resulting entities to {@code notifyEntities}.
+     *
+     * @param topologyInfo topology info
+     * @param stormConf configuration
+     * @param stormTopology a storm topology
+     * @throws IllegalAccessException declared by the signature; failures raised while
+     *         processing the topology are rethrown wrapped in a {@link RuntimeException}
+     */
+    @Override
+    public void notify(TopologyInfo topologyInfo, Map stormConf,
+                       StormTopology stormTopology) throws IllegalAccessException {
+
+        LOG.info("Collecting metadata for a new storm topology: {}", topologyInfo.get_name());
+        try {
+            if (!typesRegistered) {
+                registerDataModel(new HiveDataModelGenerator());
+            }
+
+            final Referenceable topologyEntity = createTopologyInstance(topologyInfo, stormConf);
+
+            // data sets and their dependencies come first, the topology itself is appended last
+            final ArrayList<Referenceable> allEntities = new ArrayList<>();
+            allEntities.addAll(addTopologyDataSets(
+                    stormTopology, topologyEntity, topologyInfo.get_owner(), stormConf));
+
+            // build the spout/bolt graph and attach it to the topology
+            final ArrayList<Referenceable> graph = createTopologyGraph(
+                    stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts());
+            topologyEntity.set("nodes", graph);
+            allEntities.add(topologyEntity);
+
+            LOG.debug("notifying entities, size = {}", allEntities.size());
+            notifyEntities(allEntities);
+        } catch (Exception e) {
+            throw new RuntimeException("Atlas hook is unable to process the topology.", e);
+        }
+    }
+
+    private Referenceable createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) throws Exception {
+        Referenceable topologyReferenceable = new Referenceable(
+                StormDataTypes.STORM_TOPOLOGY.getName());
+        topologyReferenceable.set("id", topologyInfo.get_id());
+        topologyReferenceable.set("name", topologyInfo.get_name());
+        String owner = topologyInfo.get_owner();
+        if (StringUtils.isEmpty(owner)) {
+            owner = ANONYMOUS_OWNER;
+        }
+        topologyReferenceable.set("owner", owner);
+        topologyReferenceable.set("startTime", System.currentTimeMillis());
+        topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
+
+        return topologyReferenceable;
+    }
+
+    private List<Referenceable> addTopologyDataSets(StormTopology stormTopology,
+                                                    Referenceable topologyReferenceable,
+                                                    String topologyOwner,
+                                                    Map stormConf) throws Exception {
+        List<Referenceable> dependentEntities = new ArrayList<>();
+        // add each spout as an input data set
+        addTopologyInputs(topologyReferenceable,
+                stormTopology.get_spouts(), stormConf, topologyOwner, dependentEntities);
+        // add the appropriate bolts as output data sets
+        addTopologyOutputs(topologyReferenceable, stormTopology, topologyOwner, stormConf, dependentEntities);
+        return dependentEntities;
+    }
+
+    /**
+     * Deserializes every spout, maps it to a data set where a mapping exists, and stores
+     * the resulting list on the topology entity under "inputs".
+     *
+     * @param topologyReferenceable topology entity that receives the "inputs" attribute
+     * @param spouts spouts of the topology, keyed by name
+     * @param stormConf storm configuration
+     * @param topologyOwner owner forwarded to the data set creation
+     * @param dependentEntities list that data set creation appends to
+     */
+    private void addTopologyInputs(Referenceable topologyReferenceable,
+                                   Map<String, SpoutSpec> spouts,
+                                   Map stormConf,
+                                   String topologyOwner, List<Referenceable> dependentEntities) throws IllegalAccessException {
+        final ArrayList<Referenceable> inputs = new ArrayList<>();
+        for (SpoutSpec spoutSpec : spouts.values()) {
+            final Serializable spout = Utils.javaDeserialize(
+                    spoutSpec.get_spout_object().get_serialized_java(), Serializable.class);
+
+            final Referenceable dataSet = createDataSet(
+                    spout.getClass().getSimpleName(), topologyOwner, spout, stormConf, dependentEntities);
+            if (dataSet == null) {
+                // no data set is created for this spout class
+                continue;
+            }
+            inputs.add(dataSet);
+        }
+
+        topologyReferenceable.set("inputs", inputs);
+    }
+
+    /**
+     * Deserializes every terminal user bolt, maps it to a data set where a mapping
+     * exists, and stores the resulting list on the topology entity under "outputs".
+     *
+     * @param topologyReferenceable topology entity that receives the "outputs" attribute
+     * @param stormTopology topology whose bolts are inspected
+     * @param topologyOwner owner forwarded to the data set creation
+     * @param stormConf storm configuration
+     * @param dependentEntities list that data set creation appends to
+     */
+    private void addTopologyOutputs(Referenceable topologyReferenceable,
+                                    StormTopology stormTopology, String topologyOwner,
+                                    Map stormConf, List<Referenceable> dependentEntities) throws Exception {
+        final ArrayList<Referenceable> outputs = new ArrayList<>();
+
+        final Map<String, Bolt> boltsByName = stormTopology.get_bolts();
+        for (String boltName : StormTopologyUtil.getTerminalUserBoltNames(stormTopology)) {
+            final byte[] serializedBolt = boltsByName.get(boltName).get_bolt_object().get_serialized_java();
+            final Serializable bolt = Utils.javaDeserialize(serializedBolt, Serializable.class);
+
+            final Referenceable dataSet = createDataSet(
+                    bolt.getClass().getSimpleName(), topologyOwner, bolt, stormConf, dependentEntities);
+            if (dataSet == null) {
+                // no data set is created for this bolt class
+                continue;
+            }
+            outputs.add(dataSet);
+        }
+
+        topologyReferenceable.set("outputs", outputs);
+    }
+
+    /**
+     * Creates the data set entity for a spout or bolt, chosen by the simple class name of
+     * the component. Known components are KafkaSpout, HBaseBolt, HdfsBolt and HiveBolt;
+     * every created data set is also appended to {@code dependentEntities}.
+     *
+     * @param name simple class name of the component
+     * @param topologyOwner owner of the topology, used for the Kafka topic owner
+     * @param instance deserialized component whose field values are read
+     * @param stormConf storm configuration
+     * @param dependentEntities list that receives the data set and, for Hive, its database
+     * @return the data set entity, or {@code null} for any other component class
+     */
+    private Referenceable createDataSet(String name, String topologyOwner,
+                                              Serializable instance,
+                                              Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException {
+        final Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true);
+
+        final Referenceable dataSet;
+        // todo: need to redo this with a config driven approach
+        switch (name) {
+            case "KafkaSpout":
+                dataSet = createKafkaTopicDataSet(config, topologyOwner, stormConf);
+                break;
+
+            case "HBaseBolt":
+                dataSet = createHBaseTableDataSet(config, stormConf);
+                break;
+
+            case "HdfsBolt":
+                dataSet = createHdfsDataSet(config, stormConf);
+                break;
+
+            case "HiveBolt":
+                dataSet = createHiveTableDataSet(config, stormConf, dependentEntities);
+                break;
+
+            default:
+                // custom node - create a base dataset class with name attribute
+                //TODO - What should we do for custom data sets. Not sure what name we can set here?
+                return null;
+        }
+
+        dependentEntities.add(dataSet);
+        return dataSet;
+    }
+
+    /** Builds the Kafka topic entity from the spout configuration; an empty owner becomes anonymous. */
+    private Referenceable createKafkaTopicDataSet(Map<String, String> config, String topologyOwner,
+                                                  Map stormConf) {
+        final Referenceable topic = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
+        final String topicName = config.get("KafkaSpout._spoutConfig.topic");
+        topic.set("topic", topicName);
+        topic.set("uri", config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr"));
+        topic.set("owner", StringUtils.isEmpty(topologyOwner) ? ANONYMOUS_OWNER : topologyOwner);
+        topic.set("name", getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
+        return topic;
+    }
+
+    /** Builds the HBase table entity; uri and owner are read from the storm configuration. */
+    private Referenceable createHBaseTableDataSet(Map<String, String> config, Map stormConf) {
+        final Referenceable table = new Referenceable(StormDataTypes.HBASE_TABLE.getName());
+        final String hbaseTableName = config.get("HBaseBolt.tableName");
+        table.set("uri", stormConf.get("hbase.rootdir"));
+        table.set("tableName", hbaseTableName);
+        table.set("owner", stormConf.get("storm.kerberos.principal"));
+        final String hbaseCluster = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
+        //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName
+        table.set("name", getHbaseTableQualifiedName(hbaseCluster, HBASE_NAMESPACE_DEFAULT, hbaseTableName));
+        return table;
+    }
+
+    /** Builds the HDFS data set entity; the path is the fs URL followed by the rotation action or file path value. */
+    private Referenceable createHdfsDataSet(Map<String, String> config, Map stormConf) {
+        final Referenceable hdfsDataSet = new Referenceable(StormDataTypes.HDFS_DATA_SET.getName());
+        String location = config.get("HdfsBolt.rotationActions");
+        if (location == null) {
+            location = config.get("HdfsBolt.fileNameFormat.path");
+        }
+        final String hdfsPath = config.get("HdfsBolt.fsUrl") + location;
+        hdfsDataSet.set("pathURI", hdfsPath);
+        hdfsDataSet.set("owner", stormConf.get("hdfs.kerberos.principal"));
+        hdfsDataSet.set("name", hdfsPath);
+        return hdfsDataSet;
+    }
+
+    /** Builds the Hive table entity and registers its database entity as a dependent entity. */
+    private Referenceable createHiveTableDataSet(Map<String, String> config, Map stormConf,
+                                                 List<Referenceable> dependentEntities) {
+        // todo: verify if hive table has everything needed to retrieve existing table
+        final String stormCluster = getClusterName(stormConf);
+        final String databaseName = config.get("HiveBolt.options.databaseName");
+
+        final Referenceable database = new Referenceable("hive_db");
+        database.set(HiveDataModelGenerator.NAME, databaseName);
+        database.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                HiveMetaStoreBridge.getDBQualifiedName(stormCluster, databaseName));
+        database.set(HiveDataModelGenerator.CLUSTER_NAME, stormCluster);
+        dependentEntities.add(database);
+
+        final String hiveCluster = extractComponentClusterName(new HiveConf(), stormConf);
+        final String hiveTableName = config.get("HiveBolt.options.tableName");
+        final Referenceable table = new Referenceable("hive_table");
+        table.set(HiveDataModelGenerator.NAME,
+                HiveMetaStoreBridge.getTableQualifiedName(hiveCluster, databaseName, hiveTableName));
+        table.set(HiveDataModelGenerator.DB, database);
+        table.set(HiveDataModelGenerator.TABLE_NAME, hiveTableName);
+        return table;
+    }
+
+    private String extractComponentClusterName(Configuration configuration, Map stormConf) {
+        String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
+        if (clusterName == null) {
+            clusterName = getClusterName(stormConf);
+        }
+        return clusterName;
+    }
+
+
+    /**
+     * Creates one node entity per spout and bolt, wires the connections between them and
+     * returns the nodes as a list.
+     *
+     * @param stormTopology topology used to derive the connections
+     * @param spouts spouts of the topology, keyed by name
+     * @param bolts bolts of the topology, keyed by name
+     * @return all node entities of the topology graph
+     */
+    private ArrayList<Referenceable> createTopologyGraph(StormTopology stormTopology,
+                                                         Map<String, SpoutSpec> spouts,
+                                                         Map<String, Bolt> bolts) throws Exception {
+        final Map<String, Referenceable> nodesByName = new HashMap<>();
+
+        // nodes first, then the edges between them
+        addSpouts(spouts, nodesByName);
+        addBolts(bolts, nodesByName);
+        addGraphConnections(stormTopology, nodesByName);
+
+        return new ArrayList<>(nodesByName.values());
+    }
+
+    /**
+     * Creates a node entity for every spout and registers it under the spout name.
+     *
+     * @param spouts spouts of the topology, keyed by name
+     * @param nodeEntities map that receives the created node entities
+     */
+    private void addSpouts(Map<String, SpoutSpec> spouts,
+                           Map<String, Referenceable> nodeEntities) throws IllegalAccessException {
+        for (Map.Entry<String, SpoutSpec> spout : spouts.entrySet()) {
+            nodeEntities.put(spout.getKey(), createSpoutInstance(spout.getKey(), spout.getValue()));
+        }
+    }
+
+    private Referenceable createSpoutInstance(String spoutName,
+                                              SpoutSpec stormSpout) throws IllegalAccessException {
+        Referenceable spoutReferenceable = new Referenceable(
+                StormDataTypes.STORM_SPOUT.getName(), "DataProducer");
+        spoutReferenceable.set("name", spoutName);
+
+        Serializable instance = Utils.javaDeserialize(
+                stormSpout.get_spout_object().get_serialized_java(), Serializable.class);
+        spoutReferenceable.set("driverClass", instance.getClass().getName());
+
+        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true);
+        spoutReferenceable.set("conf", flatConfigMap);
+
+        return spoutReferenceable;
+    }
+
+    private void addBolts(Map<String, Bolt> bolts,
+                          Map<String, Referenceable> nodeEntities) throws IllegalAccessException {
+        for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
+            Referenceable boltInstance = createBoltInstance(entry.getKey(), entry.getValue());
+            nodeEntities.put(entry.getKey(), boltInstance);
+        }
+    }
+
+    private Referenceable createBoltInstance(String boltName,
+                                             Bolt stormBolt) throws IllegalAccessException {
+        Referenceable boltReferenceable = new Referenceable(
+                StormDataTypes.STORM_BOLT.getName(), "DataProcessor");
+
+        boltReferenceable.set("name", boltName);
+
+        Serializable instance = Utils.javaDeserialize(
+                stormBolt.get_bolt_object().get_serialized_java(), Serializable.class);
+        boltReferenceable.set("driverClass", instance.getClass().getName());
+
+        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true);
+        boltReferenceable.set("conf", flatConfigMap);
+
+        return boltReferenceable;
+    }
+
+    private void addGraphConnections(StormTopology stormTopology,
+                                     Map<String, Referenceable> nodeEntities) throws Exception {
+        // adds connections between spouts and bolts
+        Map<String, Set<String>> adjacencyMap =
+                StormTopologyUtil.getAdjacencyMap(stormTopology, true);
+
+        for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) {
+            String nodeName = entry.getKey();
+            Set<String> adjacencyList = adjacencyMap.get(nodeName);
+            if (adjacencyList == null || adjacencyList.isEmpty()) {
+                continue;
+            }
+
+            // add outgoing links
+            Referenceable node = nodeEntities.get(nodeName);
+            ArrayList<String> outputs = new ArrayList<>(adjacencyList.size());
+            outputs.addAll(adjacencyList);
+            node.set("outputs", outputs);
+
+            // add incoming links
+            for (String adjacentNodeName : adjacencyList) {
+                Referenceable adjacentNode = nodeEntities.get(adjacentNodeName);
+                @SuppressWarnings("unchecked")
+                ArrayList<String> inputs = (ArrayList<String>) adjacentNode.get("inputs");
+                if (inputs == null) {
+                    inputs = new ArrayList<>();
+                }
+                inputs.add(nodeName);
+                adjacentNode.set("inputs", inputs);
+            }
+        }
+    }
+
+    /**
+     * Builds the qualified name of a Kafka topic in the form {@code topic@cluster}.
+     *
+     * @param clusterName name of the cluster
+     * @param topicName name of the topic
+     * @return the topic name and cluster name joined by {@code @}
+     */
+    public static String getKafkaTopicQualifiedName(String clusterName, String topicName) {
+        return topicName + "@" + clusterName;
+    }
+
+    /**
+     * Builds the qualified name of an HBase table in the form {@code namespace.table@cluster}.
+     *
+     * @param clusterName name of the cluster
+     * @param nameSpace HBase namespace of the table
+     * @param tableName name of the table
+     * @return the namespace, table name and cluster name joined as described
+     */
+    public static String getHbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) {
+        return nameSpace + "." + tableName + "@" + clusterName;
+    }
+
+    /**
+     * Makes sure the Hive and the Storm data models exist in Atlas, creating whichever
+     * one is missing, and then marks the types as registered.
+     * <p/>
+     * Each model is probed by looking up one of its types; a NOT_FOUND response triggers
+     * the creation of that model. Any other lookup failure is propagated for both models,
+     * so the registered flag is only set once both models were found or created.
+     *
+     * @param dataModelGenerator generator that supplies the Hive data model as JSON
+     * @throws AtlasException declared by the signature; NOTE(review): presumably raised
+     *         while generating or serializing a model - confirm
+     * @throws AtlasServiceException if a type lookup fails with a status other than
+     *         NOT_FOUND, or if creating a model fails
+     */
+    public synchronized void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException,
+            AtlasServiceException {
+
+        try {
+            atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
+            LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model");
+        } catch(AtlasServiceException ase) {
+            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                //Expected in case types do not exist
+                LOG.info("Registering Hive data model");
+                atlasClient.createType(dataModelGenerator.getModelAsJson());
+            } else {
+                throw ase;
+            }
+        }
+
+
+        try {
+            atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName());
+        } catch(AtlasServiceException ase) {
+            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                //Expected in case types do not exist
+                LOG.info("Registering Storm/Kafka data model");
+                StormDataModel.main(new String[]{});
+                TypesDef typesDef = StormDataModel.typesDef();
+                String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
+                LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
+                atlasClient.createType(stormTypesAsJSON);
+            } else {
+                // Same handling as the Hive lookup above: without this rethrow an unrelated
+                // failure would be swallowed and the types would be flagged as registered.
+                throw ase;
+            }
+        }
+        typesRegistered = true;
+    }
+
+    /**
+     * Reads the cluster name from the storm configuration, falling back to the default
+     * cluster name when the configuration has no entry for the cluster name key.
+     *
+     * @param stormConf storm configuration
+     * @return the configured cluster name, or the default when the key is absent
+     */
+    private String getClusterName(Map stormConf) {
+        return stormConf.containsKey(AtlasConstants.CLUSTER_NAME_KEY)
+                ? (String) stormConf.get(AtlasConstants.CLUSTER_NAME_KEY)
+                : AtlasConstants.DEFAULT_CLUSTER_NAME;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
new file mode 100644
index 0000000..b352a49
--- /dev/null
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.storm.hook;
+
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.generated.StormTopology;
+import com.google.common.base.Joiner;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A storm topology utility class.
+ *
+ * Offers static helpers to find the terminal user bolts of a topology, to build the
+ * component adjacency map, and to flatten the fields of an arbitrary object into a
+ * map of string keys and string values using reflection.
+ */
+public final class StormTopologyUtil {
+
+    /** Types whose values are rendered directly through {@code toString()}. */
+    private static final Set<Class<?>> WRAPPER_TYPES = new HashSet<>();
+
+    static {
+        WRAPPER_TYPES.add(Boolean.class);
+        WRAPPER_TYPES.add(Character.class);
+        WRAPPER_TYPES.add(Byte.class);
+        WRAPPER_TYPES.add(Short.class);
+        WRAPPER_TYPES.add(Integer.class);
+        WRAPPER_TYPES.add(Long.class);
+        WRAPPER_TYPES.add(Float.class);
+        WRAPPER_TYPES.add(Double.class);
+        WRAPPER_TYPES.add(Void.class);
+        WRAPPER_TYPES.add(String.class);
+    }
+
+    private StormTopologyUtil() {
+    }
+
+    /**
+     * Finds the user bolts that no other user bolt consumes from.
+     *
+     * @param topology the topology to inspect
+     * @return names of the non-system bolts that do not appear as an input component
+     *         of any non-system bolt
+     * @throws Exception declared for callers; kept for interface compatibility
+     */
+    public static Set<String> getTerminalUserBoltNames(StormTopology topology) throws Exception {
+        Map<String, Bolt> bolts = topology.get_bolts();
+
+        // Collect every component that feeds at least one user (non-system) bolt.
+        Set<String> inputs = new HashSet<>();
+        for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
+            Set<GlobalStreamId> inputsForBolt = entry.getValue().get_common().get_inputs().keySet();
+            if (!isSystemComponent(entry.getKey())) {
+                for (GlobalStreamId streamId : inputsForBolt) {
+                    inputs.add(streamId.get_componentId());
+                }
+            }
+        }
+
+        // A user bolt that feeds nobody is terminal.
+        Set<String> terminalBolts = new HashSet<>();
+        for (String boltName : bolts.keySet()) {
+            if (!isSystemComponent(boltName) && !inputs.contains(boltName)) {
+                terminalBolts.add(boltName);
+            }
+        }
+
+        return terminalBolts;
+    }
+
+    /**
+     * Tells whether a component name denotes a system component.
+     *
+     * @param componentName the component name to test
+     * @return {@code true} when the name starts with {@code "__"}
+     */
+    public static boolean isSystemComponent(String componentName) {
+        return componentName.startsWith("__");
+    }
+
+    /**
+     * Builds a map from each input component id to the names of the bolts that
+     * declare it as an input.
+     *
+     * @param topology              the topology to inspect
+     * @param removeSystemComponent when {@code true}, system components are left out of
+     *                              both the keys and the value sets
+     * @return the adjacency map; a key may map to an empty set when all of its
+     *         consumers were system components that got removed
+     * @throws Exception declared for callers; kept for interface compatibility
+     */
+    public static Map<String, Set<String>> getAdjacencyMap(StormTopology topology,
+                                                           boolean removeSystemComponent)
+    throws Exception {
+        Map<String, Set<String>> adjacencyMap = new HashMap<>();
+
+        for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
+            String boltName = entry.getKey();
+            Map<GlobalStreamId, Grouping> inputs = entry.getValue().get_common().get_inputs();
+            for (GlobalStreamId streamId : inputs.keySet()) {
+                String inputComponentId = streamId.get_componentId();
+                if (removeSystemComponent && isSystemComponent(inputComponentId)) {
+                    // System components are never used as keys when filtering.
+                    continue;
+                }
+
+                Set<String> components = adjacencyMap.get(inputComponentId);
+                if (components == null) {
+                    components = new HashSet<>();
+                    adjacencyMap.put(inputComponentId, components);
+                }
+                if (!removeSystemComponent || !isSystemComponent(boltName)) {
+                    components.add(boltName);
+                }
+            }
+        }
+
+        return adjacencyMap;
+    }
+
+    /**
+     * Returns a copy of the given set without the system components.
+     *
+     * @param components component names to filter
+     * @return a new set holding only the names that are not system components
+     */
+    public static Set<String> removeSystemComponents(Set<String> components) {
+        Set<String> userComponents = new HashSet<>();
+        for (String component : components) {
+            if (!isSystemComponent(component)) {
+                userComponents.add(component);
+            }
+        }
+
+        return userComponents;
+    }
+
+    /**
+     * @param clazz the class to test
+     * @return {@code true} for the boxed primitive types, {@code Void} and {@code String}
+     */
+    public static boolean isWrapperType(Class<?> clazz) {
+        return WRAPPER_TYPES.contains(clazz);
+    }
+
+    /**
+     * @param clazz the class to test
+     * @return {@code true} when the class is a {@link Collection}
+     */
+    public static boolean isCollectionType(Class<?> clazz) {
+        return Collection.class.isAssignableFrom(clazz);
+    }
+
+    /**
+     * @param clazz the class to test
+     * @return {@code true} when the class is a {@link Map}
+     */
+    public static boolean isMapType(Class<?> clazz) {
+        return Map.class.isAssignableFrom(clazz);
+    }
+
+    /**
+     * Flattens the non-static fields of an object, including the fields declared by
+     * its superclasses, into string key/value pairs.
+     *
+     * Null fields, empty strings and empty collections are skipped. Map fields yield
+     * one {@code key.mapKey} entry per map entry, collection fields yield one
+     * comma-separated entry, and any other object is flattened recursively with its
+     * entries prefixed by the field name. Cycles in the object graph are not detected.
+     *
+     * @param instance         the object to read the fields from
+     * @param prependClassName when {@code true}, each top-level key is prefixed with the
+     *                         simple name of the instance's runtime class
+     * @return the flattened field values
+     * @throws IllegalAccessException if a field cannot be read
+     */
+    public static Map<String, String> getFieldValues(Object instance,
+                                                     boolean prependClassName)
+    throws IllegalAccessException {
+        Class clazz = instance.getClass();
+        Map<String, String> output = new HashMap<>();
+        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+            for (Field field : c.getDeclaredFields()) {
+                if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+                    continue;
+                }
+
+                String key = prependClassName
+                        ? String.format("%s.%s", clazz.getSimpleName(), field.getName())
+                        : field.getName();
+
+                boolean accessible = field.isAccessible();
+                if (!accessible) {
+                    field.setAccessible(true);
+                }
+                try {
+                    addFieldValue(output, key, field.get(instance));
+                } finally {
+                    // Always restore the flag, also for skipped values and on failure.
+                    if (!accessible) {
+                        field.setAccessible(false);
+                    }
+                }
+            }
+        }
+        return output;
+    }
+
+    /**
+     * Adds the flattened representation of a single field value to the output map.
+     */
+    private static void addFieldValue(Map<String, String> output, String key, Object fieldVal)
+    throws IllegalAccessException {
+        if (fieldVal == null) {
+            return;
+        }
+
+        Class<?> valueClass = fieldVal.getClass();
+        if (isWrapperType(valueClass)) {
+            String value = toString(fieldVal, false);
+            if (!value.isEmpty()) {
+                output.put(key, value);
+            }
+        } else if (isMapType(valueClass)) {
+            //TODO: check if it makes more sense to just stick to json
+            // like structure instead of a flatten output.
+            for (Object entry : ((Map) fieldVal).entrySet()) {
+                Object mapKey = ((Map.Entry) entry).getKey();
+                Object mapVal = ((Map.Entry) entry).getValue();
+
+                String keyStr = getString(mapKey, false);
+                String valStr = getString(mapVal, false);
+                if (valStr != null && !valStr.isEmpty()) {
+                    output.put(String.format("%s.%s", key, keyStr), valStr);
+                }
+            }
+        } else if (isCollectionType(valueClass)) {
+            //TODO check if it makes more sense to just stick to
+            // json like structure instead of a flatten output.
+            Collection collection = (Collection) fieldVal;
+            if (collection.isEmpty()) {
+                return;
+            }
+            StringBuilder joined = new StringBuilder();
+            for (Object o : collection) {
+                joined.append(getString(o, false)).append(',');
+            }
+            // Drop the trailing separator; the collection is known to be non-empty.
+            joined.setLength(joined.length() - 1);
+            output.put(key, joined.toString());
+        } else {
+            Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false);
+            for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) {
+                output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Renders a value as a string: {@code null} stays {@code null}, wrapper types use
+     * {@code toString()}, and any other object is flattened field by field.
+     */
+    private static String getString(Object instance,
+                                    boolean wrapWithQuote) throws IllegalAccessException {
+        if (instance == null) {
+            return null;
+        }
+        if (isWrapperType(instance.getClass())) {
+            return toString(instance, wrapWithQuote);
+        }
+        return getString(getFieldValues(instance, false), wrapWithQuote);
+    }
+
+    /**
+     * Joins the entries of a flattened field map with commas, optionally wrapping the
+     * whole result in double quotes. A null or empty map yields an empty string.
+     */
+    private static String getString(Map<String, String> flattenFields, boolean wrapWithQuote) {
+        if (flattenFields == null || flattenFields.isEmpty()) {
+            return "";
+        }
+        String joined = Joiner.on(",").join(flattenFields.entrySet());
+        return wrapWithQuote ? "\"" + joined + "\"" : joined;
+    }
+
+    /**
+     * Renders a wrapper-type value; only strings are quoted when quoting is requested.
+     */
+    private static String toString(Object instance, boolean wrapWithQuote) {
+        if (wrapWithQuote && instance instanceof String) {
+            return "\"" + instance + "\"";
+        }
+        return instance.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
new file mode 100644
index 0000000..2463a77
--- /dev/null
+++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.storm.hook;
+
+import backtype.storm.ILocalCluster;
+import backtype.storm.generated.StormTopology;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.storm.model.StormDataModel;
+import org.apache.atlas.storm.model.StormDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.TypesSerialization;
+import org.apache.commons.configuration.Configuration;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test that registers the storm data model and submits a test topology
+ * to a local storm cluster, then looks the topology up through the atlas client.
+ */
+@Test
+public class StormAtlasHookIT {
+
+    public static final Logger LOG = LoggerFactory.getLogger(StormAtlasHookIT.class);
+
+    private static final String ATLAS_URL = "http://localhost:21000/";
+    private static final String TOPOLOGY_NAME = "word-count";
+
+    private ILocalCluster stormCluster;
+    private AtlasClient atlasClient;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        // start a local storm cluster
+        stormCluster = StormTestUtil.createLocalStormCluster();
+        LOG.info("Created a storm local cluster");
+
+        Configuration configuration = ApplicationProperties.get();
+        atlasClient = new AtlasClient(configuration.getString("atlas.rest.address", ATLAS_URL));
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        LOG.info("Shutting down storm local cluster");
+        // setUp may have failed before the cluster was created
+        if (stormCluster != null) {
+            stormCluster.shutdown();
+        }
+
+        atlasClient = null;
+    }
+
+    @Test
+    public void testCreateDataModel() throws Exception {
+        StormDataModel.main(new String[]{});
+        TypesDef stormTypesDef = StormDataModel.typesDef();
+
+        String stormTypesAsJSON = TypesSerialization.toJson(stormTypesDef);
+        LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
+
+        new StormAtlasHook().registerDataModel(new HiveDataModelGenerator());
+
+        // verify types are registered
+        for (StormDataTypes stormDataType : StormDataTypes.values()) {
+            Assert.assertNotNull(atlasClient.getType(stormDataType.getName()));
+        }
+    }
+
+    @Test (dependsOnMethods = "testCreateDataModel")
+    public void testAddEntities() throws Exception {
+        StormTopology stormTopology = StormTestUtil.createTestTopology();
+        StormTestUtil.submitTopology(stormCluster, TOPOLOGY_NAME, stormTopology);
+        LOG.info("Submitted topology {}", TOPOLOGY_NAME);
+
+        // verify that the topology can be found in atlas and fetched by its guid
+        String guid = getTopologyGUID();
+        Assert.assertNotNull(guid);
+        LOG.info("GUID is {}", guid);
+
+        Referenceable topologyReferenceable = atlasClient.getEntity(guid);
+        Assert.assertNotNull(topologyReferenceable);
+    }
+
+    /**
+     * Searches atlas for the test topology by name.
+     *
+     * @return the id of the first search result, or {@code null} when the search
+     *         returns nothing or the first row carries no {@code $id$} element
+     */
+    private String getTopologyGUID() throws Exception {
+        LOG.debug("Searching for topology {}", TOPOLOGY_NAME);
+        String query = String.format("from %s where name = \"%s\"",
+                StormDataTypes.STORM_TOPOLOGY.getName(), TOPOLOGY_NAME);
+
+        JSONArray results = atlasClient.search(query);
+        if (results == null || results.length() == 0) {
+            // Let the caller's assertNotNull report the missing topology.
+            return null;
+        }
+        JSONObject row = results.getJSONObject(0);
+
+        return row.has("$id$") ? row.getJSONObject("$id$").getString("id") : null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java
new file mode 100644
index 0000000..51840a5
--- /dev/null
+++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.storm.hook;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.storm.model.StormDataTypes;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.contains;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests checking which data models the storm hook registers when the
+ * corresponding types are reported as missing by the atlas client.
+ */
+@Test
+public class StormAtlasHookTest {
+
+    @Test
+    public void testStormRegistersHiveDataModelIfNotPresent() throws AtlasException, AtlasServiceException {
+        // Created up front: stubbing a mock inside another stubbing call is not allowed.
+        AtlasServiceException notFound = typeNotFound();
+
+        AtlasClient client = mock(AtlasClient.class);
+        when(client.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenThrow(notFound);
+
+        String hiveModel = "{hive_model_as_json}";
+        HiveDataModelGenerator generator = mock(HiveDataModelGenerator.class);
+        when(generator.getModelAsJson()).thenReturn(hiveModel);
+
+        new StormAtlasHook(client).registerDataModel(generator);
+
+        verify(client).createType(hiveModel);
+    }
+
+    @Test
+    public void testStormRegistersStormModelIfNotPresent() throws AtlasServiceException, AtlasException {
+        // Created up front: stubbing a mock inside another stubbing call is not allowed.
+        AtlasServiceException notFound = typeNotFound();
+
+        AtlasClient client = mock(AtlasClient.class);
+        when(client.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenReturn("hive_process_definition");
+        when(client.getType(StormDataTypes.STORM_TOPOLOGY.getName())).thenThrow(notFound);
+
+        HiveDataModelGenerator generator = mock(HiveDataModelGenerator.class);
+
+        new StormAtlasHook(client).registerDataModel(generator);
+
+        verify(client).createType(contains("storm_topology"));
+    }
+
+    /**
+     * Builds a mocked service exception whose status is NOT_FOUND.
+     */
+    private static AtlasServiceException typeNotFound() {
+        AtlasServiceException notFound = mock(AtlasServiceException.class);
+        when(notFound.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND);
+        return notFound;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java
new file mode 100644
index 0000000..1e13f56
--- /dev/null
+++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.storm.hook;
+
+import backtype.storm.Config;
+import backtype.storm.ILocalCluster;
+import backtype.storm.Testing;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.testing.TestGlobalCount;
+import backtype.storm.testing.TestWordCounter;
+import backtype.storm.testing.TestWordSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+
+import java.util.HashMap;
+
+/**
+ * A utility to create a local storm cluster and a test topology, and to submit
+ * that topology with the atlas hook configured as submission notifier.
+ */
+final class StormTestUtil {
+
+    private StormTestUtil() {
+    }
+
+    /**
+     * Starts a local storm cluster with the nimbus daemon enabled.
+     */
+    public static ILocalCluster createLocalStormCluster() {
+        HashMap<String,Object> clusterConf = new HashMap<>();
+        clusterConf.put("nimbus-daemon", true);
+        return Testing.getLocalCluster(clusterConf);
+    }
+
+    /**
+     * Builds a topology made of one spout feeding a chain of two bolts.
+     */
+    public static StormTopology createTestTopology() {
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("words", new TestWordSpout(), 10);
+        topologyBuilder.setBolt("count", new TestWordCounter(), 3).shuffleGrouping("words");
+        topologyBuilder.setBolt("globalCount", new TestGlobalCount(), 2).shuffleGrouping("count");
+
+        return topologyBuilder.createTopology();
+    }
+
+    /**
+     * Submits the topology to the given cluster and waits ten seconds before returning.
+     *
+     * @param stormCluster  the cluster to submit to
+     * @param topologyName  the name to submit the topology under
+     * @param stormTopology the topology to submit
+     * @return the configuration the topology was submitted with
+     */
+    public static Config submitTopology(ILocalCluster stormCluster, String topologyName,
+                                        StormTopology stormTopology) throws Exception {
+        String notifierPlugin = StormAtlasHook.class.getName();
+
+        Config conf = new Config();
+        // Defaults go in first so that the explicit settings below win.
+        conf.putAll(Utils.readDefaultConfig());
+        conf.setDebug(true);
+        conf.setMaxTaskParallelism(3);
+        conf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN, notifierPlugin);
+
+        stormCluster.submitTopology(topologyName, conf, stormTopology);
+
+        Thread.sleep(10000);
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/common/src/main/java/org/apache/atlas/AtlasConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java b/common/src/main/java/org/apache/atlas/AtlasConstants.java
new file mode 100644
index 0000000..d590d6d
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+/**
+ * Constants shared across atlas modules.
+ */
+public interface AtlasConstants {
+
+    /** Configuration key under which the cluster name is stored. */
+    String CLUSTER_NAME_KEY = "atlas.cluster.name";
+
+    /** Cluster name used when none is configured. */
+    String DEFAULT_CLUSTER_NAME = "primary";
+
+    /** Name of the entity attribute that holds the cluster name. */
+    String CLUSTER_NAME_ATTRIBUTE = "clusterName";
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/distro/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml
index b80a0ad..88d0e60 100755
--- a/distro/src/main/assemblies/standalone-package.xml
+++ b/distro/src/main/assemblies/standalone-package.xml
@@ -110,6 +110,12 @@
             <directory>../addons/sqoop-bridge/target/dependency/hook</directory>
             <outputDirectory>hook</outputDirectory>
         </fileSet>
+
+        <!-- addons/storm -->
+        <fileSet>
+            <directory>../addons/storm-bridge/target/dependency/hook</directory>
+            <outputDirectory>hook</outputDirectory>
+        </fileSet>
     </fileSets>
 
     <files>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
new file mode 100644
index 0000000..4b1c78c
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hook;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.commons.configuration.Configuration;
+import org.codehaus.jettison.json.JSONArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * A base class for atlas hooks.
+ */
+public abstract class AtlasHook {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class);
+    private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
+
+    public static final String ATLAS_ENDPOINT = "atlas.rest.address";
+
+    protected final AtlasClient atlasClient;
+
+    /**
+     * Hadoop Cluster name for this instance, typically used for namespace.
+     */
+    protected static Configuration atlasProperties;
+
+    @Inject
+    protected static NotificationInterface notifInterface;
+
+    static {
+        try {
+            atlasProperties = ApplicationProperties.get(ApplicationProperties.CLIENT_PROPERTIES);
+        } catch (Exception e) {
+            LOG.info("Attempting to send msg while shutdown in progress.", e);
+        }
+
+        Injector injector = Guice.createInjector(new NotificationModule());
+        notifInterface = injector.getInstance(NotificationInterface.class);
+
+        LOG.info("Created Atlas Hook");
+    }
+
+    public AtlasHook() {
+        this(new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT, DEFAULT_ATLAS_URL)));
+    }
+
+    public AtlasHook(AtlasClient atlasClient) {
+        this.atlasClient = atlasClient;
+        //TODO - take care of passing in - ugi, doAsUser for secure cluster
+    }
+
+    protected abstract String getNumberOfRetriesPropertyKey();
+
+    protected void notifyEntities(Collection<Referenceable> entities) {
+        JSONArray entitiesArray = new JSONArray();
+
+        for (Referenceable entity : entities) {
+            LOG.info("Adding entity for type: {}", entity.getTypeName());
+            final String entityJson = InstanceSerialization.toJson(entity, true);
+            entitiesArray.put(entityJson);
+        }
+
+        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
+        hookNotificationMessages.add(new HookNotification.EntityCreateRequest(entitiesArray));
+        notifyEntities(hookNotificationMessages);
+    }
+
+    /**
+     * Notify atlas
+     * of the entity through message. The entity can be a
+     * complex entity with reference to other entities.
+     * De-duping of entities is done on server side depending on the
+     * unique attribute on the entities.
+     *
+     * @param entities entities
+     */
+    protected void notifyEntities(List<HookNotification.HookNotificationMessage> entities) {
+        final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
+        final String message = entities.toString();
+
+        int numRetries = 0;
+        while (true) {
+            try {
+                notifInterface.send(NotificationInterface.NotificationType.HOOK, entities);
+                return;
+            } catch(Exception e) {
+                numRetries++;
+                if(numRetries < maxRetries) {
+                    LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
+                } else {
+                    LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting",
+                            message, maxRetries, e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java
index c97c726..1d73af5 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java
@@ -32,8 +32,7 @@ public class KafkaNotificationProvider implements Provider<KafkaNotification> {
     public KafkaNotification get() {
         try {
             Configuration applicationProperties = ApplicationProperties.get();
-            KafkaNotification instance = new KafkaNotification(applicationProperties);
-            return instance;
+            return new KafkaNotification(applicationProperties);
         } catch(AtlasException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 9dfd4ff..fe02058 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-183 Add a Hook in Storm to post the topology metadata (svenkat,yhemanth via shwethags)
 ATLAS-370 Implement deleteEntities at repository level (dkantor via shwethags)
 ATLAS-406 Resizing lineage window – should be an anchor on a corner – like ppt for graphic (sanjayp via shwethags)
 ATLAS-432 QuickStart lineage is broken (yhemanth via shwethags)


Mime
View raw message