atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [3/3] incubator-atlas git commit: ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
Date Thu, 31 Mar 2016 10:51:24 GMT
ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bca454e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bca454e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bca454e1

Branch: refs/heads/master
Commit: bca454e16f0b289b39ab75986e6acdca49488d04
Parents: 985465f
Author: Shwetha GS <sshivalingamurthy@hortonworks.com>
Authored: Thu Mar 31 14:49:12 2016 +0530
Committer: Shwetha GS <sshivalingamurthy@hortonworks.com>
Committed: Thu Mar 31 14:49:12 2016 +0530

----------------------------------------------------------------------
 addons/falcon-bridge/pom.xml                    |   9 +-
 .../apache/atlas/falcon/hook/FalconHook.java    |  97 ++--------
 .../atlas/publisher/FalconEventPublisher.java   |   6 +-
 .../apache/atlas/falcon/hook/FalconHookIT.java  |  56 +++++-
 addons/hive-bridge/pom.xml                      |   9 +-
 .../org/apache/atlas/hive/hook/HiveHook.java    |  33 ++--
 addons/sqoop-bridge/pom.xml                     |   9 +-
 .../org/apache/atlas/sqoop/hook/SqoopHook.java  |  73 +------
 .../apache/atlas/sqoop/hook/SqoopHookIT.java    |  30 ++-
 addons/storm-bridge/pom.xml                     |   9 +-
 .../apache/atlas/storm/hook/StormAtlasHook.java |  53 +-----
 .../atlas/storm/hook/StormAtlasHookIT.java      |  39 +++-
 .../atlas/storm/hook/StormAtlasHookTest.java    |  68 -------
 .../org/apache/atlas/ApplicationProperties.java |  12 +-
 distro/src/conf/atlas-log4j.xml                 |   2 +-
 .../java/org/apache/atlas/hook/AtlasHook.java   |  85 ++++++---
 .../notification/NotificationHookConsumer.java  |  23 +--
 .../atlas/notification/NotificationModule.java  |   7 +
 .../NotificationEntityChangeListener.java       |   2 +
 .../notification/hook/HookNotification.java     |  88 +++++----
 .../NotificationHookConsumerTest.java           |  37 +++-
 .../notification/hook/HookNotificationTest.java |  69 ++++---
 pom.xml                                         |   5 +-
 release-log.txt                                 |   1 +
 repository/pom.xml                              |   1 +
 .../apache/atlas/RepositoryMetadataModule.java  |  28 ++-
 .../repository/audit/EntityAuditListener.java   |  95 ++++++++++
 .../repository/audit/EntityAuditRepository.java |  37 +++-
 .../audit/HBaseBasedAuditRepository.java        |  25 ++-
 .../audit/InMemoryEntityAuditRepository.java    |  59 ++++++
 .../graph/GraphBackedSearchIndexer.java         | 112 ++++++-----
 .../atlas/services/DefaultMetadataService.java  |  61 +++---
 .../GraphBackedDiscoveryServiceTest.java        |   7 -
 .../audit/AuditRepositoryTestBase.java          |  81 ++++++++
 .../audit/HBaseBasedAuditRepositoryTest.java    |  88 +--------
 .../atlas/repository/audit/HBaseTestUtils.java  |  57 ++++++
 .../audit/InMemoryAuditRepositoryTest.java      |  28 +++
 .../service/DefaultMetadataServiceTest.java     |  57 +++++-
 .../DefaultMetadataServiceMockTest.java         |   7 +-
 server-api/pom.xml                              |   1 -
 .../java/org/apache/atlas/RequestContext.java   |  55 ++++++
 .../atlas/typesystem/types/TypeSystem.java      |   2 -
 .../typesystem/types/TypeSystemProvider.java    |  28 +++
 .../main/resources/atlas-application.properties |   8 +
 webapp/pom.xml                                  |   4 +-
 .../web/filters/AtlasAuthenticationFilter.java  |  79 +++++++-
 .../apache/atlas/web/filters/AuditFilter.java   |  10 +-
 .../atlas/web/listeners/GuiceServletConfig.java |  33 ++--
 .../atlas/web/service/EmbeddedServer.java       |   7 +-
 .../NotificationHookConsumerIT.java             |  12 +-
 .../AtlasAuthenticationKerberosFilterIT.java    | 190 +++++++++++++++++++
 .../AtlasAuthenticationSimpleFilterIT.java      |  98 ++++++++++
 .../MetadataAuthenticationKerberosFilterIT.java | 179 -----------------
 .../MetadataAuthenticationSimpleFilterIT.java   |  94 ---------
 .../web/listeners/TestGuiceServletConfig.java   |   6 +
 .../apache/atlas/web/listeners/TestModule.java  |  32 ++++
 .../web/security/BaseSSLAndKerberosTest.java    |   8 +-
 .../atlas/web/security/BaseSecurityTest.java    |  25 ++-
 .../security/NegativeSSLAndKerberosTest.java    |   3 +-
 .../org/apache/atlas/web/security/SSLTest.java  |  23 ++-
 .../web/service/SecureEmbeddedServerTest.java   |  26 ++-
 .../service/SecureEmbeddedServerTestBase.java   |  15 +-
 62 files changed, 1558 insertions(+), 945 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index ccdb512..ad345c5 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -205,13 +205,18 @@
                     <daemon>true</daemon>
                     <webApp>
                         <contextPath>/</contextPath>
-                        <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+                        <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+                        <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
                     </webApp>
                     <useTestScope>true</useTestScope>
                     <systemProperties>
                         <systemProperty>
                             <name>log4j.configuration</name>
-                            <value>atlas-log4j.xml</value>
+                            <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
+                        </systemProperty>
+                        <systemProperty>
+                            <name>atlas.log.file</name>
+                            <value>application.log</value>
                         </systemProperty>
                         <systemProperty>
                             <name>atlas.log.dir</name>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index d4b0069..c1ab384 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -20,22 +20,17 @@ package org.apache.atlas.falcon.hook;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Guice;
-import com.google.inject.Inject;
 import com.google.inject.Injector;
-import com.sun.jersey.api.client.ClientResponse;
-import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.falcon.model.FalconDataModelGenerator;
 import org.apache.atlas.falcon.model.FalconDataTypes;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.atlas.Util.EventUtil;
 import org.apache.falcon.atlas.event.FalconEvent;
