atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [2/3] incubator-atlas git commit: ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
Date Thu, 31 Mar 2016 10:51:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5be00f5..c6d82aa 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1474,6 +1474,8 @@
                         <user.dir>${project.basedir}</user.dir>
                         <atlas.data>${project.build.directory}/data</atlas.data>
                         <log4j.configuration>atlas-log4j.xml</log4j.configuration>
+                        <zookeeper.client.secure>false</zookeeper.client.secure>
+                        <zookeeper.sasl.client>false</zookeeper.sasl.client>
                     </systemProperties>
                     <skipTests>${skipTests}</skipTests>
                     <forkMode>always</forkMode>
@@ -1483,9 +1485,6 @@
                         -Xmx1024m -XX:MaxPermSize=512m -Djava.net.preferIPv4Stack=true
                     </argLine>
                     <skip>${skipUTs}</skip>
-                    <excludes>
-                        <exclude>**/*Base*</exclude>
-                    </excludes>
                 </configuration>
                 <dependencies>
                     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4569e55..aaef9e3 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
 ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags)
 ATLAS-575 jetty-maven-plugin fails with ShutdownMonitorThread already started (shwethags)
 ATLAS-408 UI : Add a close link (x) on the top right when Tag is added (darshankumar89 via shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index 6502bba..eca087a 100755
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -149,6 +149,7 @@
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-server</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
index 7651bc7..4a02b0d 100755
--- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
+++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas;
 
+import com.google.inject.Binder;
 import com.google.inject.Singleton;
 import com.google.inject.matcher.Matchers;
 import com.google.inject.multibindings.Multibinder;
@@ -27,21 +28,26 @@ import org.aopalliance.intercept.MethodInterceptor;
 import org.apache.atlas.discovery.DiscoveryService;
 import org.apache.atlas.discovery.HiveLineageService;
 import org.apache.atlas.discovery.LineageService;
-import org.apache.atlas.discovery.SearchIndexer;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
+import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.listener.TypesChangeListener;
 import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.audit.EntityAuditListener;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
 import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphProvider;
 import org.apache.atlas.repository.graph.TitanGraphProvider;
 import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
 import org.apache.atlas.repository.typestore.ITypeStore;
+import org.apache.atlas.service.Service;
 import org.apache.atlas.services.DefaultMetadataService;
 import org.apache.atlas.services.IBootstrapTypesRegistrar;
 import org.apache.atlas.services.MetadataService;
 import org.apache.atlas.services.ReservedTypesRegistrar;
 import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeSystemProvider;
 
 /**
  * Guice module for Repository module.
@@ -51,9 +57,6 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
     @Override
     protected void configure() {
         // special wiring for Titan Graph
-
-
-
         ThrowingProviderBinder.create(binder()).bind(GraphProvider.class, TitanGraph.class).to(TitanGraphProvider.class)
                 .asEagerSingleton();
 
@@ -61,7 +64,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
         // bind the MetadataRepositoryService interface to an implementation
         bind(MetadataRepository.class).to(GraphBackedMetadataRepository.class).asEagerSingleton();
 
-        bind(TypeSystem.class).in(Singleton.class);
+        bind(TypeSystem.class).toProvider(TypeSystemProvider.class).in(Singleton.class);
 
         // bind the ITypeStore interface to an implementation
         bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton();
@@ -80,9 +83,24 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
 
         bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
 
+        bindAuditRepository(binder());
+
+        //Add EntityAuditListener as EntityChangeListener
+        Multibinder<EntityChangeListener> entityChangeListenerBinder =
+                Multibinder.newSetBinder(binder(), EntityChangeListener.class);
+        entityChangeListenerBinder.addBinding().to(EntityAuditListener.class);
+
         MethodInterceptor interceptor = new GraphTransactionInterceptor();
         requestInjection(interceptor);
         bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), interceptor);
     }
 
+    /** Binds the EntityAuditRepository implementation and registers it as a Service, both on {@code binder}. */
+    protected void bindAuditRepository(Binder binder) {
+        //Map EntityAuditRepository interface to hbase based implementation
+        binder.bind(EntityAuditRepository.class).to(HBaseBasedAuditRepository.class).asEagerSingleton();
+        //Add it to the Service set so that connection is closed at shutdown; use the binder argument, not binder()
+        Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder, Service.class);
+        serviceBinder.addBinding().to(HBaseBasedAuditRepository.class);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
new file mode 100644
index 0000000..0c5c551
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Listener on entity create/update/delete, tag add/delete. Adds the corresponding audit event to the audit repository.
+ * Entity updates are received but produce no audit event: {@link #onEntitiesUpdated} has an empty body.
+ */
+public class EntityAuditListener implements EntityChangeListener {
+    private final EntityAuditRepository auditRepository;
+
+    /**
+     * @param auditRepository repository that every audit event is written to
+     */
+    @Inject
+    public EntityAuditListener(EntityAuditRepository auditRepository) {
+        this.auditRepository = auditRepository;
+    }
+
+    /**
+     * Writes one ENTITY_CREATE event per added entity; the details are "Created: " followed by the entity as JSON.
+     * All events of one call share a single timestamp and are handed to the repository in one call.
+     *
+     * @param entities entities that were added
+     * @throws AtlasException if the events cannot be recorded
+     */
+    @Override
+    public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+        List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>(entities.size());
+        long currentTime = System.currentTimeMillis();
+        for (ITypedReferenceableInstance entity : entities) {
+            events.add(createEvent(entity, currentTime, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE,
+                    "Created: " + InstanceSerialization.toJson(entity, true)));
+        }
+        auditRepository.putEvents(events);
+    }
+
+    /**
+     * Does nothing: no audit event is recorded for updated entities.
+     *
+     * @param entities entities that were updated; not used
+     */
+    @Override
+    public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+        // no audit event is written for updates
+    }
+
+    /**
+     * Writes a TAG_ADD event for the entity; the details are "Added trait: " followed by the trait as JSON.
+     *
+     * @param entity entity the trait was added to
+     * @param trait trait that was added
+     * @throws AtlasException if the event cannot be recorded
+     */
+    @Override
+    public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
+        EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+                EntityAuditRepository.EntityAuditAction.TAG_ADD,
+                "Added trait: " + InstanceSerialization.toJson(trait, true));
+        auditRepository.putEvents(event);
+    }
+
+    /**
+     * Writes a TAG_DELETE event for the entity; the details are "Deleted trait: " followed by the trait name.
+     *
+     * @param entity entity the trait was removed from
+     * @param traitName name of the removed trait
+     * @throws AtlasException if the event cannot be recorded
+     */
+    @Override
+    public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
+        EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+                EntityAuditRepository.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
+        auditRepository.putEvents(event);
+    }
+
+    /**
+     * Writes one ENTITY_DELETE event, with the details "Deleted entity", per deleted entity.
+     * All events of one call share a single timestamp and are handed to the repository in one call.
+     *
+     * @param entities entities that were deleted
+     * @throws AtlasException if the events cannot be recorded
+     */
+    @Override
+    public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+        List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>(entities.size());
+        long currentTime = System.currentTimeMillis();
+        for (ITypedReferenceableInstance entity : entities) {
+            events.add(createEvent(entity, currentTime, EntityAuditRepository.EntityAuditAction.ENTITY_DELETE,
+                    "Deleted entity"));
+        }
+        auditRepository.putEvents(events);
+    }
+
+    /**
+     * Builds an audit event for the entity's id, attributed to the user reported by {@code RequestContext.get()}.
+     */
+    private EntityAuditRepository.EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts,
+                                                               EntityAuditRepository.EntityAuditAction action,
+                                                               String details) {
+        return new EntityAuditRepository.EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(),
+                action, details);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index a5b4a59..d41c4da 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -27,6 +27,10 @@ import java.util.List;
  * Interface for repository for storing entity audit events
  */
 public interface EntityAuditRepository {
+    enum EntityAuditAction { // HBaseBasedAuditRepository persists ordinal(): append new values, never reorder
+        ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE;
+    }
+
     /**
      * Structure of entity audit event
      */
@@ -34,13 +38,13 @@ public interface EntityAuditRepository {
         String entityId;
         Long timestamp;
         String user;
-        String action;
+        EntityAuditAction action;
         String details;
 
         public EntityAuditEvent() {
         }
 
-        public EntityAuditEvent(String entityId, long ts, String user, String action, String details) {
+        public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) {
             this.entityId = entityId;
             this.timestamp = ts;
             this.user = user;
@@ -61,7 +65,7 @@ public interface EntityAuditRepository {
             EntityAuditEvent otherEvent = (EntityAuditEvent) other;
             return StringUtils.equals(entityId, otherEvent.entityId) &&
                     (timestamp.longValue() == otherEvent.timestamp.longValue()) &&
-                    StringUtils.equals(user, otherEvent.user) && StringUtils.equals(action, otherEvent.action) &&
+                    StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) &&
                     StringUtils.equals(details, otherEvent.details);
         }
 
@@ -77,6 +81,26 @@ public interface EntityAuditRepository {
                    .append(user).append(";Action=").append(action).append(";Details=").append(details);
             return builder.toString();
         }
+
+        public String getEntityId() {
+            return entityId;
+        }
+
+        public Long getTimestamp() {
+            return timestamp;
+        }
+
+        public String getUser() {
+            return user;
+        }
+
+        public EntityAuditAction getAction() {
+            return action;
+        }
+
+        public String getDetails() {
+            return details;
+        }
     }
 
     /**
@@ -87,6 +111,13 @@ public interface EntityAuditRepository {
     void putEvents(EntityAuditEvent... events) throws AtlasException;
 
     /**
+     * Add events to the event repository
+     * @param events events to be added
+     * @throws AtlasException
+     */
+    void putEvents(List<EntityAuditEvent> events) throws AtlasException;
+
+    /**
      * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
      * @param entityId entity id
      * @param ts starting timestamp for events

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 8b92792..ae6e988 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
@@ -80,16 +81,29 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
      * @param events events to be added
      * @throws AtlasException
      */
+    @Override
     public void putEvents(EntityAuditRepository.EntityAuditEvent... events) throws AtlasException {
-        LOG.info("Putting {} events", events.length);
+        putEvents(Arrays.asList(events));
+    }
+
+    @Override
+    /**
+     * Add events to the event repository
+     * @param events events to be added
+     * @throws AtlasException
+     */
+    public void putEvents(List<EntityAuditEvent> events) throws AtlasException {
+        LOG.info("Putting {} events", events.size());
         Table table = null;
         try {
             table = connection.getTable(tableName);
-            List<Put> puts = new ArrayList<>(events.length);
+            List<Put> puts = new ArrayList<>(events.size());
             for (EntityAuditRepository.EntityAuditEvent event : events) {
                 LOG.debug("Adding entity audit event {}", event);
                 Put put = new Put(getKey(event.entityId, event.timestamp));
-                addColumn(put, COLUMN_ACTION, event.action);
+                if (event.action != null) {
+                    put.addColumn(COLUMN_FAMILY, COLUMN_ACTION, Bytes.toBytes((short)event.action.ordinal()));
+                }
                 addColumn(put, COLUMN_USER, event.user);
                 addColumn(put, COLUMN_DETAIL, event.details);
                 puts.add(put);
@@ -145,7 +159,8 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
                 String key = Bytes.toString(result.getRow());
                 EntityAuditRepository.EntityAuditEvent event = fromKey(key);
                 event.user = getResultString(result, COLUMN_USER);
-                event.action = getResultString(result, COLUMN_ACTION);
+                event.action =
+                        EntityAuditAction.values()[(Bytes.toShort(result.getValue(COLUMN_FAMILY, COLUMN_ACTION)))];
                 event.details = getResultString(result, COLUMN_DETAIL);
                 events.add(event);
             }
@@ -189,7 +204,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
      * @throws AtlasException
      * @param atlasConf
      */
-    public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
+    public static org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
         Configuration subsetAtlasConf =
                 ApplicationProperties.getSubsetConfiguration(atlasConf, CONFIG_PREFIX);
         org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
new file mode 100644
index 0000000..df75290
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.AtlasException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Entity audit repository where audit events are stored in-memory. Used only for integration tests.
+ * Events live in a sorted map keyed by entity id followed by (Long.MAX_VALUE - timestamp).
+ * Reads and writes synchronize on this instance.
+ */
+public class InMemoryEntityAuditRepository implements EntityAuditRepository {
+    private final SortedMap<String, EntityAuditEvent> auditEvents = new TreeMap<>();
+
+    /**
+     * Stores the given events; delegates to {@link #putEvents(List)} with the same events.
+     *
+     * @param events events to be added
+     * @throws AtlasException declared by the interface
+     */
+    @Override
+    public void putEvents(EntityAuditEvent... events) throws AtlasException {
+        putEvents(Arrays.asList(events));
+    }
+
+    /**
+     * Stores the given events. An event with the same entity id and timestamp as a stored one replaces it.
+     *
+     * @param events events to be added
+     * @throws AtlasException declared by the interface
+     */
+    @Override
+    public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException {
+        for (EntityAuditEvent event : events) {
+            auditEvents.put(key(event.entityId, event.timestamp), event);
+        }
+    }
+
+    /**
+     * Lists events of the given entity in key order, starting at the key built from the given timestamp.
+     *
+     * @param entityId entity id
+     * @param ts starting timestamp for events
+     * @param maxResults maximum number of events to return
+     * @return at most {@code maxResults} events whose entity id equals {@code entityId}
+     * @throws AtlasException declared by the interface
+     */
+    @Override
+    public synchronized List<EntityAuditEvent> listEvents(String entityId, Long ts, short maxResults)
+            throws AtlasException {
+        List<EntityAuditEvent> events = new ArrayList<>();
+        // synchronized like putEvents(List): a TreeMap must not be iterated while another thread modifies it
+        for (EntityAuditEvent event : auditEvents.tailMap(key(entityId, ts)).values()) {
+            if (events.size() >= maxResults) {
+                break; // enough results, no need to walk the rest of the map
+            }
+            if (event.entityId.equals(entityId)) {
+                events.add(event);
+            }
+        }
+        return events;
+    }
+
+    /**
+     * Builds the map key for an entity id and timestamp.
+     * NOTE(review): keys are compared as strings, so the ordering by timestamp relies on every
+     * (Long.MAX_VALUE - ts) value having the same number of digits - confirm this holds for all callers.
+     */
+    private static String key(String entityId, long ts) {
+        return entityId + (Long.MAX_VALUE - ts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 3ea5fde..7eccc58 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -59,8 +59,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
 
     private final TitanGraph titanGraph;
 
-    private TitanManagement management;
-
     List<Class> MIXED_INDEX_EXCLUSIONS = new ArrayList() {{
             add(Boolean.class);
             add(BigDecimal.class);
@@ -68,57 +66,63 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
         }};
 
     @Inject
-    public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException {
+    public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException,
+            IndexException {
 
         this.titanGraph = graphProvider.get();
 
         /* Create the transaction for indexing.
          */
-        management = titanGraph.getManagementSystem();
         initialize();
     }
 
     /**
      * Initializes the indices for the graph - create indices for Global Vertex Keys
      */
-    private void initialize() {
-        if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) {
-            LOG.info("Global indexes already exist for graph");
-            return;
-        }
+    private void initialize() throws RepositoryException, IndexException {
+        TitanManagement management = titanGraph.getManagementSystem();
+        try {
+            if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) {
+                LOG.info("Global indexes already exist for graph");
+                return;
+            }
 
         /* This is called only once, which is the first time Atlas types are made indexable .*/
-        LOG.info("Indexes do not exist, Creating indexes for titanGraph.");
-        management.buildIndex(Constants.VERTEX_INDEX, Vertex.class).buildMixedIndex(Constants.BACKING_INDEX);
-        management.buildIndex(Constants.EDGE_INDEX, Edge.class).buildMixedIndex(Constants.BACKING_INDEX);
+            LOG.info("Indexes do not exist, Creating indexes for titanGraph.");
+            management.buildIndex(Constants.VERTEX_INDEX, Vertex.class).buildMixedIndex(Constants.BACKING_INDEX);
+            management.buildIndex(Constants.EDGE_INDEX, Edge.class).buildMixedIndex(Constants.BACKING_INDEX);
 
-        // create a composite index for guid as its unique
-        createCompositeAndMixedIndex(Constants.GUID_PROPERTY_KEY, String.class, true, Cardinality.SINGLE, true);
+            // create a composite index for guid as its unique
+            createCompositeAndMixedIndex(management, Constants.GUID_PROPERTY_KEY, String.class, true, Cardinality.SINGLE, true);
 
-        // create a composite and mixed index for type since it can be combined with other keys
-        createCompositeAndMixedIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
+            // create a composite and mixed index for type since it can be combined with other keys
+            createCompositeAndMixedIndex(management, Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE,
+                    true);
 
-        // create a composite and mixed index for type since it can be combined with other keys
-        createCompositeAndMixedIndex(Constants.SUPER_TYPES_PROPERTY_KEY, String.class, false, Cardinality.SET, true);
+            // create a composite and mixed index for type since it can be combined with other keys
+            createCompositeAndMixedIndex(management, Constants.SUPER_TYPES_PROPERTY_KEY, String.class, false, Cardinality.SET,
+                    true);
 
-        // create a composite and mixed index for traitNames since it can be combined with other
-        // keys. Traits must be a set and not a list.
-        createCompositeAndMixedIndex(Constants.TRAIT_NAMES_PROPERTY_KEY, String.class, false, Cardinality.SET, true);
+            // create a composite and mixed index for traitNames since it can be combined with other
+            // keys. Traits must be a set and not a list.
+            createCompositeAndMixedIndex(management, Constants.TRAIT_NAMES_PROPERTY_KEY, String.class, false, Cardinality.SET,
+                    true);
 
-        // Index for full text search
-        createFullTextIndex();
+            // Index for full text search
+            createFullTextIndex(management);
 
-        //Indexes for graph backed type system store
-        createTypeStoreIndexes();
+            //Indexes for graph backed type system store
+            createTypeStoreIndexes(management);
 
-        management.commit();
-        //Make sure we acquire another transaction after commit for subsequent indexing
-        management = titanGraph.getManagementSystem();
-
-        LOG.info("Index creation for global keys complete.");
+            commit(management);
+            LOG.info("Index creation for global keys complete.");
+        } catch (Throwable t) {
+            rollback(management);
+            throw new RepositoryException(t);
+        }
     }
 
-    private void createFullTextIndex() {
+    private void createFullTextIndex(TitanManagement management) {
         PropertyKey fullText =
                 management.makePropertyKey(Constants.ENTITY_TEXT_PROPERTY_KEY).dataType(String.class).make();
 
@@ -128,12 +132,14 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
         LOG.info("Created mixed index for {}", Constants.ENTITY_TEXT_PROPERTY_KEY);
     }
 
-    private void createTypeStoreIndexes() {
+    private void createTypeStoreIndexes(TitanManagement management) {
         //Create unique index on typeName
-        createCompositeAndMixedIndex(Constants.TYPENAME_PROPERTY_KEY, String.class, true, Cardinality.SINGLE, true);
+        createCompositeAndMixedIndex(management, Constants.TYPENAME_PROPERTY_KEY, String.class, true,
+                Cardinality.SINGLE, true);
 
         //create index on vertex type
-        createCompositeAndMixedIndex(Constants.VERTEX_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
+        createCompositeAndMixedIndex(management, Constants.VERTEX_TYPE_PROPERTY_KEY, String.class, false,
+                Cardinality.SINGLE, true);
     }
 
     /**
@@ -144,21 +150,22 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
      */
     @Override
     public void onAdd(Collection<? extends IDataType> dataTypes) throws AtlasException {
-
+        TitanManagement management = titanGraph.getManagementSystem();
         for (IDataType dataType : dataTypes) {
             LOG.info("Creating indexes for type name={}, definition={}", dataType.getName(), dataType.getClass());
             try {
-                addIndexForType(dataType);
+                addIndexForType(management, dataType);
                 LOG.info("Index creation for type {} complete", dataType.getName());
             } catch (Throwable throwable) {
                 LOG.error("Error creating index for type {}", dataType, throwable);
                 //Rollback indexes if any failure
-                rollback();
+                rollback(management);
                 throw new IndexCreationException("Error while creating index for type " + dataType, throwable);
             }
         }
+
         //Commit indexes
-        commit();
+        commit(management);
     }
 
     @Override
@@ -166,7 +173,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
         onAdd(dataTypes);
     }
 
-    private void addIndexForType(IDataType dataType) {
+    private void addIndexForType(TitanManagement management, IDataType dataType) {
         switch (dataType.getTypeCategory()) {
         case PRIMITIVE:
         case ENUM:
@@ -178,17 +185,17 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
 
         case STRUCT:
             StructType structType = (StructType) dataType;
-            createIndexForFields(structType, structType.fieldMapping().fields);
+            createIndexForFields(management, structType, structType.fieldMapping().fields);
             break;
 
         case TRAIT:
             TraitType traitType = (TraitType) dataType;
-            createIndexForFields(traitType, traitType.fieldMapping().fields);
+            createIndexForFields(management, traitType, traitType.fieldMapping().fields);
             break;
 
         case CLASS:
             ClassType classType = (ClassType) dataType;
-            createIndexForFields(classType, classType.fieldMapping().fields);
+            createIndexForFields(management, classType, classType.fieldMapping().fields);
             break;
 
         default:
@@ -196,26 +203,26 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
         }
     }
 
-    private void createIndexForFields(IDataType dataType, Map<String, AttributeInfo> fields) {
+    private void createIndexForFields(TitanManagement management, IDataType dataType, Map<String, AttributeInfo> fields) {
         for (AttributeInfo field : fields.values()) {
             if (field.isIndexable) {
-                createIndexForAttribute(dataType.getName(), field);
+                createIndexForAttribute(management, dataType.getName(), field);
             }
         }
     }
 
-    private void createIndexForAttribute(String typeName, AttributeInfo field) {
+    private void createIndexForAttribute(TitanManagement management, String typeName, AttributeInfo field) {
         final String propertyName = typeName + "." + field.name;
         switch (field.dataType().getTypeCategory()) {
         case PRIMITIVE:
             Cardinality cardinality = getCardinality(field.multiplicity);
-            createCompositeAndMixedIndex(propertyName, getPrimitiveClass(field.dataType()), field.isUnique,
+            createCompositeAndMixedIndex(management, propertyName, getPrimitiveClass(field.dataType()), field.isUnique,
                     cardinality, false);
             break;
 
         case ENUM:
             cardinality = getCardinality(field.multiplicity);
-            createCompositeAndMixedIndex(propertyName, String.class, field.isUnique, cardinality, false);
+            createCompositeAndMixedIndex(management, propertyName, String.class, field.isUnique, cardinality, false);
             break;
 
         case ARRAY:
@@ -226,7 +233,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
 
         case STRUCT:
             StructType structType = (StructType) field.dataType();
-            createIndexForFields(structType, structType.fieldMapping().fields);
+            createIndexForFields(management, structType, structType.fieldMapping().fields);
             break;
 
         case TRAIT:
@@ -289,8 +296,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
     }
 
 
-    private PropertyKey createCompositeAndMixedIndex(String propertyName, Class propertyClass,
-            boolean isUnique, Cardinality cardinality, boolean force) {
+    private PropertyKey createCompositeAndMixedIndex(TitanManagement management, String propertyName,
+                                                     Class propertyClass,
+                                                     boolean isUnique, Cardinality cardinality, boolean force) {
 
         PropertyKey propertyKey = management.getPropertyKey(propertyName);
         if (propertyKey == null) {
@@ -329,7 +337,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
                 Cardinality.SET);
     }
 
-    public void commit() throws IndexException {
+    public void commit(TitanManagement management) throws IndexException {
         try {
             management.commit();
         } catch (Exception e) {
@@ -338,7 +346,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
         }
     }
 
-    public void rollback() throws IndexException {
+    public void rollback(TitanManagement management) throws IndexException {
         try {
             management.rollback();
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index e326f27..40728bc 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -22,13 +22,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Provider;
-
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.classification.InterfaceAudience;
 import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.listener.TypesChangeListener;
-import org.apache.atlas.repository.IndexCreationException;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.typestore.ITypeStore;
@@ -68,11 +66,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
-
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -86,32 +81,44 @@ public class DefaultMetadataService implements MetadataService {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class);
 
-    private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
-
     private final TypeSystem typeSystem;
     private final MetadataRepository repository;
     private final ITypeStore typeStore;
     private IBootstrapTypesRegistrar typesRegistrar;
-    private final Collection<Provider<TypesChangeListener>> typeChangeListeners;
+
+    private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>();
+    private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
 
     @Inject
     DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
                            final IBootstrapTypesRegistrar typesRegistrar,
-        final Collection<Provider<TypesChangeListener>> typeChangeListeners) throws AtlasException {
-        this(repository, typeStore, typesRegistrar, typeChangeListeners, TypeSystem.getInstance());
+                           final Collection<Provider<TypesChangeListener>> typeListenerProviders,
+                           final Collection<Provider<EntityChangeListener>> entityListenerProviders)
+            throws AtlasException {
+        this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders,
+                TypeSystem.getInstance());
     }
 
     DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
                            final IBootstrapTypesRegistrar typesRegistrar,
-                           final Collection<Provider<TypesChangeListener>> typeChangeListeners,
+                           final Collection<Provider<TypesChangeListener>> typeListenerProviders,
+                           final Collection<Provider<EntityChangeListener>> entityListenerProviders,
                            final TypeSystem typeSystem) throws AtlasException {
         this.typeStore = typeStore;
         this.typesRegistrar = typesRegistrar;
         this.typeSystem = typeSystem;
         this.repository = repository;
 
-        this.typeChangeListeners = typeChangeListeners;
+        for (Provider<TypesChangeListener> provider : typeListenerProviders) {
+            typeChangeListeners.add(provider.get());
+        }
+
+        for (Provider<EntityChangeListener> provider : entityListenerProviders) {
+            entityChangeListeners.add(provider.get());
+        }
+
         restoreTypeSystem();
+
         typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
     }
 
@@ -604,19 +611,8 @@ public class DefaultMetadataService implements MetadataService {
     }
 
     private void onTypesAdded(Map<String, IDataType> typesAdded) throws AtlasException {
-        Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
-        for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
-            final TypesChangeListener listener = indexerProvider.get();
-            try {
-                listener.onAdd(typesAdded.values());
-            } catch (IndexCreationException ice) {
-                LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
-                caughtExceptions.put(listener, ice);
-            }
-        }
-
-        if (caughtExceptions.size() > 0) {
-            throw new IndexCreationException("Index creation failed for types " + typesAdded.keySet() + ". Aborting");
+        for (TypesChangeListener listener : typeChangeListeners) {
+            listener.onAdd(typesAdded.values());
         }
     }
 
@@ -637,19 +633,8 @@ public class DefaultMetadataService implements MetadataService {
     }
 
     private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException {
-        Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
-        for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
-            final TypesChangeListener listener = indexerProvider.get();
-            try {
-                listener.onChange(typesUpdated.values());
-            } catch (IndexCreationException ice) {
-                LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
-                caughtExceptions.put(listener, ice);
-            }
-        }
-
-        if (caughtExceptions.size() > 0) {
-            throw new IndexCreationException("Index creation failed for types " + typesUpdated.keySet() + ". Aborting");
+        for (TypesChangeListener listener : typeChangeListeners) {
+            listener.onChange(typesUpdated.values());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
index 5b74dc8..b4a9cb2 100755
--- a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
@@ -19,15 +19,12 @@
 package org.apache.atlas.discovery;
 
 import com.google.common.collect.ImmutableSet;
-import com.thinkaurelius.titan.core.TitanGraph;
-
 import org.apache.atlas.BaseHiveRepositoryTest;
 import org.apache.atlas.RepositoryMetadataModule;
 import org.apache.atlas.TestUtils;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphProvider;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.persistence.Id;
@@ -46,7 +43,6 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import javax.inject.Inject;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -60,9 +56,6 @@ import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAt
 public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest {
 
     @Inject
-    private GraphProvider<TitanGraph> graphProvider;
-
-    @Inject
     private MetadataRepository repositoryService;
 
     @Inject

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
new file mode 100644
index 0000000..9c193f7
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+
+public class AuditRepositoryTestBase {
+    protected EntityAuditRepository eventRepository;
+
+    private String rand() {
+        return RandomStringUtils.randomAlphanumeric(10);
+    }
+
+    @Test
+    public void testAddEvents() throws Exception {
+        EntityAuditRepository.EntityAuditEvent event =
+                new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1",
+                        EntityAuditRepository.EntityAuditAction.ENTITY_CREATE, "d1");
+
+        eventRepository.putEvents(event);
+
+        List<EntityAuditRepository.EntityAuditEvent> events =
+                eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10);
+        assertEquals(events.size(), 1);
+        assertEquals(events.get(0), event);
+    }
+
+    @Test
+    public void testListPagination() throws Exception {
+        String id1 = "id1" + rand();
+        String id2 = "id2" + rand();
+        String id3 = "id3" + rand();
+        long ts = System.currentTimeMillis();
+        List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3);
+        for (int i = 0; i < 3; i++) {
+            //Add events for all three ids; only the id2 events are expected back
+            EntityAuditRepository.EntityAuditEvent event =
+                    new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i,
+                            EntityAuditRepository.EntityAuditAction.ENTITY_UPDATE, "details" + i);
+            eventRepository.putEvents(event);
+            expectedEvents.add(event);
+            eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i,
+                    EntityAuditRepository.EntityAuditAction.TAG_DELETE, "details" + i));
+            eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i,
+                    EntityAuditRepository.EntityAuditAction.TAG_ADD, "details" + i));
+        }
+
+        //Use ts for which there is no event - ts + 2
+        List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2);
+        assertEquals(events.size(), 2);
+        assertEquals(events.get(0), expectedEvents.get(0));
+        assertEquals(events.get(1), expectedEvents.get(1));
+
+        //Use (last returned event's timestamp - 1) for the next list(). Should give only 1 event and shouldn't include events from other ids
+        events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3);
+        assertEquals(events.size(), 1);
+        assertEquals(events.get(0), expectedEvents.get(2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
index ac52f29..677eb39 100644
--- a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
@@ -19,45 +19,24 @@
 package org.apache.atlas.repository.audit;
 
 import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.LocalHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
-public class HBaseBasedAuditRepositoryTest {
-    private HBaseTestingUtility testUtility;
-    private HBaseBasedAuditRepository eventRepository;
-    private LocalHBaseCluster hbaseCluster;
+public class HBaseBasedAuditRepositoryTest extends AuditRepositoryTestBase {
     private TableName tableName;
 
     @BeforeClass
     public void setup() throws Exception {
-        testUtility = HBaseTestingUtility.createLocalHTU();
-        testUtility.startMiniZKCluster();
-        testUtility.getConfiguration().set("zookeeper.session.timeout.ms", "1000");
-        hbaseCluster = new LocalHBaseCluster(testUtility.getConfiguration());
-        hbaseCluster.startup();
-
-        eventRepository = new HBaseBasedAuditRepository() {
-            @Override
-            public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf)
-                    throws AtlasException {
-                return testUtility.getConfiguration();
-            }
-        };
-        eventRepository.start();
+        eventRepository = new HBaseBasedAuditRepository();
+        HBaseTestUtils.startCluster();
+        ((HBaseBasedAuditRepository)eventRepository).start();
 
         Configuration properties = ApplicationProperties.get();
         String tableNameStr = properties.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME,
@@ -67,63 +46,14 @@ public class HBaseBasedAuditRepositoryTest {
 
     @AfterClass
     public void teardown() throws Exception {
-        eventRepository.stop();
-        testUtility.getConnection().close();
-        hbaseCluster.shutdown();
-        testUtility.shutdownMiniZKCluster();
-    }
-
-    private String rand() {
-        return RandomStringUtils.randomAlphanumeric(10);
+        ((HBaseBasedAuditRepository)eventRepository).stop();
+        HBaseTestUtils.stopCluster();
     }
 
     @Test
     public void testTableCreated() throws Exception {
-        Admin admin = testUtility.getConnection().getAdmin();
+        Connection connection = HBaseTestUtils.getConnection();
+        Admin admin = connection.getAdmin();
         assertTrue(admin.tableExists(tableName));
     }
-
-    @Test
-    public void testAddEvents() throws Exception {
-        EntityAuditRepository.EntityAuditEvent event =
-                new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1", "a1", "d1");
-
-        eventRepository.putEvents(event);
-
-        List<EntityAuditRepository.EntityAuditEvent> events =
-                eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10);
-        assertEquals(events.size(), 1);
-        assertEquals(events.get(0), event);
-    }
-
-    @Test
-    public void testListPagination() throws Exception {
-        String id1 = "id1" + rand();
-        String id2 = "id2" + rand();
-        String id3 = "id3" + rand();
-        long ts = System.nanoTime();
-        List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3);
-        for (int i = 0; i < 3; i++) {
-            //Add events for both ids
-            EntityAuditRepository.EntityAuditEvent event =
-                    new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i, "action" + i, "details" + i);
-            eventRepository.putEvents(event);
-            expectedEvents.add(event);
-            eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i,
-                    "action" + i, "details" + i));
-            eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i,
-                    "action" + i, "details" + i));
-        }
-
-        //Use ts for which there is no event - ts + 2
-        List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2);
-        assertEquals(events.size(), 2);
-        assertEquals(events.get(0), expectedEvents.get(0));
-        assertEquals(events.get(1), expectedEvents.get(1));
-
-        //Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
-        events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3);
-        assertEquals(events.size(), 1);
-        assertEquals(events.get(0), expectedEvents.get(2));
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java
new file mode 100644
index 0000000..0e43806
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.RequestContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.client.Connection;
+
+import java.io.IOException;
+
+public class HBaseTestUtils {
+    private static HBaseTestingUtility hbaseTestUtility;
+    private static LocalHBaseCluster hbaseCluster;
+
+    public static void startCluster() throws Exception {
+        Configuration hbaseConf =
+                HBaseBasedAuditRepository.getHBaseConfiguration(ApplicationProperties.get());
+        hbaseTestUtility = new HBaseTestingUtility(hbaseConf);
+        int zkPort = hbaseConf.getInt("hbase.zookeeper.property.clientPort", 19026);
+        hbaseTestUtility.startMiniZKCluster(1, zkPort);
+
+        hbaseCluster = new LocalHBaseCluster(hbaseTestUtility.getConfiguration());
+        hbaseCluster.startup();
+
+        RequestContext.createContext();
+        RequestContext.get().setUser("testuser");
+    }
+
+    public static void stopCluster() throws Exception {
+        hbaseTestUtility.getConnection().close();
+        hbaseCluster.shutdown();
+        hbaseTestUtility.shutdownMiniZKCluster();
+    }
+
+    public static Connection getConnection() throws IOException {
+        return hbaseTestUtility.getConnection();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java
new file mode 100644
index 0000000..3bdfcf9
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.testng.annotations.BeforeClass;
+
+public class InMemoryAuditRepositoryTest extends AuditRepositoryTestBase {
+    @BeforeClass
+    public void setup() {
+        eventRepository = new InMemoryEntityAuditRepository();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
index 0b01230..5ac0e43 100644
--- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
@@ -25,6 +25,9 @@ import com.thinkaurelius.titan.core.TitanGraph;
 import com.thinkaurelius.titan.core.util.TitanCleanup;
 
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
+import org.apache.atlas.repository.audit.HBaseTestUtils;
 import org.apache.atlas.typesystem.exception.TypeNotFoundException;
 import org.apache.atlas.typesystem.exception.EntityNotFoundException;
 import org.apache.atlas.typesystem.types.ClassType;
@@ -71,14 +74,19 @@ import java.util.Map;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 @Guice(modules = RepositoryMetadataModule.class)
 public class DefaultMetadataServiceTest {
     @Inject
     private MetadataService metadataService;
+
     @Inject
     private GraphProvider<TitanGraph> graphProvider;
 
+    @Inject
+    private EntityAuditRepository repository;
+
     private Referenceable db = createDBEntity();
 
     private Id dbId;
@@ -90,6 +98,11 @@ public class DefaultMetadataServiceTest {
 
     @BeforeTest
     public void setUp() throws Exception {
+        if (repository instanceof HBaseBasedAuditRepository) {
+            HBaseTestUtils.startCluster();
+            ((HBaseBasedAuditRepository) repository).start();
+        }
+
         TypesDef typesDef = TestUtils.defineHiveTypes();
         try {
             metadataService.getTypeDefinition(TestUtils.TABLE_TYPE);
@@ -109,7 +122,7 @@ public class DefaultMetadataServiceTest {
     }
 
     @AfterTest
-    public void shutdown() {
+    public void shutdown() throws Exception {
         TypeSystem.getInstance().reset();
         try {
             //TODO - Fix failure during shutdown while using BDB
@@ -122,6 +135,11 @@ public class DefaultMetadataServiceTest {
         } catch(Exception e) {
             e.printStackTrace();
         }
+
+        if (repository instanceof HBaseBasedAuditRepository) {
+            ((HBaseBasedAuditRepository) repository).stop();
+            HBaseTestUtils.stopCluster();
+        }
     }
 
     private String createInstance(Referenceable entity) throws Exception {
@@ -172,6 +190,7 @@ public class DefaultMetadataServiceTest {
         entity.set("type", "VARCHAR(32)");
         return entity;
     }
+
     @Test(expectedExceptions = TypeNotFoundException.class)
     public void testCreateEntityWithUnknownDatatype() throws Exception {
         Referenceable entity = new Referenceable("Unknown datatype");
@@ -179,7 +198,7 @@ public class DefaultMetadataServiceTest {
         entity.set("name", dbName);
         entity.set("description", "us db");
         createInstance(entity);
-        Assert.fail(TypeNotFoundException.class.getSimpleName() +" was expected but none thrown.");
+        Assert.fail(TypeNotFoundException.class.getSimpleName() + " was expected but none thrown.");
     }
 
     @Test
@@ -187,6 +206,7 @@ public class DefaultMetadataServiceTest {
         //name is the unique attribute
         Referenceable entity = createDBEntity();
         String id = createInstance(entity);
+        assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE);
 
         //using the same name should succeed, but not create another entity
         String newId = createInstance(entity);
@@ -199,6 +219,35 @@ public class DefaultMetadataServiceTest {
     }
 
     @Test
+    public void testEntityAudit() throws Exception {
+        //create entity
+        Referenceable entity = createDBEntity();
+        String id = createInstance(entity);
+        assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE);
+
+        Struct tag = new Struct(TestUtils.PII);
+        metadataService.addTrait(id, InstanceSerialization.toJson(tag, true));
+        assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_ADD);
+
+        metadataService.deleteTrait(id, TestUtils.PII);
+        assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_DELETE);
+
+        metadataService.deleteEntities(Arrays.asList(id));
+        assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_DELETE);
+    }
+
+    private void assertAuditEvents(String id, EntityAuditRepository.EntityAuditAction action) throws Exception {
+        List<EntityAuditRepository.EntityAuditEvent> events =
+                repository.listEvents(id, System.currentTimeMillis(), (short) 10);
+        for (EntityAuditRepository.EntityAuditEvent event : events) {
+            if (event.getAction() == action) {
+                return;
+            }
+        }
+        fail("Didn't find " + action + " in audit events");
+    }
+
+    @Test
     public void testCreateEntityWithUniqueAttributeWithReference() throws Exception {
         Referenceable db = createDBEntity();
         String dbId = createInstance(db);
@@ -468,7 +517,7 @@ public class DefaultMetadataServiceTest {
         tableDefinitionJson =
             metadataService.getEntityDefinition(tableId._getId());
         tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
-        Assert.assertNull(((Struct)tableDefinition.get("serde1")).get("description"));
+        Assert.assertNull(((Struct) tableDefinition.get("serde1")).get("description"));
     }
 
 
@@ -718,8 +767,6 @@ public class DefaultMetadataServiceTest {
     
     @Test
     public void testDeleteEntities() throws Exception {
-        
-        
         // Create 2 table entities, each with 3 composite column entities
         Referenceable dbEntity = createDBEntity();
         String dbGuid = createInstance(dbEntity);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
index 84ec761..0685e19 100644
--- a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
@@ -20,18 +20,16 @@ package org.apache.atlas.services;
 
 import com.google.inject.Provider;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.listener.TypesChangeListener;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.typestore.ITypeStore;
-import org.apache.atlas.typesystem.TypesDef;
 import org.apache.atlas.typesystem.types.TypeSystem;
-import org.mockito.Matchers;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -45,7 +43,8 @@ public class DefaultMetadataServiceMockTest {
         when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
         DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class),
                 mock(ITypeStore.class),
-                typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), typeSystem);
+                typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
+                new ArrayList<Provider<EntityChangeListener>>(), typeSystem);
 
         verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
                 typeSystem, defaultMetadataService);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/server-api/pom.xml
----------------------------------------------------------------------
diff --git a/server-api/pom.xml b/server-api/pom.xml
index 8b4753a..93a0358 100644
--- a/server-api/pom.xml
+++ b/server-api/pom.xml
@@ -47,7 +47,6 @@
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-typesystem</artifactId>
         </dependency>
-
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/server-api/src/main/java/org/apache/atlas/RequestContext.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
new file mode 100644
index 0000000..943e4b8
--- /dev/null
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds state scoped to the current thread; the only state kept is a user name.
+ * <p>
+ * A thread has a context only between {@link #createContext()} and {@link #clear()}; outside that
+ * window {@link #get()} returns {@code null}.
+ */
+public class RequestContext {
+    private static final Logger LOG = LoggerFactory.getLogger(RequestContext.class);
+
+    /** Context bound to the calling thread, if any. */
+    private static final ThreadLocal<RequestContext> CURRENT_CONTEXT = new ThreadLocal<>();
+
+    private String user;
+
+    private RequestContext() {
+        // instances are only handed out by createContext()
+    }
+
+    /**
+     * Creates a new context and binds it to the calling thread, replacing any context already bound.
+     *
+     * @return the newly bound context.
+     */
+    public static RequestContext createContext() {
+        final RequestContext fresh = new RequestContext();
+        CURRENT_CONTEXT.set(fresh);
+        return fresh;
+    }
+
+    /**
+     * @return the context bound to the calling thread, or {@code null} if none is bound.
+     */
+    public static RequestContext get() {
+        return CURRENT_CONTEXT.get();
+    }
+
+    /**
+     * Removes the calling thread's context, if any.
+     */
+    public static void clear() {
+        CURRENT_CONTEXT.remove();
+    }
+
+    /**
+     * @return the user stored in this context, or {@code null} if none was set.
+     */
+    public String getUser() {
+        return user;
+    }
+
+    /**
+     * @param user user name to store in this context.
+     */
+    public void setUser(String user) {
+        this.user = user;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
index 9e4aa79..b41f3db 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
@@ -22,7 +22,6 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
-
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.classification.InterfaceAudience;
 import org.apache.atlas.typesystem.TypesDef;
@@ -30,7 +29,6 @@ import org.apache.atlas.typesystem.exception.TypeExistsException;
 import org.apache.atlas.typesystem.exception.TypeNotFoundException;
 
 import javax.inject.Singleton;
-
 import java.lang.reflect.Constructor;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java
new file mode 100644
index 0000000..4e1cd36
--- /dev/null
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.typesystem.types;
+
+import com.google.inject.Provider;
+
+/**
+ * Guice {@link Provider} that hands out the {@link TypeSystem} obtained from {@link TypeSystem#getInstance()}.
+ */
+public class TypeSystemProvider implements Provider<TypeSystem> {
+    /**
+     * @return whatever {@link TypeSystem#getInstance()} returns at the time of the call.
+     */
+    @Override
+    public TypeSystem get() {
+        final TypeSystem typeSystem = TypeSystem.getInstance();
+        return typeSystem;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/typesystem/src/main/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties
index 239ac95..9a32e04 100644
--- a/typesystem/src/main/resources/atlas-application.properties
+++ b/typesystem/src/main/resources/atlas-application.properties
@@ -71,6 +71,12 @@ atlas.kafka.auto.commit.interval.ms=100
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.entities.group.id=atlas_entities
 
+#########  Entity Audit Configs  #########
+atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
+atlas.audit.zookeeper.session.timeout.ms=1000
+atlas.audit.hbase.zookeeper.quorum=localhost
+atlas.audit.hbase.zookeeper.property.clientPort=19026
+
 #########  Security Properties  #########
 
 # SSL config
@@ -80,3 +86,5 @@ atlas.server.https.port=31443
 #########  Security Properties  #########
 
 hbase.security.authentication=simple
+
+atlas.hook.falcon.synchronous=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 98be234..85c9471 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -342,10 +342,10 @@
                     </httpConnector>
                     <war>${project.build.directory}/atlas-webapp-${project.version}.war</war>
                     <daemon>true</daemon>
-                    <!--<webAppSourceDirectory>webapp/src/test/webapp</webAppSourceDirectory>-->
+                    <webAppSourceDirectory>webapp/src/test/webapp</webAppSourceDirectory>
                     <webApp>
                         <contextPath>/</contextPath>
-                        <descriptor>webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+                        <descriptor>${project.basedir}/src/test/webapp/WEB-INF/web.xml</descriptor>
                         <!-- ${project.build.directory}/atlas-webapp-${project.version} -->
                         <extraClasspath>${project.build.directory}/../../webapp/target/test-classes/</extraClasspath>
                     </webApp>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
index ae37314..01b1cd3 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
@@ -20,22 +20,32 @@ package org.apache.atlas.web.filters;
 
 import com.google.inject.Singleton;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.security.SecurityProperties;
+import org.apache.atlas.web.util.Servlets;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.log4j.NDC;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
 import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Enumeration;
-import java.util.Iterator;
 import java.util.Properties;
 
 /**
@@ -47,6 +57,27 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasAuthenticationFilter.class);
     static final String PREFIX = "atlas.http.authentication";
 
+    /**
+     * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
+     * before invoking the actual resource.
+     */
+    private HttpServlet optionsServlet;
+
+    /**
+     * Initializes the parent filter and then creates the servlet that answers OPTIONS requests.
+     *
+     * @param filterConfig filter configuration, passed through to the parent filter.
+     * @throws ServletException thrown if the filter could not be initialized.
+     */
+    @Override
+    public void init(FilterConfig filterConfig) throws ServletException {
+        LOG.info("AtlasAuthenticationFilter initialization started");
+        super.init(filterConfig);
+        // bare HttpServlet subclass: nothing is overridden here, so it only offers HttpServlet's defaults
+        optionsServlet = new HttpServlet() {};
+        optionsServlet.init();
+    }
+
     @Override
     protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) throws ServletException {
         Configuration configuration;
@@ -94,4 +125,78 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
         return config;
     }
 
+    /**
+     * Delegates to the parent filter with a wrapped chain. When the parent invokes that chain, the wrapper
+     * either answers an OPTIONS request through the options servlet, rejects a request that carries no user
+     * name with 400 (Bad Request), or records the user in the log4j NDC and the thread's
+     * {@link RequestContext} and then continues with the real chain.
+     *
+     * @param request     incoming request.
+     * @param response    outgoing response.
+     * @param filterChain chain to continue with once the user is known.
+     * @throws IOException      propagated from the parent filter or the chain.
+     * @throws ServletException propagated from the parent filter or the chain.
+     */
+    @Override
+    public void doFilter(final ServletRequest request, final ServletResponse response,
+                         final FilterChain filterChain) throws IOException, ServletException {
+
+        FilterChain filterChainWrapper = new FilterChain() {
+
+            @Override
+            public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
+                    throws IOException, ServletException {
+                HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
+
+                if ("OPTIONS".equals(httpRequest.getMethod())) { // option request meant only for authentication
+                    optionsServlet.service(request, response);
+                    return;
+                }
+
+                final String user = Servlets.getUserFromRequest(httpRequest);
+                if (StringUtils.isEmpty(user)) {
+                    ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+                            "Param user.name can't be empty");
+                    return;
+                }
+
+                // RequestContext.get() returns null when nothing earlier on this thread created a context;
+                // create one here instead of failing with a NullPointerException, and clear it afterwards.
+                RequestContext requestContext = RequestContext.get();
+                final boolean contextCreatedHere = requestContext == null;
+                if (contextCreatedHere) {
+                    requestContext = RequestContext.createContext();
+                }
+
+                // push before entering try so that the pop in finally always matches a push
+                NDC.push(user + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
+                try {
+                    requestContext.setUser(user);
+                    LOG.info("Request from authenticated user: {}, URL={}", user,
+                            Servlets.getRequestURI(httpRequest));
+
+                    filterChain.doFilter(servletRequest, servletResponse);
+                } finally {
+                    NDC.pop();
+                    if (contextCreatedHere) {
+                        RequestContext.clear();
+                    }
+                }
+            }
+        };
+
+        super.doFilter(request, response, filterChainWrapper);
+    }
+
+    /**
+     * Destroys the options servlet, if one was created, and then the parent filter.
+     */
+    @Override
+    public void destroy() {
+        if (optionsServlet != null) {
+            optionsServlet.destroy();
+        }
+
+        super.destroy();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index c735ecd..9d60e1a 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -20,6 +20,7 @@ package org.apache.atlas.web.filters;
 
 import com.google.inject.Singleton;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.web.util.DateTimeHelper;
 import org.apache.atlas.web.util.Servlets;
 import org.slf4j.Logger;
@@ -60,15 +61,19 @@ public class AuditFilter implements Filter {
         final String requestId = UUID.randomUUID().toString();
         final Thread currentThread = Thread.currentThread();
         final String oldName = currentThread.getName();
+        String user = getUserFromRequest(httpRequest);
 
         try {
             currentThread.setName(formatName(oldName, requestId));
-            recordAudit(httpRequest, requestTimeISO9601);
+            RequestContext requestContext = RequestContext.createContext();
+            requestContext.setUser(user);
+            recordAudit(httpRequest, requestTimeISO9601, user);
             filterChain.doFilter(request, response);
         } finally {
             // put the request id into the response so users can trace logs for this request
             ((HttpServletResponse) response).setHeader(AtlasClient.REQUEST_ID, requestId);
             currentThread.setName(oldName);
+            RequestContext.clear();;
         }
     }
 
@@ -76,8 +81,7 @@ public class AuditFilter implements Filter {
         return oldName + " - " + requestId;
     }
 
-    private void recordAudit(HttpServletRequest httpRequest, String whenISO9601) {
-        final String who = getUserFromRequest(httpRequest);
+    private void recordAudit(HttpServletRequest httpRequest, String whenISO9601, String who) {
         final String fromHost = httpRequest.getRemoteHost();
         final String fromAddress = httpRequest.getRemoteAddr();
         final String whatRequest = httpRequest.getMethod();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index c1f6a9b..dac89d7 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -21,6 +21,7 @@ package org.apache.atlas.web.listeners;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
+import com.google.inject.Module;
 import com.google.inject.Provider;
 import com.google.inject.TypeLiteral;
 import com.google.inject.servlet.GuiceServletContextListener;
@@ -33,13 +34,9 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RepositoryMetadataModule;
-import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
 import org.apache.atlas.repository.graph.GraphProvider;
 import org.apache.atlas.service.Services;
-import org.apache.atlas.services.MetadataService;
-import org.apache.atlas.typesystem.types.TypeSystem;
 import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
 import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.commons.configuration.Configuration;
@@ -75,7 +72,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
             LoginProcessor loginProcessor = new LoginProcessor();
             loginProcessor.login();
 
-            injector = Guice.createInjector(new RepositoryMetadataModule(), new NotificationModule(),
+            injector = Guice.createInjector(getRepositoryModule(), new NotificationModule(),
                     new JerseyServletModule() {
                         @Override
                         protected void configureServlets() {
@@ -99,6 +96,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
                             try {
                                 Configuration configuration = ApplicationProperties.get();
                                 if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) {
+                                    LOG.info("Enabling AuthenticationFilter");
                                     filter("/*").through(AtlasAuthenticationFilter.class);
                                 }
                             } catch (AtlasException e) {
@@ -113,13 +111,16 @@ public class GuiceServletConfig extends GuiceServletContextListener {
         return injector;
     }
 
+    protected Module getRepositoryModule() {
+        return new RepositoryMetadataModule();
+    }
+
     @Override
     public void contextInitialized(ServletContextEvent servletContextEvent) {
         super.contextInitialized(servletContextEvent);
 
         installLogBridge();
 
-        initMetadataService();
         startServices();
     }
 
@@ -148,7 +149,12 @@ public class GuiceServletConfig extends GuiceServletContextListener {
             TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
             Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
             final Graph graph = graphProvider.get().get();
-            graph.shutdown();
+
+            try {
+                graph.shutdown();
+            } catch(Throwable t) {
+                LOG.warn("Error while shutting down graph", t);
+            }
 
             //stop services
             stopServices();
@@ -160,17 +166,4 @@ public class GuiceServletConfig extends GuiceServletContextListener {
         Services services = injector.getInstance(Services.class);
         services.stop();
     }
-
-    // initialize the metadata service
-    private void initMetadataService() {
-        MetadataService metadataService = injector.getInstance(MetadataService.class);
-
-        // add a listener for entity changes
-        NotificationInterface notificationInterface = injector.getInstance(NotificationInterface.class);
-
-        NotificationEntityChangeListener listener =
-            new NotificationEntityChangeListener(notificationInterface, TypeSystem.getInstance());
-
-        metadataService.registerListener(listener);
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
index 871d857..2e75a61 100755
--- a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
@@ -45,9 +45,14 @@ public class EmbeddedServer {
         Connector connector = getConnector(port);
         server.addConnector(connector);
 
+        WebAppContext application = getWebAppContext(path);
+        server.setHandler(application);
+    }
+
+    protected WebAppContext getWebAppContext(String path) {
         WebAppContext application = new WebAppContext(path, "/");
         application.setClassLoader(Thread.currentThread().getContextClassLoader());
-        server.setHandler(application);
+        return application;
     }
 
     public static EmbeddedServer newServer(int port, String path, boolean secure) throws IOException {


Mime
View raw message