atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject atlas git commit: ATLAS-1995: updated entity-lookup-by-unique-attributes to use indexQuery
Date Sat, 29 Jul 2017 01:35:43 GMT
Repository: atlas
Updated Branches:
  refs/heads/master d8b868339 -> 6fb2a0388


ATLAS-1995: updated entity-lookup-by-unique-attributes to use indexQuery

Signed-off-by: Madhan Neethiraj <madhan@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/6fb2a038
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/6fb2a038
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/6fb2a038

Branch: refs/heads/master
Commit: 6fb2a0388a89369ceec57a74954d129df600e163
Parents: d8b8683
Author: ashutoshm <amestry@hortonworks.com>
Authored: Thu Jul 27 15:10:36 2017 -0700
Committer: Madhan Neethiraj <madhan@apache.org>
Committed: Fri Jul 28 18:18:19 2017 -0700

----------------------------------------------------------------------
 .../graph/GraphBackedMetadataRepository.java    |   1 -
 .../store/graph/AtlasTypeDefGraphStore.java     |   6 +
 .../store/graph/v1/AtlasEntityStoreV1.java      |   4 +-
 .../store/graph/v1/AtlasGraphUtilsV1.java       | 106 ++++++++++-
 .../GraphBackedMetadataRepositoryTest.java      |   3 +-
 .../graph/v1/AtlasDeleteHandlerV1Test.java      |   7 +-
 .../graph/v1/AtlasRelationshipStoreV1Test.java  |  10 +-
 .../notification/NotificationHookConsumer.java  | 175 ++++++++++---------
 8 files changed, 221 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index 0f3b06b..50b7116 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -168,7 +168,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
     }
 
     @Override
-    @GraphTransaction
     public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException, EntityNotFoundException {
         return getEntityDefinitions(guid).get(0);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
index 517da68..3638e19 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
@@ -378,6 +378,12 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
             }
         }
 
+        try {
+            ttr.updateTypes(ret);
+        } catch (AtlasBaseException e) { // this shouldn't happen, as the types were already validated
+            LOG.error("failed to update the registry after updating the store", e);
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== AtlasTypeDefGraphStore.createUpdateTypesDef({}, {}): {}", typesToCreate, typesToUpdate, ret);
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index f340330..1c168b4 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -307,7 +307,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
 
         entity.setGuid(guid);
 
-        return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true);
+        return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, false);
     }
 
     @Override
@@ -358,7 +358,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
                 throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName());
         }
 