@@ -50,8 +45,7 @@ import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.falcon.security.CurrentUser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +59,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Falcon hook sends lineage information to the Atlas Service.
  */
-public class FalconHook extends FalconEventPublisher {
+public class FalconHook extends AtlasHook implements FalconEventPublisher {
     private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class);
 
     public static final String CONF_PREFIX = "atlas.hook.falcon.";
@@ -77,10 +71,6 @@ public class FalconHook extends FalconEventPublisher {
 
     public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
 
-    public static final String ATLAS_ENDPOINT = "atlas.rest.address";
-
-    private static  AtlasClient atlasClient;
-
     // wait time determines how long we wait before we exit the jvm on
     // shutdown. Pending requests after that will not be sent.
     private static final int WAIT_TIME = 3;
@@ -91,20 +81,12 @@ public class FalconHook extends FalconEventPublisher {
     private static final long keepAliveTimeDefault = 10;
     private static final int queueSizeDefault = 10000;
 
-    private static Configuration atlasProperties;
-    @Inject
-    private static NotificationInterface notifInterface;
-
-    public static boolean typesRegistered = false;
-
     private static boolean sync;
 
     private static ConfigurationStore STORE;
 
     static {
         try {
-            atlasProperties = ApplicationProperties.get();
-
             // initialize the async facility to process hook calls. We don't
             // want to do this inline since it adds plenty of overhead for the query.
             int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
@@ -130,8 +112,6 @@ public class FalconHook extends FalconEventPublisher {
                     // shutdown client
                 }
             });
-            atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT),
-                    EventUtil.getUgi(), EventUtil.getUgi().getShortUserName());
 
             STORE = ConfigurationStore.get();
         } catch (Exception e) {
@@ -166,7 +146,17 @@ public class FalconHook extends FalconEventPublisher {
     private void fireAndForget(FalconEvent event) throws Exception {
         LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
 
-        notifyEntity(createEntities(event));
+        notifyEntities(getAuthenticatedUser(), createEntities(event));
+    }
+
+    private String getAuthenticatedUser() {
+        String user = null;
+        try {
+            user = CurrentUser.getAuthenticatedUser();
+        } catch (IllegalArgumentException e) {
+            LOG.warn("Failed to get user from CurrentUser.getAuthenticatedUser");
+        }
+        return getUser(user, null);
     }
 
     private List<Referenceable> createEntities(FalconEvent event) throws Exception {
@@ -179,36 +169,6 @@ public class FalconHook extends FalconEventPublisher {
     }
 
     /**
-     * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
-     * De-duping of entities is done on server side depending on the unique attribute on the
-     *
-     * @param entities entitiies to add
-     */
-    private void notifyEntity(List<Referenceable> entities) {
-        int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-        String message = entities.toString();
-
-        int numRetries = 0;
-        while (true) {
-            try {
-                notifInterface.send(NotificationInterface.NotificationType.HOOK,
-                        new HookNotification.EntityCreateRequest(entities));
-                return;
-            } catch (Exception e) {
-                numRetries++;
-                if (numRetries < maxRetries) {
-                    LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
-                } else {
-                    LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", message,
-                            maxRetries, e);
-                    break;
-                }
-            }
-        }
-    }
-
-
-    /**
      +     * Creates process entity
      +     *
      +     * @param event process entity event
@@ -324,32 +284,9 @@ public class FalconHook extends FalconEventPublisher {
         return entities;
     }
 
-    public synchronized void registerFalconDataModel() throws Exception {
-        if (isDataModelAlreadyRegistered()) {
-            LOG.info("Falcon data model is already registered!");
-            return;
-        }
-
-        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties,
-                UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
-        hiveMetaStoreBridge.registerHiveDataModel();
-
-        FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator();
-        LOG.info("Registering Falcon data model");
-        atlasClient.createType(dataModelGenerator.getModelAsJson());
-    }
-
-    private boolean isDataModelAlreadyRegistered() throws Exception {
-        try {
-            atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
-            LOG.info("Hive data model is already registered!");
-            return true;
-        } catch(AtlasServiceException ase) {
-            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
-                return false;
-            }
-            throw ase;
-        }
+    @Override
+    protected String getNumberOfRetriesPropertyKey() {
+        return HOOK_NUM_RETRIES;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
index 3522339..8029be9 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
@@ -24,8 +24,8 @@ import org.apache.falcon.atlas.event.FalconEvent;
 /**
  * Falcon publisher for Atlas
  */
-public abstract class FalconEventPublisher {
-    public static class Data {
+public interface FalconEventPublisher {
+    class Data {
         private FalconEvent event;
 
         public Data(FalconEvent event) {
@@ -37,5 +37,5 @@ public abstract class FalconEventPublisher {
         }
     }
 
-    public abstract void publish(final Data data) throws Exception;
+    void publish(final Data data) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
index aaffa4a..4249a8f 100644
--- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
+++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
@@ -18,12 +18,16 @@
 
 package org.apache.atlas.falcon.hook;
 
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.falcon.model.FalconDataModelGenerator;
 import org.apache.atlas.falcon.model.FalconDataTypes;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.falcon.atlas.service.AtlasService;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -33,6 +37,8 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -54,21 +60,51 @@ public class FalconHookIT {
     public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
     public static final String PROCESS_RESOURCE = "/process.xml";
 
-    private AtlasClient dgiCLient;
+    private AtlasClient atlasClient;
 
     private static final ConfigurationStore STORE = ConfigurationStore.get();
+    private Configuration atlasProperties;
 
     @BeforeClass
     public void setUp() throws Exception {
-        dgiCLient = new AtlasClient(ApplicationProperties.get().getString("atlas.rest.address"));
+        atlasProperties = ApplicationProperties.get();
+        atlasClient = new AtlasClient(atlasProperties.getString("atlas.rest.address"));
 
         AtlasService service = new AtlasService();
         service.init();
         STORE.registerListener(service);
-        new FalconHook().registerFalconDataModel();
+        registerFalconDataModel();
         CurrentUser.authenticate(System.getProperty("user.name"));
     }
 
+    private void registerFalconDataModel() throws Exception {
+        if (isDataModelAlreadyRegistered()) {
+            LOG.info("Falcon data model is already registered!");
+            return;
+        }
+
+        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties,
+                UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
+        hiveMetaStoreBridge.registerHiveDataModel();
+
+        FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator();
+        LOG.info("Registering Falcon data model");
+        atlasClient.createType(dataModelGenerator.getModelAsJson());
+    }
+
+    private boolean isDataModelAlreadyRegistered() throws Exception {
+        try {
+            atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
+            LOG.info("Hive data model is already registered!");
+            return true;
+        } catch(AtlasServiceException ase) {
+            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                return false;
+            }
+            throw ase;
+        }
+    }
+
     private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException {
         Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));
         switch (entity.getEntityType()) {
@@ -115,17 +151,17 @@ public class FalconHookIT {
         STORE.publish(EntityType.PROCESS, process);
 
         String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
-        Referenceable processEntity = dgiCLient.getEntity(pid);
+        Referenceable processEntity = atlasClient.getEntity(pid);
         assertNotNull(processEntity);
         assertEquals(processEntity.get("processName"), process.getName());
 
         Id inId = (Id) ((List)processEntity.get("inputs")).get(0);
-        Referenceable inEntity = dgiCLient.getEntity(inId._getId());
+        Referenceable inEntity = atlasClient.getEntity(inId._getId());
         assertEquals(inEntity.get("name"),
                 HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), inDbName, inTableName));
 
         Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
-        Referenceable outEntity = dgiCLient.getEntity(outId._getId());
+        Referenceable outEntity = atlasClient.getEntity(outId._getId());
         assertEquals(outEntity.get("name"),
                 HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
     }
@@ -173,12 +209,12 @@ public class FalconHookIT {
         STORE.publish(EntityType.PROCESS, process);
 
         String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
-        Referenceable processEntity = dgiCLient.getEntity(pid);
+        Referenceable processEntity = atlasClient.getEntity(pid);
         assertEquals(processEntity.get("processName"), process.getName());
         assertNull(processEntity.get("inputs"));
 
         Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
-        Referenceable outEntity = dgiCLient.getEntity(outId._getId());
+        Referenceable outEntity = atlasClient.getEntity(outId._getId());
         assertEquals(outEntity.get("name"),
                 HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
     }
@@ -209,13 +245,13 @@ public class FalconHookIT {
         waitFor(2000000, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
-                JSONArray results = dgiCLient.search(query);
+                JSONArray results = atlasClient.search(query);
                 System.out.println(results);
                 return results.length() == 1;
             }
         });
 
-        JSONArray results = dgiCLient.search(query);
+        JSONArray results = atlasClient.search(query);
         JSONObject row = results.getJSONObject(0).getJSONObject("t");
 
         return row.getString("id");

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index adb4f3a..8bfbb13 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -283,13 +283,18 @@
                     <daemon>true</daemon>
                     <webApp>
                         <contextPath>/</contextPath>
-                        <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+                        <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+                        <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
                     </webApp>
                     <useTestScope>true</useTestScope>
                     <systemProperties>
                         <systemProperty>
                             <name>log4j.configuration</name>
-                            <value>atlas-log4j.xml</value>
+                            <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
+                        </systemProperty>
+                        <systemProperty>
+                            <name>atlas.log.file</name>
+                            <value>application.log</value>
                         </systemProperty>
                         <systemProperty>
                             <name>atlas.log.dir</name>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 16ed452..f313f2e 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -20,14 +20,12 @@ package org.apache.atlas.hive.hook;
 
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -86,8 +84,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final long keepAliveTimeDefault = 10;
     private static final int queueSizeDefault = 10000;
 
-    private static Configuration atlasProperties;
-
     class HiveEvent {
         public Set<ReadEntity> inputs;
         public Set<WriteEntity> outputs;
@@ -108,8 +104,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
     static {
         try {
-            atlasProperties = ApplicationProperties.get();
-
             // initialize the async facility to process hook calls. We don't
             // want to do this inline since it adds plenty of overhead for the query.
             int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
@@ -166,7 +160,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         event.inputs = hookContext.getInputs();
         event.outputs = hookContext.getOutputs();
 
-        event.user = hookContext.getUserName() == null ? hookContext.getUgi().getUserName() : hookContext.getUserName();
+        event.user = getUser(hookContext.getUserName(), hookContext.getUgi());
         event.ugi = hookContext.getUgi();
         event.operation = OPERATION_MAP.get(hookContext.getOperationName());
         event.hookType = hookContext.getHookType();
@@ -258,7 +252,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         for (WriteEntity writeEntity : event.outputs) {
             if (writeEntity.getType() == Type.DATABASE) {
                 //Create/update table entity
-                createOrUpdateEntities(dgiBridge, writeEntity);
+                createOrUpdateEntities(dgiBridge, event.user, writeEntity);
             }
         }
     }
@@ -271,7 +265,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
            //Below check should  filter out partition related
            if (writeEntity.getType() == Entity.Type.TABLE) {
                //Create/update table entity
-               createOrUpdateEntities(dgiBridge, writeEntity);
+               createOrUpdateEntities(dgiBridge, event.user, writeEntity);
            }
         }
     }
@@ -292,7 +286,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                     .equals(oldTable.getTableName())) {
 
                     //Create/update old table entity - create new entity and replace id
-                    Referenceable tableEntity = createOrUpdateEntities(dgiBridge, writeEntity);
+                    Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
                     String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
                             oldTable.getDbName(), oldTable.getTableName());
                     tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
@@ -304,14 +298,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                     Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
                     newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName);
                     newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase());
-                    messages.add(new HookNotification.EntityPartialUpdateRequest(HiveDataTypes.HIVE_TABLE.getName(),
-                            HiveDataModelGenerator.NAME, oldQualifiedName, newEntity));
+                    messages.add(new HookNotification.EntityPartialUpdateRequest(event.user,
+                            HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME,
+                            oldQualifiedName, newEntity));
                 }
             }
         }
     }
 
-    private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception {
+    private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity) throws Exception {
         Database db = null;
         Table table = null;
         Partition partition = null;
@@ -351,14 +346,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             entities.add(partitionEntity);
         }
 
-        messages.add(new HookNotification.EntityUpdateRequest(entities));
+        messages.add(new HookNotification.EntityUpdateRequest(user, entities));
         return tableEntity;
     }
 
     private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception {
         for (WriteEntity entity : event.outputs) {
             if (entity.getType() == entityType) {
-                createOrUpdateEntities(dgiBridge, entity);
+                createOrUpdateEntities(dgiBridge, event.user, entity);
             }
         }
     }
@@ -396,7 +391,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         List<Referenceable> source = new ArrayList<>();
         for (ReadEntity readEntity : inputs) {
             if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
-                Referenceable inTable = createOrUpdateEntities(dgiBridge, readEntity);
+                Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity);
                 source.add(inTable);
             }
         }
@@ -405,7 +400,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         List<Referenceable> target = new ArrayList<>();
         for (WriteEntity writeEntity : outputs) {
             if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
-                Referenceable outTable = createOrUpdateEntities(dgiBridge, writeEntity);
+                Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
                 target.add(outTable);
             }
         }
@@ -417,7 +412,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
         //TODO set
         processReferenceable.set("queryGraph", "queryGraph");