-        return createOrUpdate(new AtlasEntityStream(updateEntity), true);
+        return createOrUpdate(new AtlasEntityStream(updateEntity), true, false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index 948d9dd..227f7cd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -18,7 +18,9 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.discovery.SearchProcessor;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -29,12 +31,14 @@ import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasElement;
 import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Utility methods for Graph.
@@ -55,6 +60,19 @@ public class AtlasGraphUtilsV1 {
     public static final String VERTEX_TYPE          = "typeSystem";
     public static final String RELATIONSHIPTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".relationshipType";
 
+    private static boolean USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = false;
+
+    static {
+        try {
+            Configuration conf = ApplicationProperties.get();
+
+            USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = conf.getBoolean("atlas.use.index.query.to.find.entity.by.unique.attributes", USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
+        } catch (Exception excp) {
+            LOG.error("Error reading configuration", excp);
+        } finally {
+            LOG.info("atlas.use.index.query.to.find.entity.by.unique.attributes=" + USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
+        }
+    }
 
     public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef) {
         return getTypeDefPropertyKey(typeDef.getName());
@@ -217,13 +235,22 @@ public class AtlasGraphUtilsV1 {
                     continue;
                 }
 
-                vertex = AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
+                if (canUseIndexQuery(entityType, attribute.getName())) {
+                    vertex = AtlasGraphUtilsV1.getAtlasVertexFromIndexQuery(entityType, attribute, attrValue);
+                } else {
+                    vertex = AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
 
-                if (vertex == null) {
-                    vertex = AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
+                    if (vertex == null) {
+                        vertex = AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
+                    }
                 }
 
                 if (vertex != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("findByUniqueAttributes(type={}, attrName={}, attrValue={}: found vertex {}",
+                                  entityType.getTypeName(), attribute.getName(), attrValue, vertex);
+                    }
+
                     break;
                 }
             }
@@ -366,4 +393,77 @@ public class AtlasGraphUtilsV1 {
     public static String getStateAsString(AtlasElement element) {
         return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class);
     }
+
+    private static boolean canUseIndexQuery(AtlasEntityType entityType, String attributeName) {
+        boolean ret = false;
+
+        if (USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES) {
+            final String typeAndSubTypesQryStr = entityType.getTypeAndAllSubTypesQryStr();
+
+            ret = typeAndSubTypesQryStr.length() <= SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES;
+
+            if (ret) {
+                Set<String> indexSet = AtlasGraphProvider.getGraphInstance().getVertexIndexKeys();
+                try {
+                    ret = indexSet.contains(entityType.getQualifiedAttributeName(attributeName));
+                }
+                catch (AtlasBaseException ex) {
+                    ret = false;
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private static AtlasVertex getAtlasVertexFromIndexQuery(AtlasEntityType entityType, AtlasAttribute attribute, Object attrVal) {
+        String          propertyName = attribute.getVertexPropertyName();
+        AtlasIndexQuery query        = getIndexQuery(entityType, propertyName, attrVal.toString());
+
+        for (Iterator<AtlasIndexQuery.Result> iter = query.vertices(); iter.hasNext(); ) {
+            AtlasIndexQuery.Result result = iter.next();
+            AtlasVertex            vertex = result.getVertex();
+
+            // skip non-entity vertices, if any got returned
+            if (vertex == null || !vertex.getPropertyKeys().contains(Constants.GUID_PROPERTY_KEY)) {
+                continue;
+            }
+
+            // verify the typeName
+            String typeNameInVertex = getTypeName(vertex);
+
+            if (!entityType.getTypeAndAllSubTypes().contains(typeNameInVertex)) {
+                LOG.warn("incorrect vertex type from index-query: expected='{}'; found='{}'", entityType.getTypeName(), typeNameInVertex);
+
+                continue;
+            }
+
+            if (attrVal.getClass() == String.class) {
+                String s         = (String) attrVal;
+                String vertexVal = vertex.getProperty(propertyName, String.class);
+
+                if (!s.equalsIgnoreCase(vertexVal)) {
+                    LOG.warn("incorrect match from index-query for property {}: expected='{}'; found='{}'", propertyName, s, vertexVal);
+
+                    continue;
+                }
+            }
+
+            return vertex;
+        }
+
+        return null;
+    }
+
+    private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, String propertyName, String value) {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("v.\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
+                .append(" AND ")
+                .append("v.\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value))
+                .append(" AND ")
+                .append("v.\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");
+
+        return AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
index 8120aaa..f372891 100755
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
@@ -474,7 +474,6 @@ public class GraphBackedMetadataRepositoryTest {
         return guid;
     }
 
-    @GraphTransaction
     AtlasVertex getTableEntityVertex() {
         AtlasGraph graph = TestUtils.getGraph();
         AtlasGraphQuery query = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, ComparisionOperator.EQUAL, TestUtils.TABLE_TYPE);
@@ -651,6 +650,7 @@ public class GraphBackedMetadataRepositoryTest {
     }
 
     @Test(dependsOnMethods = "testCreateEntity")
+    @GraphTransaction
     public void testGetIdFromVertex() throws Exception {
         AtlasVertex tableVertex = getTableEntityVertex();
 
@@ -664,6 +664,7 @@ public class GraphBackedMetadataRepositoryTest {
     }
 
     @Test(dependsOnMethods = "testCreateEntity")
+    @GraphTransaction
     public void testGetTypeName() throws Exception {
         AtlasVertex tableVertex = getTableEntityVertex();
         Assert.assertEquals(GraphHelper.getTypeName(tableVertex), TestUtils.TABLE_TYPE);

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
index 9331e35..62ef21c 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
@@ -41,6 +41,7 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetadataService;
 import org.apache.atlas.store.AtlasTypeDefStore;
@@ -129,7 +130,11 @@ public abstract class AtlasDeleteHandlerV1Test {
             ImmutableList.<AtlasClassificationDef>of(),
             ImmutableList.of(mapValueDef, mapOwnerDef));
 
-        typeDefStore.createTypesDef(typesDef);
+        AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry);
+
+        if (!typesToCreate.isEmpty()) {
+            typeDefStore.createTypesDef(typesToCreate);
+        }
 
         compositeMapOwnerType = typeRegistry.getEntityTypeByName("CompositeMapOwner");
         compositeMapValueType = typeRegistry.getEntityTypeByName("CompositeMapValue");

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
index 3ebda0d..263ad5b 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
@@ -30,6 +30,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.store.AtlasTypeDefStore;
@@ -97,8 +98,13 @@ public abstract class AtlasRelationshipStoreV1Test {
         }
 
         init();
-        AtlasTypesDef testTypes = getInverseReferenceTestTypes();
-        typeDefStore.createTypesDef(testTypes);
+        AtlasTypesDef typesDef = getInverseReferenceTestTypes();
+
+        AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry);
+
+        if (!typesToCreate.isEmpty()) {
+            typeDefStore.createTypesDef(typesToCreate);
+        }
     }
 
     @BeforeTest

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 51276d3..b8255b3 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -42,6 +42,7 @@ import org.apache.atlas.service.Service;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.DateTimeHelper;
@@ -71,6 +72,7 @@ import static org.apache.atlas.AtlasClientV2.*;
 @Order(4)
 public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
     private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
     private static final String LOCALHOST = "localhost";
     private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
 
@@ -236,113 +238,124 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         @VisibleForTesting
         void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
+            AtlasPerfTracer perf = null;
+
             HookNotificationMessage message = kafkaMsg.getMessage();
             String messageUser = message.getUser();
-            // Used for intermediate conversions during create and update
-            AtlasEntity.AtlasEntitiesWithExtInfo entities;
-            for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("handleMessage({}): attempt {}", message.getType().name(),
numRetries);
-                }
-                try {
-                    RequestContext requestContext = RequestContext.createContext();
-                    requestContext.setUser(messageUser);
 
-                    switch (message.getType()) {
-                        case ENTITY_CREATE:
-                            EntityCreateRequest createRequest = (EntityCreateRequest) message;
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
+            }
 
-                            if (numRetries == 0) { // audit only on the first attempt
-                                audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
-                            }
+            try {
+                // Used for intermediate conversions during create and update
+                AtlasEntity.AtlasEntitiesWithExtInfo entities;
+                for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("handleMessage({}): attempt {}", message.getType().name(),
numRetries);
+                    }
+                    try {
+                        RequestContext requestContext = RequestContext.createContext();
+                        requestContext.setUser(messageUser);
 
-                            entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
+                        switch (message.getType()) {
+                            case ENTITY_CREATE:
+                                EntityCreateRequest createRequest = (EntityCreateRequest)
message;
 
-                            atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities),
false);
-                            break;
+                                if (numRetries == 0) { // audit only on the first attempt
+                                    audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
+                                }
 
-                        case ENTITY_PARTIAL_UPDATE:
-                            final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest)
message;
+                                entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
 
-                            if (numRetries == 0) { // audit only on the first attempt
-                                audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
-                                        String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(),
partialUpdateRequest.getTypeName()));
-                            }
+                                atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities),
false);
+                                break;
 
-                            Referenceable referenceable = partialUpdateRequest.getEntity();
-                            entities = instanceConverter.toAtlasEntity(referenceable);
+                            case ENTITY_PARTIAL_UPDATE:
+                                final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest)
message;
 
-                            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
-                            String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType,
new HashMap<String, Object>() {
-                                {
-                                    put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
+                                if (numRetries == 0) { // audit only on the first attempt
+                                    audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
+                                            String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(),
partialUpdateRequest.getTypeName()));
                                 }
-                            });
 