-        messages.add(new HookNotification.EntityCreateRequest(processReferenceable));
+        messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable));
     }
 
 
@@ -432,6 +427,4 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             return new JSONObject();
         }
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/sqoop-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index 0927c8f..343bb4e 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -288,13 +288,18 @@
                     <daemon>true</daemon>
                     <webApp>
                         <contextPath>/</contextPath>
-                        <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+                        <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+                        <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
                     </webApp>
                     <useTestScope>true</useTestScope>
                     <systemProperties>
                         <systemProperty>
                             <name>log4j.configuration</name>
-                            <value>atlas-log4j.xml</value>
+                            <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
+                        </systemProperty>
+                        <systemProperty>
+                            <name>atlas.log.file</name>
+                            <value>application.log</value>
                         </systemProperty>
                         <systemProperty>
                             <name>atlas.log.dir</name>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index b573ac4..924e467 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -19,31 +19,24 @@
 package org.apache.atlas.sqoop.hook;
 
 
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.sqoop.model.SqoopDataModelGenerator;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sqoop.SqoopJobDataPublisher;
 import org.apache.sqoop.util.ImportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -55,43 +48,16 @@ import java.util.Properties;
 public class SqoopHook extends SqoopJobDataPublisher {
 
     private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
-    private static final String DEFAULT_DGI_URL = "http://localhost:21000/";
     public static final String CONF_PREFIX = "atlas.hook.sqoop.";
     public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
 
     public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
-    public static final String ATLAS_REST_ADDRESS = "atlas.rest.address";
-
-    @Inject
-    private static NotificationInterface notifInterface;
 
     static {
         org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
     }
 
-    synchronized void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception {
-        // Make sure hive model exists
-        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf,
-                UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
-        hiveMetaStoreBridge.registerHiveDataModel();
-        SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
-
-        //Register sqoop data model if its not already registered
-        try {
-            client.getType(SqoopDataTypes.SQOOP_PROCESS.getName());
-            LOG.info("Sqoop data model is already registered!");
-        } catch(AtlasServiceException ase) {
-            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
-                //Expected in case types do not exist
-                LOG.info("Registering Sqoop data model");
-                client.createType(dataModelGenerator.getModelAsJson());
-            } else {
-                throw ase;
-            }
-        }
-    }
-
     public Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
             throws Exception {
         Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
@@ -182,12 +148,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
 
     @Override
     public void publish(SqoopJobDataPublisher.Data data) throws Exception {
-        Injector injector = Guice.createInjector(new NotificationModule());
-        notifInterface = injector.getInstance(NotificationInterface.class);
-
         Configuration atlasProperties = ApplicationProperties.get();
-        AtlasClient atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_REST_ADDRESS, DEFAULT_DGI_URL),
-                UserGroupInformation.getCurrentUser(), UserGroupInformation.getCurrentUser().getShortUserName());
         org.apache.hadoop.conf.Configuration sqoopConf = new org.apache.hadoop.conf.Configuration();
         String clusterName = sqoopConf.get(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
 
@@ -197,33 +158,9 @@ public class SqoopHook extends SqoopJobDataPublisher {
                 data.getHiveTable(), data.getHiveDB());
         Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName);
 
-        notifyEntity(atlasProperties, dbStoreRef, dbRef, hiveTableRef, procRef);
-    }
-
-    /**
-     * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
-     * De-duping of entities is done on server side depending on the unique attribute on the
-     * @param entities - Entity references to publish.
-     */
-    private void notifyEntity(Configuration atlasProperties, Referenceable... entities) {
         int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-
-        int numRetries = 0;
-        while (true) {
-            try {
-                notifInterface.send(NotificationInterface.NotificationType.HOOK,
-                        new HookNotification.EntityCreateRequest(entities));
-                return;
-            } catch(Exception e) {
-                numRetries++;
-                if(numRetries < maxRetries) {
-                    LOG.debug("Failed to notify atlas for entity {}. Retrying", entities, e);
-                } else {
-                    LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", entities,
-                            maxRetries, e);
-                    break;
-                }
-            }
-        }
+        HookNotification.HookNotificationMessage message =
+                new HookNotification.EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef);
+        AtlasHook.notifyEntities(Arrays.asList(message), maxRetries);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
index 94cd105..0e4658a 100644
--- a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
+++ b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
@@ -18,11 +18,17 @@
 
 package org.apache.atlas.sqoop.hook;
 
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.sqoop.model.SqoopDataModelGenerator;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
 import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sqoop.SqoopJobDataPublisher;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
@@ -44,7 +50,29 @@ public class SqoopHookIT {
         //Set-up sqoop session
         Configuration configuration = ApplicationProperties.get();
         dgiCLient = new AtlasClient(configuration.getString("atlas.rest.address"));
-        new SqoopHook().registerDataModels(dgiCLient, configuration);
+        registerDataModels(dgiCLient, configuration);
+    }
+
+    private void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception {
+        // Make sure hive model exists
+        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf,
+                UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
+        hiveMetaStoreBridge.registerHiveDataModel();
+        SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
+
+        //Register sqoop data model if its not already registered
+        try {
+            client.getType(SqoopDataTypes.SQOOP_PROCESS.getName());
+            LOG.info("Sqoop data model is already registered!");
+        } catch(AtlasServiceException ase) {
+            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                //Expected in case types do not exist
+                LOG.info("Registering Sqoop data model");
+                client.createType(dataModelGenerator.getModelAsJson());
+            } else {
+                throw ase;
+            }
+        }
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index 76c4507..e3b4ed7 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -318,13 +318,18 @@
                     <daemon>true</daemon>
                     <webApp>
                         <contextPath>/</contextPath>
-                        <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+                        <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+                        <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
                     </webApp>
                     <useTestScope>true</useTestScope>
                     <systemProperties>
                         <systemProperty>
                             <name>log4j.configuration</name>
-                            <value>atlas-log4j.xml</value>
+                            <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
+                        </systemProperty>
+                        <systemProperty>
+                            <name>atlas.log.file</name>
+                            <value>application.log</value>
                         </systemProperty>
                         <systemProperty>
                             <name>atlas.log.dir</name>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 4c0004b..620f929 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -24,20 +24,13 @@ import backtype.storm.generated.SpoutSpec;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.generated.TopologyInfo;
 import backtype.storm.utils.Utils;
-import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
-import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
-import org.apache.atlas.storm.model.StormDataModel;
 import org.apache.atlas.storm.model.StormDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.TypesDef;
-import org.apache.atlas.typesystem.json.TypesSerialization;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -70,15 +63,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
 
     public static final String HBASE_NAMESPACE_DEFAULT = "default";
 
-    private static volatile boolean typesRegistered = false;
-
-    public StormAtlasHook() {
-        super();
-    }
-
-    StormAtlasHook(AtlasClient atlasClient) {
-        super(atlasClient);
-    }
     @Override
     protected String getNumberOfRetriesPropertyKey() {
         return HOOK_NUM_RETRIES;
@@ -113,7 +97,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
             entities.add(topologyReferenceable);
 
             LOG.debug("notifying entities, size = {}", entities.size());
-            notifyEntities(entities);
+            String user = getUser(topologyInfo.get_owner(), null);
+            notifyEntities(user, entities);
         } catch (Exception e) {
             throw new RuntimeException("Atlas hook is unable to process the topology.", e);
         }
@@ -379,38 +364,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
         return String.format("%s.%s@%s", nameSpace, tableName, clusterName);
     }
 