-                            // There should only be one root entity
-                            entities.getEntities().get(0).setGuid(guid);
+                                Referenceable referenceable = partialUpdateRequest.getEntity();
+                                entities = instanceConverter.toAtlasEntity(referenceable);
 
-                            atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities),
true);
-                            break;
+                                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
+                                String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType,
new HashMap<String, Object>() {
+                                    {
+                                        put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
+                                    }
+                                });
 
-                        case ENTITY_DELETE:
-                            final EntityDeleteRequest deleteRequest = (EntityDeleteRequest)
message;
+                                // There should only be one root entity
+                                entities.getEntities().get(0).setGuid(guid);
 
-                            if (numRetries == 0) { // audit only on the first attempt
-                                audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
-                                        String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(),
deleteRequest.getTypeName()));
-                            }
+                                atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities),
true);
+                                break;
 
-                            try {
-                                AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
-                                atlasEntityStore.deleteByUniqueAttributes(type,
-                                        new HashMap<String, Object>() {{
-                                            put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
-                                        }});
-                            } catch (ClassCastException cle) {
-                                LOG.error("Failed to do a partial update on Entity");
-                            }
-                            break;
+                            case ENTITY_DELETE:
+                                final EntityDeleteRequest deleteRequest = (EntityDeleteRequest)
message;
 