-    public synchronized void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException,
-            AtlasServiceException {
-
-        try {
-            atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
-            LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model");
-        } catch(AtlasServiceException ase) {
-            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
-                //Expected in case types do not exist
-                LOG.info("Registering Hive data model");
-                atlasClient.createType(dataModelGenerator.getModelAsJson());
-            } else {
-                throw ase;
-            }
-        }
-
-
-        try {
-            atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName());
-        } catch(AtlasServiceException ase) {
-            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
-                LOG.info("Registering Storm/Kafka data model");
-                StormDataModel.main(new String[]{});
-                TypesDef typesDef = StormDataModel.typesDef();
-                String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
-                LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
-                atlasClient.createType(stormTypesAsJSON);
-            }
-        }
-        typesRegistered = true;
-    }
-
     private String getClusterName(Map stormConf) {
         String clusterName = AtlasConstants.DEFAULT_CLUSTER_NAME;
         if (stormConf.containsKey(AtlasConstants.CLUSTER_NAME_KEY)) {
@@ -418,6 +371,4 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
         }
         return clusterName;
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
index 79f1b07..4648d24 100644
--- a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
+++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
@@ -20,9 +20,13 @@ package org.apache.atlas.storm.hook;
 
 import backtype.storm.ILocalCluster;
 import backtype.storm.generated.StormTopology;
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.storm.model.StormDataModel;
 import org.apache.atlas.storm.model.StormDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
@@ -57,9 +61,40 @@ public class StormAtlasHookIT {
 
         Configuration configuration = ApplicationProperties.get();
         atlasClient = new AtlasClient(configuration.getString("atlas.rest.address", ATLAS_URL));
-        new StormAtlasHook().registerDataModel(new HiveDataModelGenerator());
+        registerDataModel(new HiveDataModelGenerator());
     }
 
+    private void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException,
+            AtlasServiceException {
+        try {
+            atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
+            LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model");
+        } catch(AtlasServiceException ase) {
+            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                //Expected in case types do not exist
+                LOG.info("Registering Hive data model");
+                atlasClient.createType(dataModelGenerator.getModelAsJson());
+            } else {
+                throw ase;
+            }
+        }
+
+
+        try {
+            atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName());
+        } catch(AtlasServiceException ase) {
+            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                LOG.info("Registering Storm/Kafka data model");
+                StormDataModel.main(new String[]{});
+                TypesDef typesDef = StormDataModel.typesDef();
+                String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
+                LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
+                atlasClient.createType(stormTypesAsJSON);
+            }
+        }
+    }
+
+
     @AfterClass
     public void tearDown() throws Exception {
         LOG.info("Shutting down storm local cluster");
@@ -76,7 +111,7 @@ public class StormAtlasHookIT {
         String stormTypesAsJSON = TypesSerialization.toJson(stormTypesDef);
         LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
 
-        new StormAtlasHook().registerDataModel(new HiveDataModelGenerator());
+        registerDataModel(new HiveDataModelGenerator());
 
         // verify types are registered
         for (StormDataTypes stormDataType : StormDataTypes.values()) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java
deleted file mode 100644
index 51840a5..0000000
--- a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.storm.hook;
-
-import com.sun.jersey.api.client.ClientResponse;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.hive.model.HiveDataModelGenerator;
-import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.storm.model.StormDataTypes;
-import org.testng.annotations.Test;
-
-import static org.mockito.Matchers.contains;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@Test
-public class StormAtlasHookTest {
-
-    @Test
-    public void testStormRegistersHiveDataModelIfNotPresent() throws AtlasException, AtlasServiceException {
-        AtlasClient atlasClient = mock(AtlasClient.class);
-        HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class);
-        AtlasServiceException atlasServiceException = mock(AtlasServiceException.class);
-        when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND);
-        when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenThrow(atlasServiceException);
-        String hiveModel = "{hive_model_as_json}";
-        when(dataModelGenerator.getModelAsJson()).thenReturn(hiveModel);
-
-        StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient);
-        stormAtlasHook.registerDataModel(dataModelGenerator);
-
-        verify(atlasClient).createType(hiveModel);
-    }
-
-    @Test
-    public void testStormRegistersStormModelIfNotPresent() throws AtlasServiceException, AtlasException {
-        AtlasClient atlasClient = mock(AtlasClient.class);
-        HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class);
-        when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenReturn("hive_process_definition");
-        AtlasServiceException atlasServiceException = mock(AtlasServiceException.class);
-        when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND);
-        when(atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName())).thenThrow(atlasServiceException);
-
-        StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient);
-        stormAtlasHook.registerDataModel(dataModelGenerator);
-
-        verify(atlasClient).createType(contains("storm_topology"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/common/src/main/java/org/apache/atlas/ApplicationProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/ApplicationProperties.java b/common/src/main/java/org/apache/atlas/ApplicationProperties.java
index d74a30e..ca72ffd 100644
--- a/common/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/common/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -35,12 +35,22 @@ public final class ApplicationProperties extends PropertiesConfiguration {
 
     public static final String APPLICATION_PROPERTIES = "atlas-application.properties";
 
-    private static Configuration instance = null;
+    private static volatile Configuration instance = null;
 
     private ApplicationProperties(URL url) throws ConfigurationException {
         super(url);
     }
 
+    public static void forceReload() {
+        if (instance != null) {
+            synchronized (ApplicationProperties.class) {
+                if (instance != null) {
+                    instance = null;
+                }
+            }
+        }
+    }
+
     public static Configuration get() throws AtlasException {
         if (instance == null) {
             synchronized (ApplicationProperties.class) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index 6071703..1ac4082 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -55,7 +55,7 @@
         <appender-ref ref="FILE"/>
     </logger>
 
-    <logger name="AUDIT">
+    <logger name="AUDIT" additivity="false">
         <level value="info"/>
         <appender-ref ref="AUDIT"/>
     </logger>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 2e41c5c..7e09a19 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -19,20 +19,21 @@
 package org.apache.atlas.hook;
 
 import com.google.inject.Guice;
-import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -44,25 +45,19 @@ import java.util.List;
 public abstract class AtlasHook {
 
     private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class);
-    private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
-
-    public static final String ATLAS_ENDPOINT = "atlas.rest.address";
-
-    protected final AtlasClient atlasClient;
 
     /**
      * Hadoop Cluster name for this instance, typically used for namespace.
      */
     protected static Configuration atlasProperties;
 
-    @Inject
     protected static NotificationInterface notifInterface;
 
     static {
         try {
             atlasProperties = ApplicationProperties.get();
         } catch (Exception e) {
-            LOG.info("Attempting to send msg while shutdown in progress.", e);
+            LOG.info("Failed to load application properties", e);
         }
 
         Injector injector = Guice.createInjector(new NotificationModule());
@@ -71,18 +66,9 @@ public abstract class AtlasHook {
         LOG.info("Created Atlas Hook");
     }
 
-    public AtlasHook() {
-        this(new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT, DEFAULT_ATLAS_URL)));
-    }
-
-    public AtlasHook(AtlasClient atlasClient) {
-        this.atlasClient = atlasClient;
-        //TODO - take care of passing in - ugi, doAsUser for secure cluster
-    }
-
     protected abstract String getNumberOfRetriesPropertyKey();
 
-    protected void notifyEntities(Collection<Referenceable> entities) {
+    protected void notifyEntities(String user, Collection<Referenceable> entities) {
         JSONArray entitiesArray = new JSONArray();
 
         for (Referenceable entity : entities) {
@@ -92,27 +78,26 @@ public abstract class AtlasHook {
         }
 
         List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
-        hookNotificationMessages.add(new HookNotification.EntityCreateRequest(entitiesArray));
+        hookNotificationMessages.add(new HookNotification.EntityCreateRequest(user, entitiesArray));
         notifyEntities(hookNotificationMessages);
     }
 
     /**
-     * Notify atlas
-     * of the entity through message. The entity can be a
+     * Notify atlas of the entity through message. The entity can be a
      * complex entity with reference to other entities.
      * De-duping of entities is done on server side depending on the
      * unique attribute on the entities.
      *
-     * @param entities entities
+     * @param messages hook notification messages
+     * @param maxRetries maximum number of retries while sending message to messaging system
      */
-    protected void notifyEntities(List<HookNotification.HookNotificationMessage> entities) {
-        final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
-        final String message = entities.toString();
+    public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) {
+        final String message = messages.toString();
 
         int numRetries = 0;
         while (true) {
             try {
-                notifInterface.send(NotificationInterface.NotificationType.HOOK, entities);
+                notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
                 return;
             } catch(Exception e) {
                 numRetries++;
@@ -125,4 +110,50 @@ public abstract class AtlasHook {
             }
         }
     }
+
+    /**
+     * Notify atlas of the entity through message. The entity can be a
+     * complex entity with reference to other entities.
+     * De-duping of entities is done on server side depending on the
+     * unique attribute on the entities.
+     *
+     * @param messages hook notification messages
+     */
+    protected void notifyEntities(List<HookNotification.HookNotificationMessage> messages) {
+        final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
+        notifyEntities(messages, maxRetries);
+    }
+
+    /**
+     * Returns the logged in user.
+     * @return
+     */
+    public static String getUser() {
+        return getUser(null, null);
+    }
+
+    /**
+     * Returns the user. Order of preference:
+     * 1. Given userName
+     * 2. ugi.getShortUserName()
+     * 3. UserGroupInformation.getCurrentUser().getShortUserName()
+     * 4. System.getProperty("user.name")
+     */
+
+    public static String getUser(String userName, UserGroupInformation ugi) {
+        if (StringUtils.isNotEmpty(userName)) {
+            return userName;
+        }
+
+        if (ugi != null && StringUtils.isNotEmpty(ugi.getShortUserName())) {
+            return ugi.getShortUserName();
+        }
+
+        try {
+            return UserGroupInformation.getCurrentUser().getShortUserName();
+        } catch (IOException e) {
+            LOG.warn("Failed for UserGroupInformation.getCurrentUser()");
+            return System.getProperty("user.name");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 015af44..2fcbcd3 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -26,6 +26,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,14 +49,13 @@ public class NotificationHookConsumer implements Service {
     @Inject
     private NotificationInterface notificationInterface;
     private ExecutorService executors;
-    private AtlasClient atlasClient;
+    private String atlasEndpoint;
 
     @Override
     public void start() throws AtlasException {
         Configuration applicationProperties = ApplicationProperties.get();
 
-        String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
-        atlasClient = new AtlasClient(atlasEndpoint);
+        atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
         int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
         List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers =
                 notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
@@ -87,15 +87,8 @@ public class NotificationHookConsumer implements Service {
 
     class HookConsumer implements Runnable {
         private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
-        private final AtlasClient client;
 
         public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
-            this(atlasClient, consumer);
-        }
-
-        public HookConsumer(AtlasClient client,
-                            NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
-            this.client = client;
             this.consumer = consumer;
         }
 
@@ -118,6 +111,9 @@ public class NotificationHookConsumer implements Service {
                 try {
                     if (hasNext()) {
                         HookNotification.HookNotificationMessage message = consumer.next();
+                        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(message.getUser());
+                        AtlasClient atlasClient = getAtlasClient(ugi);
+
                         try {
                             switch (message.getType()) {
                             case ENTITY_CREATE:
@@ -154,9 +150,14 @@ public class NotificationHookConsumer implements Service {
             }
         }
 
+        protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+            return new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
+        }
+
         boolean serverAvailable(Timer timer) {
             try {
-                while (!client.isServerReady()) {
+                AtlasClient atlasClient = getAtlasClient(UserGroupInformation.getCurrentUser());
+                while (!atlasClient.isServerReady()) {
                     try {
                         LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
                                 SERVER_READY_WAIT_TIME_MS);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
index c20fdf1..e8ae177 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
@@ -22,6 +22,8 @@ import com.google.inject.Singleton;
 import com.google.inject.multibindings.Multibinder;
 import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.kafka.KafkaNotificationProvider;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
 import org.apache.atlas.service.Service;
 
 /**
@@ -37,5 +39,10 @@ public class NotificationModule extends AbstractModule {
         Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
         serviceBinder.addBinding().to(KafkaNotification.class);
         serviceBinder.addBinding().to(NotificationHookConsumer.class);
+
+        //Add NotificationEntityChangeListener as EntityChangeListener
+        Multibinder<EntityChangeListener> entityChangeListenerBinder =
+                Multibinder.newSetBinder(binder(), EntityChangeListener.class);
+        entityChangeListenerBinder.addBinding().to(NotificationEntityChangeListener.class);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
index 31f5c2b..300cbb5 100644
--- a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.notification.entity;
 
+import com.google.inject.Inject;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.notification.NotificationInterface;
@@ -48,6 +49,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
      * @param notificationInterface the notification framework interface
      * @param typeSystem the Atlas type system
      */
+    @Inject
     public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) {
         this.notificationInterface = notificationInterface;
         this.typeSystem = typeSystem;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
index a000161..4c7f6de 100644
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
@@ -25,6 +25,7 @@ import com.google.gson.JsonParseException;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.TypesDef;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.commons.lang.StringUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 
@@ -41,29 +42,24 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
     @Override
     public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
                                                JsonDeserializationContext context) {
-        if (json.isJsonArray()) {
-            JSONArray jsonArray = context.deserialize(json, JSONArray.class);
-            return new EntityCreateRequest(jsonArray);
-        } else {
-            HookNotificationType type =
-                    context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
-            switch (type) {
-            case ENTITY_CREATE:
-                return context.deserialize(json, EntityCreateRequest.class);
-
-            case ENTITY_FULL_UPDATE:
-                return context.deserialize(json, EntityUpdateRequest.class);
-
-            case ENTITY_PARTIAL_UPDATE:
-                return context.deserialize(json, EntityPartialUpdateRequest.class);
-
-            case TYPE_CREATE:
-            case TYPE_UPDATE:
-                return context.deserialize(json, TypeRequest.class);
-
-            default:
-                throw new IllegalStateException("Unhandled type " + type);
-            }
+        HookNotificationType type =
+                context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
+        switch (type) {
+        case ENTITY_CREATE:
+            return context.deserialize(json, EntityCreateRequest.class);
+
+        case ENTITY_FULL_UPDATE:
+            return context.deserialize(json, EntityUpdateRequest.class);
+
+        case ENTITY_PARTIAL_UPDATE:
+            return context.deserialize(json, EntityPartialUpdateRequest.class);
+
+        case TYPE_CREATE:
+        case TYPE_UPDATE:
+            return context.deserialize(json, TypeRequest.class);
+
+        default:
+            throw new IllegalStateException("Unhandled type " + type);
         }
     }
 
@@ -78,18 +74,30 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
      * Base type of hook message.
      */
     public static class HookNotificationMessage {
+        public static final String UNKNOW_USER = "UNKNOWN";
         protected HookNotificationType type;
+        protected String user;
 
         private HookNotificationMessage() {
         }
 
-        public HookNotificationMessage(HookNotificationType type) {
+        public HookNotificationMessage(HookNotificationType type, String user) {
             this.type = type;
+            this.user = user;
         }
 
         public HookNotificationType getType() {
             return type;
         }
+
+        public String getUser() {
+            if (StringUtils.isEmpty(user)) {
+                return UNKNOW_USER;
+            }
+            return user;
+        }
+
+
     }
 
     /**
@@ -101,8 +109,8 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
         private TypeRequest() {
         }
 
-        public TypeRequest(HookNotificationType type, TypesDef typesDef) {
-            super(type);
+        public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) {
+            super(type, user);
             this.typesDef = typesDef;
         }
 
@@ -120,21 +128,21 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
         private EntityCreateRequest() {
         }
 
-        public EntityCreateRequest(Referenceable... entities) {
-            this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities));
+        public EntityCreateRequest(String user, Referenceable... entities) {
+            this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user);
         }
 
-        public EntityCreateRequest(List<Referenceable> entities) {
-            this(HookNotificationType.ENTITY_CREATE, entities);
+        public EntityCreateRequest(String user, List<Referenceable> entities) {
+            this(HookNotificationType.ENTITY_CREATE, entities, user);
         }
 
-        protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) {
-            super(type);
+        protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) {
+            super(type, user);
             this.entities = entities;
         }
 
-        public EntityCreateRequest(JSONArray jsonArray) {
-            super(HookNotificationType.ENTITY_CREATE);
+        public EntityCreateRequest(String user, JSONArray jsonArray) {
+            super(HookNotificationType.ENTITY_CREATE, user);
             entities = new ArrayList<>();
             for (int index = 0; index < jsonArray.length(); index++) {
                 try {
@@ -154,12 +162,12 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
      * Hook message for updating entities(full update).
      */
     public static class EntityUpdateRequest extends EntityCreateRequest {
-        public EntityUpdateRequest(Referenceable... entities) {
-            this(Arrays.asList(entities));
+        public EntityUpdateRequest(String user, Referenceable... entities) {
+            this(user, Arrays.asList(entities));
         }
 
-        public EntityUpdateRequest(List<Referenceable> entities) {
-            super(HookNotificationType.ENTITY_FULL_UPDATE, entities);
+        public EntityUpdateRequest(String user, List<Referenceable> entities) {
+            super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user);
         }
     }
 
@@ -175,9 +183,9 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
         private EntityPartialUpdateRequest() {
         }
 
-        public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue,
+        public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue,
                                           Referenceable entity) {
-            super(HookNotificationType.ENTITY_PARTIAL_UPDATE);
+            super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user);
             this.typeName = typeName;
             this.attribute = attribute;
             this.attributeValue = attributeValue;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index b3d4721..02255a7 100644
--- a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.atlas.notification;
 
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
@@ -29,10 +30,15 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
-        AtlasClient atlasClient = mock(AtlasClient.class);
+        final AtlasClient atlasClient = mock(AtlasClient.class);
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
         NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
+                    @Override
+                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+                        return atlasClient;
+                    }
+                };
         NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
         when(atlasClient.isServerReady()).thenReturn(true);
 
@@ -43,10 +49,15 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
-        AtlasClient atlasClient = mock(AtlasClient.class);
+        final AtlasClient atlasClient = mock(AtlasClient.class);
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
         NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
+                    @Override
+                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+                        return atlasClient;
+                    }
+                };
         NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
         when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
 
@@ -57,10 +68,15 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
-        AtlasClient atlasClient = mock(AtlasClient.class);
+        final AtlasClient atlasClient = mock(AtlasClient.class);
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
         NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
+                    @Override
+                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+                        return atlasClient;
+                    }
+                };
         NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
         doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
         when(atlasClient.isServerReady()).thenReturn(false);
@@ -70,10 +86,15 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
-        AtlasClient atlasClient = mock(AtlasClient.class);
+        final AtlasClient atlasClient = mock(AtlasClient.class);
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
         NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
+                    @Override
+                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+                        return atlasClient;
+                    }
+                };
         NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
         when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
                 new Exception()));

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
index 1dedb5b..11b7a53 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
@@ -19,49 +19,74 @@ package org.apache.atlas.notification.hook;
 
 import org.apache.atlas.notification.AbstractNotificationConsumer;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.codehaus.jettison.json.JSONArray;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 
 public class HookNotificationTest {
-
-    @Test
-    public void testMessageBackwardCompatibility() throws Exception {
-        JSONArray jsonArray = new JSONArray();
-        Referenceable entity = new Referenceable("sometype");
-        entity.set("name", "somename");
-        String entityJson = InstanceSerialization.toJson(entity, true);
-        jsonArray.put(entityJson);
-
-        HookNotification.HookNotificationMessage notification = AbstractNotificationConsumer.GSON.fromJson(
-                jsonArray.toString(), HookNotification.HookNotificationMessage.class);
-        assertNotNull(notification);
-        assertEquals(notification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
-        HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) notification;
-        assertEquals(createRequest.getEntities().size(), 1);
-        assertEquals(createRequest.getEntities().get(0).getTypeName(), entity.getTypeName());
-    }
-
     @Test
     public void testNewMessageSerDe() throws Exception {
         Referenceable entity1 = new Referenceable("sometype");
         entity1.set("attr", "value");
         entity1.set("complex", new Referenceable("othertype"));
         Referenceable entity2 = new Referenceable("newtype");
-        HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(entity1, entity2);
+        String user = "user";
+        HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(user, entity1, entity2);
 
         String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);
         HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
                 notificationJson, HookNotification.HookNotificationMessage.class);
         assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+        assertEquals(actualNotification.getUser(), user);
+
         HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) actualNotification;
         assertEquals(createRequest.getEntities().size(), 2);
+
         Referenceable actualEntity1 = createRequest.getEntities().get(0);
         assertEquals(actualEntity1.getTypeName(), "sometype");
         assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype");
         assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype");
     }
+
+    @Test
+    public void testBackwardCompatibility() throws Exception {
+        /**
+        Referenceable entity = new Referenceable("sometype");
+        entity.set("attr", "value");
+        String user = "user";
+        HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(null, entity);
+
+        String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);
+        System.out.println(notificationJson);
+         **/
+
+        //Json without user and assert that the string can be deserialised
+        String notificationJson = "{\n"
+                + "  \"entities\": [\n"
+                + "    {\n"
+                + "      \"jsonClass\": \"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference\",\n"
+                + "      \"id\": {\n"
+                + "        \"jsonClass\": \"org.apache.atlas.typesystem.json.InstanceSerialization$_Id\",\n"
+                + "        \"id\": \"-1457685864305243000\",\n"
+                + "        \"version\": 0,\n"
+                + "        \"typeName\": \"sometype\"\n"
+                + "      },\n"
+                + "      \"typeName\": \"sometype\",\n"
+                + "      \"values\": {\n"
+                + "        \"attr\": \"value\"\n"
+                + "      },\n"
+                + "      \"traitNames\": [],\n"
+                + "      \"traits\": {}\n"
+                + "    }\n"
+                + "  ],\n"
+                + "  \"type\": \"ENTITY_CREATE\"\n"
+                + "}";
+
+        HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
+                notificationJson, HookNotification.HookNotificationMessage.class);
+        assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+        assertNull(actualNotification.user);
+        assertEquals(actualNotification.getUser(), HookNotification.HookNotificationMessage.UNKNOW_USER);
+    }
 }



Mime
View raw message