-                        case ENTITY_FULL_UPDATE:
-                            EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
+                                if (numRetries == 0) { // audit only on the first attempt
+                                    audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
+                                            String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(),
deleteRequest.getTypeName()));
+                                }
 
-                            if (numRetries == 0) { // audit only on the first attempt
-                                audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
-                            }
+                                try {
+                                    AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
+                                    atlasEntityStore.deleteByUniqueAttributes(type,
+                                            new HashMap<String, Object>() {{
+                                                put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
+                                            }});
+                                } catch (ClassCastException cle) {
+                                    LOG.error("Failed to do a partial update on Entity");
+                                }
+                                break;
 
-                            entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
-                            atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities),
false);
-                            break;
+                            case ENTITY_FULL_UPDATE:
+                                EntityUpdateRequest updateRequest = (EntityUpdateRequest)
message;
 
-                        default:
-                            throw new IllegalStateException("Unknown notification type: " + message.getType().name());
-                    }
+                                if (numRetries == 0) { // audit only on the first attempt
+                                    audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
+                                }
 
-                    break;
-                } catch (Throwable e) {
-                    LOG.warn("Error handling message", e);
-                    try {
-                        LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
-                        Thread.sleep(consumerRetryInterval);
-                    } catch (InterruptedException ie) {
-                        LOG.error("Notification consumer thread sleep interrupted");
-                    }
+                                entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
+                                atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities),
false);
+                                break;
+
+                            default:
+                                throw new IllegalStateException("Unknown notification type: " + message.getType().name());
+                        }
 
-                    if (numRetries == (maxRetries - 1)) {
-                        LOG.warn("Max retries exceeded for message {}", message, e);
-                        failedMessages.add(message);
-                        if (failedMessages.size() >= failedMsgCacheSize) {
-                            recordFailedMessages();
+                        break;
+                    } catch (Throwable e) {
+                        LOG.warn("Error handling message", e);
+                        try {
+                            LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+                            Thread.sleep(consumerRetryInterval);
+                        } catch (InterruptedException ie) {
+                            LOG.error("Notification consumer thread sleep interrupted");
+                        }
+
+                        if (numRetries == (maxRetries - 1)) {
+                            LOG.warn("Max retries exceeded for message {}", message, e);
+                            failedMessages.add(message);
+                            if (failedMessages.size() >= failedMsgCacheSize) {
+                                recordFailedMessages();
+                            }
+                            return;
                         }
-                        return;
+                    } finally {
+                        RequestContext.clear();
+                        RequestContextV1.clear();
                     }
-                } finally {
-                    RequestContext.clear();
-                    RequestContextV1.clear();
                 }
+                commit(kafkaMsg);
+            } finally {
+                AtlasPerfTracer.log(perf);
             }
-            commit(kafkaMsg);
         }
 
         private void recordFailedMessages() {


Mime
View raw message