ranger-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject ranger git commit: RANGER-1897: tagsync update to replace Atlas V1 API usage with Atlas V2 API for tag-download using REST
Date Sun, 26 Nov 2017 03:36:10 GMT
Repository: ranger
Updated Branches:
  refs/heads/ranger-0.7 8ebad64ef -> 30b1188fe


RANGER-1897: tagsync update to replace Atlas V1 API usage with Atlas V2 API for tag-download using REST


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/30b1188f
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/30b1188f
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/30b1188f

Branch: refs/heads/ranger-0.7
Commit: 30b1188fe54788bcca3216dbeeb2f956e5cb9c9d
Parents: 8ebad64
Author: Madhan Neethiraj <madhan@apache.org>
Authored: Tue Nov 21 11:03:53 2017 -0800
Committer: Madhan Neethiraj <madhan@apache.org>
Committed: Sat Nov 25 11:59:58 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |   9 +-
 src/main/assembly/tagsync.xml                   |  12 +-
 tagsync/pom.xml                                 |  42 ++-
 .../source/atlas/AtlasHbaseResourceMapper.java  |  19 +-
 .../source/atlas/AtlasHdfsResourceMapper.java   |  17 +-
 .../source/atlas/AtlasHiveResourceMapper.java   |  19 +-
 .../source/atlas/AtlasKafkaResourceMapper.java  |  15 +-
 .../source/atlas/AtlasNotificationMapper.java   | 317 ++++++++++++++----
 .../source/atlas/AtlasResourceMapper.java       |   7 +
 .../source/atlas/AtlasResourceMapperUtil.java   |  25 ++
 .../tagsync/source/atlas/AtlasTagSource.java    |  48 +--
 .../source/atlasrest/AtlasRESTTagSource.java    | 129 +++++---
 .../tagsync/source/atlasrest/AtlasRESTUtil.java | 325 -------------------
 13 files changed, 512 insertions(+), 472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 80de97e..cc09475 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,7 +125,7 @@
         <asm.all.version>3.2</asm.all.version>
         <asm.version>3.1</asm.version>
         <aspectj.version>1.8.2</aspectj.version>
-        <atlas.version>0.7-incubating</atlas.version>
+        <atlas.version>0.8.2-SNAPSHOT</atlas.version>
         <atlas.guava.version>14.0</atlas.guava.version>
         <atlas.gson.version>2.5</atlas.gson.version>
         <atlas.jettison.version>1.3.7</atlas.jettison.version>
@@ -357,7 +357,12 @@
                 <groupId>com.webcohesion.enunciate</groupId>
                 <artifactId>enunciate-core-annotations</artifactId>
                 <version>2.8.0</version>
-                </dependency>
+            </dependency>
+            <dependency>
+                <groupId>com.sun.jersey.contribs</groupId>
+                <artifactId>jersey-multipart</artifactId>
+                <version>${jersey-bundle.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 0b17151..26b42ca 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -40,12 +40,16 @@
 					<include>com.google.inject:guice:jar:${guice.version}</include>
 					<include>com.google.inject.extensions:guice-multibindings:jar:${guice.version}</include>
 					<include>com.sun.jersey:jersey-bundle:jar:${jersey-bundle.version}</include>
+					<include>com.sun.jersey.contribs:jersey-multipart:jar:${sun-jersey-bundle.version}</include>
 					<include>com.thoughtworks.paranamer:paranamer:jar:${paranamer.version}</include>
 					<include>com.yammer.metrics:metrics-core</include>
 					<include>org.apache.atlas:atlas-notification:jar:${atlas.version}</include>
 					<include>org.apache.atlas:atlas-typesystem:jar:${atlas.version}</include>
 					<include>org.apache.atlas:atlas-client:jar:${atlas.version}</include>
+					<include>org.apache.atlas:atlas-client-common:jar:${atlas.version}</include>
+					<include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include>
 					<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
+					<include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include>
 					<include>org.apache.hadoop:hadoop-auth</include>
 					<include>org.apache.hadoop:hadoop-common</include>
 					<include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include>
@@ -55,10 +59,10 @@
 					<include>org.apache.ranger:ranger-plugins-common</include>
 					<include>org.apache.ranger:ranger-util</include>
 					<include>org.apache.zookeeper:zookeeper:jar:${zookeeper.version}</include>
-					<include>org.codehaus.jackson:jackson-core-asl</include>
-					<include>org.codehaus.jackson:jackson-jaxrs</include>
-					<include>org.codehaus.jackson:jackson-mapper-asl</include>
-					<include>org.codehaus.jackson:jackson-xc</include>
+					<include>org.codehaus.jackson:jackson-core-asl:jar:${codehaus.jackson.version}</include>
+					<include>org.codehaus.jackson:jackson-jaxrs:jar:${codehaus.jackson.version}</include>
+					<include>org.codehaus.jackson:jackson-mapper-asl:jar:${codehaus.jackson.version}</include>
+					<include>org.codehaus.jackson:jackson-xc:jar:${codehaus.jackson.version}</include>
 					<include>org.codehaus.jettison:jettison:jar:${jettison.version}</include>
 					<include>org.json4s:json4s-native_${scala.binary.version}:jar:${json4s.version}</include>
 					<include>org.json4s:json4s-core_${scala.binary.version}:jar:${json4s.version}</include>

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/pom.xml
----------------------------------------------------------------------
diff --git a/tagsync/pom.xml b/tagsync/pom.xml
index 42e9d2f..417a12f 100644
--- a/tagsync/pom.xml
+++ b/tagsync/pom.xml
@@ -55,6 +55,11 @@
             <version>${jersey-bundle.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.sun.jersey.contribs</groupId>
+            <artifactId>jersey-multipart</artifactId>
+            <version>${sun-jersey-bundle.version}</version>
+        </dependency>
+        <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
             <version>${commons.cli.version}</version>
@@ -110,6 +115,26 @@
             <version>${jettison.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+            <version>${codehaus.jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-jaxrs</artifactId>
+            <version>${codehaus.jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>${codehaus.jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-xc</artifactId>
+            <version>${codehaus.jackson.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-notification</artifactId>
             <version>${atlas.version}</version>
@@ -121,7 +146,17 @@
         </dependency>
         <dependency>
             <groupId>org.apache.atlas</groupId>
-            <artifactId>atlas-client</artifactId>
+            <artifactId>atlas-client-v1</artifactId>
+            <version>${atlas.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-common</artifactId>
+            <version>${atlas.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-v2</artifactId>
             <version>${atlas.version}</version>
         </dependency>
         <dependency>
@@ -144,6 +179,11 @@
 	    </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-intg</artifactId>
+            <version>${atlas.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_${scala.binary.version}</artifactId>
             <version>${kafka.version}</version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
index 8b36a31..00615e4 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.Map;
 import java.util.HashMap;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
@@ -48,7 +49,23 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
 
 	@Override
 	public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
+		String entityGuid    = entity.getId() != null ? entity.getId()._getId() : null;
+		String entityType    = entity.getTypeName();
 		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+		return getServiceResource(entityGuid, entityType, qualifiedName);
+	}
+
+	@Override
+	public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception {
+		String entityGuid    = entity.getGuid();
+		String entityType    = entity.getTypeName();
+		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+		return getServiceResource(entityGuid, entityType, qualifiedName);
+	}
+
+	private RangerServiceResource getServiceResource(String entityGuid, String entityType, String qualifiedName) throws Exception {
 		if (StringUtils.isEmpty(qualifiedName)) {
 			throw new Exception("attribute '" +  ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
 		}
@@ -63,8 +80,6 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
 			throwExceptionWithMessage("cluster-name not found in attribute '" +  ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
 		}
 
-		String entityType  = entity.getTypeName();
-		String entityGuid  = entity.getId() != null ? entity.getId()._getId() : null;
 		String serviceName = getRangerServiceName(clusterName);
 
 		Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicyResource>();

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
index 06bff90..d970859 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
@@ -57,10 +58,25 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
 
 	@Override
 	public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
+		String entityGuid    = entity.getId() != null ? entity.getId()._getId() : null;
 		String path          = getEntityAttribute(entity, ENTITY_ATTRIBUTE_PATH, String.class);
 		String clusterName   = getEntityAttribute(entity, ENTITY_ATTRIBUTE_CLUSTER_NAME, String.class);
 		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
 
+		return getServiceResource(entityGuid, path, clusterName, qualifiedName);
+	}
+
+	@Override
+	public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception {
+		String entityGuid    = entity.getGuid();
+		String path          = getEntityAttribute(entity, ENTITY_ATTRIBUTE_PATH, String.class);
+		String clusterName   = getEntityAttribute(entity, ENTITY_ATTRIBUTE_CLUSTER_NAME, String.class);
+		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+		return getServiceResource(entityGuid, path, clusterName, qualifiedName);
+	}
+
+	private RangerServiceResource getServiceResource(String entityGuid, String path, String clusterName, String qualifiedName) throws Exception {
 		if(StringUtils.isEmpty(path)) {
 			path = getResourceNameFromQualifiedName(qualifiedName);
 
@@ -81,7 +97,6 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
 			}
 		}
 
-		String  entityGuid  = entity.getId() != null ? entity.getId()._getId() : null;
 		String  serviceName = getRangerServiceName(clusterName);
 		Boolean isExcludes  = Boolean.FALSE;
 		Boolean isRecursive = Boolean.TRUE;

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
index a359622..84d1226 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.Map;
 import java.util.HashMap;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
@@ -47,7 +48,23 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper {
 
 	@Override
 	public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
+		String entityGuid    = entity.getId() != null ? entity.getId()._getId() : null;
+		String entityType    = entity.getTypeName();
 		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+		return getServiceResource(entityGuid, entityType, qualifiedName);
+	}
+
+	@Override
+	public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception {
+		String entityGuid    = entity.getGuid();
+		String entityType    = entity.getTypeName();
+		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+		return getServiceResource(entityGuid, entityType, qualifiedName);
+	}
+
+	private RangerServiceResource getServiceResource(String entityGuid, String entityType, String qualifiedName) throws Exception {
 		if (StringUtils.isEmpty(qualifiedName)) {
 			throw new Exception("attribute '" +  ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
 		}
@@ -62,8 +79,6 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper {
 			throwExceptionWithMessage("cluster-name not found in attribute '" +  ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
 		}
 
-		String   entityType  = entity.getTypeName();
-		String   entityGuid  = entity.getId() != null ? entity.getId()._getId() : null;
 		String   serviceName = getRangerServiceName(clusterName);
 		String[] resources   = resourceStr.split(QUALIFIED_NAME_DELIMITER);
 		String   dbName      = resources.length > 0 ? resources[0] : null;

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
index 9f1fc2d..0c0247f 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy;
@@ -42,8 +43,21 @@ public class AtlasKafkaResourceMapper extends AtlasResourceMapper {
 
 	@Override
 	public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
+		String entityGuid    = entity.getId() != null ? entity.getId()._getId() : null;
 		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
 
+		return getServiceResource(entityGuid, qualifiedName);
+	}
+
+	@Override
+	public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception {
+		String entityGuid    = entity.getGuid();
+		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+		return getServiceResource(entityGuid, qualifiedName);
+	}
+
+	private RangerServiceResource getServiceResource(String entityGuid, String qualifiedName) throws Exception {
 		String topic = getResourceNameFromQualifiedName(qualifiedName);
 
 		if(StringUtils.isEmpty(topic)) {
@@ -67,7 +81,6 @@ public class AtlasKafkaResourceMapper extends AtlasResourceMapper {
 
 		elements.put(RANGER_TYPE_KAFKA_TOPIC, new RangerPolicyResource(topic, isExcludes, isRecursive));
 
-		String  entityGuid  = entity.getId() != null ? entity.getId()._getId() : null;
 		String  serviceName = getRangerServiceName(clusterName);
 
 		return new RangerServiceResource(entityGuid, serviceName, elements);

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 922317e..f42c908 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -20,7 +20,16 @@
 package org.apache.ranger.tagsync.source.atlas;
 
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.type.AtlasBuiltInTypes;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.persistence.Id;
@@ -35,10 +44,9 @@ import org.apache.ranger.plugin.model.RangerTagDef;
 import org.apache.ranger.plugin.model.RangerTagDef.RangerTagAttributeDef;
 import org.apache.ranger.plugin.util.ServiceTags;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
 
 public class AtlasNotificationMapper {
 	private static final Log LOG = LogFactory.getLog(AtlasNotificationMapper.class);
@@ -46,6 +54,17 @@ public class AtlasNotificationMapper {
 
 	private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>();
 
+	private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() {
+		@Override
+		protected DateFormat initialValue() {
+			SimpleDateFormat dateFormat = new SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR);
+
+			dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+			return dateFormat;
+		}
+	};
+
 	private static void logUnhandledEntityNotification(EntityNotification entityNotification) {
 
 		final int REPORTING_INTERVAL_FOR_UNHANDLED_ENTITYTYPE_IN_MILLIS = 5 * 60 * 1000; // 5 minutes
@@ -134,6 +153,7 @@ public class AtlasNotificationMapper {
 				case ENTITY_UPDATE:
 				case ENTITY_DELETE:
 				case TRAIT_ADD:
+				case TRAIT_UPDATE:
 				case TRAIT_DELETE: {
 					ret = true;
 					break;
@@ -175,7 +195,6 @@ public class AtlasNotificationMapper {
 	}
 
 	static private Map<String, ServiceTags> buildServiceTags(List<AtlasEntityWithTraits> entitiesWithTraits) throws Exception {
-
 		Map<String, ServiceTags> ret = new HashMap<String, ServiceTags>();
 
 		for (AtlasEntityWithTraits element : entitiesWithTraits) {
@@ -189,11 +208,163 @@ public class AtlasNotificationMapper {
 			}
 		}
 
+		return ret;
+	}
+
+	static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception {
+		ServiceTags            ret             = null;
+		IReferenceableInstance entity          = entityWithTraits.getEntity();
+		RangerServiceResource  serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity);
+
+		if (serviceResource != null) {
+			List<RangerTag>    tags        = getTags(entityWithTraits);
+			List<RangerTagDef> tagDefs     = getTagDefs(entityWithTraits);
+			String             serviceName = serviceResource.getServiceName();
+
+			ret = createOrGetServiceTags(serviceTagsMap, serviceName);
+
+			if (serviceTagsMap == null || CollectionUtils.isNotEmpty(tags)) {
+				serviceResource.setId((long) ret.getServiceResources().size());
+				ret.getServiceResources().add(serviceResource);
+
+				List<Long> tagIds = new ArrayList<>();
+
+				if (CollectionUtils.isNotEmpty(tags)) {
+					for (RangerTag tag : tags) {
+						tag.setId((long) ret.getTags().size());
+						ret.getTags().put(tag.getId(), tag);
+
+						tagIds.add(tag.getId());
+					}
+				}
+				ret.getResourceToTagIds().put(serviceResource.getId(), tagIds);
+
+				if (CollectionUtils.isNotEmpty(tagDefs)) {
+					for (RangerTagDef tagDef : tagDefs) {
+						tagDef.setId((long) ret.getTagDefinitions().size());
+						ret.getTagDefinitions().put(tagDef.getId(), tagDef);
+					}
+				}
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Entity " + entityWithTraits + " does not have any tags associated with it when full-sync is being done.");
+					LOG.debug("Will not add this entity to serviceTags, so that this entity, if exists,  will be removed from ranger");
+				}
+			}
+		} else {
+			LOG.error("Failed to build serviceResource for entity:" + entity.getId()._getId());
+		}
+
+		return ret;
+	}
+
+	static private ServiceTags createOrGetServiceTags(Map<String, ServiceTags> serviceTagsMap, String serviceName) {
+		ServiceTags ret = serviceTagsMap == null ? null : serviceTagsMap.get(serviceName);
+
+		if (ret == null) {
+			ret = new ServiceTags();
+
+			if (serviceTagsMap != null) {
+				serviceTagsMap.put(serviceName, ret);
+			}
+
+			ret.setOp(ServiceTags.OP_ADD_OR_UPDATE);
+			ret.setServiceName(serviceName);
+		}
+
+		return ret;
+	}
+
+	static private List<RangerTag> getTags(AtlasEntityWithTraits entityWithTraits) {
+		List<RangerTag>        ret    = new ArrayList<RangerTag>();
+		IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null;
+
+		if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) {
+			for (String traitName : entity.getTraits()) {
+				IStruct             trait    = entity.getTrait(traitName);
+				Map<String, String> tagAttrs = new HashMap<String, String>();
+
+				try {
+					Map<String, Object> attrs = trait.getValuesMap();
+
+					if(MapUtils.isNotEmpty(attrs)) {
+						for (Map.Entry<String, Object> attrEntry : attrs.entrySet()) {
+							String attrName  = attrEntry.getKey();
+							Object attrValue = attrEntry.getValue();
+
+							tagAttrs.put(attrName, attrValue != null ? attrValue.toString() : null);
+						}
+					}
+
+				} catch (AtlasException exception) {
+					LOG.error("Could not get values for trait:" + trait.getTypeName(), exception);
+				}
+
+				ret.add(new RangerTag(null, trait.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE));
+			}
+		}
+
+		return ret;
+	}
+
+	static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits entityWithTraits) {
+		List<RangerTagDef>     ret    = new ArrayList<RangerTagDef>();
+		IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null;
+
+		if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) {
+			for (String traitName : entity.getTraits()) {
+				IStruct       trait = entity.getTrait(traitName);
+				RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas");
+
+				try {
+					Map<String, Object> attrs = trait.getValuesMap();
+
+					if(MapUtils.isNotEmpty(attrs)) {
+						for (String attrName : attrs.keySet()) {
+							tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+						}
+					}
+				} catch (AtlasException exception) {
+					LOG.error("Could not get values for trait:" + trait.getTypeName(), exception);
+				}
+
+				ret.add(tagDef);
+			}
+		}
+
+		return ret;
+	}
+
+	public static Map<String, ServiceTags> processSearchResult(AtlasSearchResult result, AtlasTypeRegistry typeRegistry) {
+		Map<String, ServiceTags> ret = null;
+
+		try {
+			ret = buildServiceTags(result, typeRegistry);
+		} catch (Exception exception) {
+			LOG.error("Failed to build serviceTags", exception);
+		}
+
+		return ret;
+	}
+
+	static private Map<String, ServiceTags> buildServiceTags(AtlasSearchResult result, AtlasTypeRegistry typeRegistry) throws Exception {
+		Map<String, ServiceTags> ret = new HashMap<>();
+
+		for (AtlasEntityHeader entity : result.getEntities()) {
+			if (entity != null && entity.getStatus() == AtlasEntity.Status.ACTIVE) {
+				buildServiceTags(entity, typeRegistry, ret);
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Ignoring entity because its State is not ACTIVE: " + entity);
+				}
+			}
+		}
+
 		// Remove duplicate tag definitions
 		if(CollectionUtils.isNotEmpty(ret.values())) {
 			for (ServiceTags serviceTag : ret.values()) {
 				if(MapUtils.isNotEmpty(serviceTag.getTagDefinitions())) {
-					Map<String, RangerTagDef> uniqueTagDefs = new HashMap<String, RangerTagDef>();
+					Map<String, RangerTagDef> uniqueTagDefs = new HashMap<>();
 
 					for (RangerTagDef tagDef : serviceTag.getTagDefinitions().values()) {
 						RangerTagDef existingTagDef = uniqueTagDefs.get(tagDef.getName());
@@ -239,25 +410,22 @@ public class AtlasNotificationMapper {
 		return ret;
 	}
 
-	static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception {
-		ServiceTags            ret             = null;
-		IReferenceableInstance entity          = entityWithTraits.getEntity();
-		RangerServiceResource  serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity);
+	static private ServiceTags buildServiceTags(AtlasEntityHeader entity, AtlasTypeRegistry typeRegistry, Map<String, ServiceTags> serviceTagsMap) throws Exception {
+		ServiceTags           ret             = null;
+		RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity);
 
 		if (serviceResource != null) {
-
-			List<RangerTag>     tags        = getTags(entityWithTraits);
-			List<RangerTagDef>  tagDefs     = getTagDefs(entityWithTraits);
+			List<RangerTag>     tags        = getTags(entity, typeRegistry);
+			List<RangerTagDef>  tagDefs     = getTagDefs(entity);
 			String              serviceName = serviceResource.getServiceName();
 
 			ret = createOrGetServiceTags(serviceTagsMap, serviceName);
 
 			if (serviceTagsMap == null || CollectionUtils.isNotEmpty(tags)) {
-
 				serviceResource.setId((long) ret.getServiceResources().size());
 				ret.getServiceResources().add(serviceResource);
 
-				List<Long> tagIds = new ArrayList<Long>();
+				List<Long> tagIds = new ArrayList<>();
 
 				if (CollectionUtils.isNotEmpty(tags)) {
 					for (RangerTag tag : tags) {
@@ -277,90 +445,125 @@ public class AtlasNotificationMapper {
 				}
 			} else {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Entity " + entityWithTraits + " does not have any tags associated with it when full-sync is being done.");
+					LOG.debug("Entity " + entity + " does not have any tags associated with it when full-sync is being done.");
 					LOG.debug("Will not add this entity to serviceTags, so that this entity, if exists,  will be removed from ranger");
 				}
 			}
 		} else {
-			LOG.error("Failed to build serviceResource for entity:" + entity.getId()._getId());
+			LOG.error("Failed to build serviceResource for entity:" + entity.getGuid());
 		}
 
 		return ret;
 	}
 
-	static private ServiceTags createOrGetServiceTags(Map<String, ServiceTags> serviceTagsMap, String serviceName) {
-		ServiceTags ret = serviceTagsMap == null ? null : serviceTagsMap.get(serviceName);
+	static private List<RangerTag> getTags(AtlasEntityHeader entity, AtlasTypeRegistry typeRegistry) {
+		List<RangerTag> ret = new ArrayList<>();
 
-		if (ret == null) {
-			ret = new ServiceTags();
+		if(entity != null && CollectionUtils.isNotEmpty(entity.getClassificationNames())) {
+			List<AtlasClassification> classifications = entity.getClassifications();
 
-			if (serviceTagsMap != null) {
-				serviceTagsMap.put(serviceName, ret);
+			for (AtlasClassification classification : classifications) {
+				ret.add(getRangerTag(classification, typeRegistry));
+
+				List<AtlasClassification> superClassifications = getSuperClassifications(classification, typeRegistry);
+
+				if (CollectionUtils.isNotEmpty(superClassifications)) {
+					for (AtlasClassification superClassification : superClassifications) {
+						ret.add(getRangerTag(superClassification, typeRegistry));
+					}
+				}
 			}
+		}
 
-			ret.setOp(ServiceTags.OP_ADD_OR_UPDATE);
-			ret.setServiceName(serviceName);
+		return ret;
+	}
+
+	static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity) {
+		List<RangerTagDef> ret = new ArrayList<>();
+
+		if(entity != null && CollectionUtils.isNotEmpty(entity.getClassificationNames())) {
+			List<AtlasClassification> traits = entity.getClassifications();
+
+			for (AtlasClassification trait : traits) {
+				RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas");
+
+				if(MapUtils.isNotEmpty(trait.getAttributes())) {
+					for (String attrName : trait.getAttributes().keySet()) {
+						tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+					}
+				}
+
+				ret.add(tagDef);
+			}
 		}
 
 		return ret;
 	}
 
-	static private List<RangerTag> getTags(AtlasEntityWithTraits entityWithTraits) {
-		List<RangerTag> ret = new ArrayList<RangerTag>();
+	static private List<AtlasClassification> getSuperClassifications(AtlasClassification classification, AtlasTypeRegistry typeRegistry) {
+		List<AtlasClassification> ret                = null;
+		AtlasClassificationType   classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
 
-		if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
-			List<IStruct> traits = entityWithTraits.getAllTraits();
+		if (classificationType != null && CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) {
+			ret = new ArrayList<>(classificationType.getAllSuperTypes().size());
 
-			for (IStruct trait : traits) {
-				Map<String, String> tagAttrs = new HashMap<String, String>();
+			for (String superTypeName : classificationType.getAllSuperTypes()) {
+				AtlasClassification superClassification = new AtlasClassification(superTypeName);
 
-				try {
-					Map<String, Object> attrs = trait.getValuesMap();
+				if (MapUtils.isNotEmpty(classification.getAttributes())) {
+					AtlasClassificationType superClassificationType = typeRegistry.getClassificationTypeByName(superTypeName);
 
-					if(MapUtils.isNotEmpty(attrs)) {
-						for (Map.Entry<String, Object> attrEntry : attrs.entrySet()) {
-							String attrName  = attrEntry.getKey();
-							Object attrValue = attrEntry.getValue();
+					if (superClassificationType != null && MapUtils.isNotEmpty(superClassificationType.getAllAttributes())) {
+						Map<String, Object> superClassificationAttributes = new HashMap<>();
 
-							tagAttrs.put(attrName, attrValue != null ? attrValue.toString() : null);
+						for (Map.Entry<String, Object> entry : classification.getAttributes().entrySet()) {
+							String attrName = entry.getKey();
+
+							if (superClassificationType.getAllAttributes().containsKey(attrName)) {
+								superClassificationAttributes.put(attrName, entry.getValue());
+							}
 						}
+
+						superClassification.setAttributes(superClassificationAttributes);
 					}
-				} catch (AtlasException exception) {
-					LOG.error("Could not get values for trait:" + trait.getTypeName(), exception);
 				}
 
-				ret.add(new RangerTag(null, trait.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE));
+				ret.add(superClassification);
 			}
 		}
 
 		return ret;
 	}
 
-	static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits entityWithTraits) {
-		List<RangerTagDef> ret = new ArrayList<RangerTagDef>();
+	static private RangerTag getRangerTag(AtlasClassification classification, AtlasTypeRegistry typeRegistry) {
+		final Map<String, String> tagAttrs;
 
-		if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
-			List<IStruct> traits = entityWithTraits.getAllTraits();
+		if(MapUtils.isNotEmpty(classification.getAttributes())) {
+			tagAttrs = new HashMap<>();
 
-			for (IStruct trait : traits) {
-				RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas");
+			for (Map.Entry<String, Object> attrEntry : classification.getAttributes().entrySet()) {
+				String attrName  = attrEntry.getKey();
+				Object attrValue = attrEntry.getValue();
 
-				try {
-					Map<String, Object> attrs = trait.getValuesMap();
+				// V2 Atlas APIs have date attributes as number; convert the value to earlier version format, so that
+				// Ranger conditions can recognize the value correctly
+				if (attrValue instanceof Number) {
+					AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+					AtlasAttribute          attribute          = (classificationType != null) ? classificationType.getAttribute(attrName) : null;
 
-					if(MapUtils.isNotEmpty(attrs)) {
-						for (String attrName : attrs.keySet()) {
-							tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
-						}
+					if (attribute != null && attribute.getAttributeType() instanceof AtlasBuiltInTypes.AtlasDateType) {
+						Date dateValue = new Date(((Number)attrValue).longValue());
+
+						attrValue = DATE_FORMATTER.get().format(dateValue);
 					}
-				} catch (AtlasException exception) {
-					LOG.error("Could not get values for trait:" + trait.getTypeName(), exception);
 				}
 
-				ret.add(tagDef);
+				tagAttrs.put(attrName, attrValue != null ? attrValue.toString() : null);
 			}
+		} else {
+			tagAttrs = Collections.emptyMap();
 		}
 
-		return ret;
+		return new RangerTag(null, classification.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE);
 	}
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
index 8ececdf..a2ad796 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import java.util.Map;
 
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -75,6 +76,8 @@ public abstract class AtlasResourceMapper {
 
 	abstract public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception;
 
+	abstract public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception;
+
 	protected String getCustomRangerServiceName(String atlasInstanceName) {
 		if(properties != null) {
 			String propName = TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName
@@ -132,6 +135,10 @@ public abstract class AtlasResourceMapper {
 		return ret;
 	}
 
+	static protected <T> T getEntityAttribute(AtlasEntityHeader entity, String name, Class<T> type) {
+		return getAttribute(entity.getAttributes(), name, type);
+	}
+
 	static protected <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
 		return type.cast(map.get(name));
 	}

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
index f9f0eaf..d004bff 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.ranger.tagsync.source.atlas;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerServiceResource;
@@ -74,6 +75,30 @@ public class AtlasResourceMapperUtil {
 		return resource;
 	}
 
+	public static RangerServiceResource getRangerServiceResource(AtlasEntityHeader atlasEntity) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> getRangerServiceResource(" + atlasEntity.getGuid() +")");
+		}
+
+		RangerServiceResource resource = null;
+
+		AtlasResourceMapper mapper = atlasResourceMappers.get(atlasEntity.getTypeName());
+
+		if (mapper != null) {
+			try {
+				resource = mapper.buildResource(atlasEntity);
+			} catch (Exception exception) {
+				LOG.error("Could not get serviceResource for atlas entity:" + atlasEntity.getGuid() + ": ", exception);
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== getRangerServiceResource(" + atlasEntity.getGuid() +"): resource=" + resource);
+		}
+
+		return resource;
+	}
+
 	static public boolean initializeAtlasResourceMappers(Properties properties) {
 		final String MAPPER_NAME_DELIMITER = ",";
 

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 12b02d9..95ff8ec 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -20,26 +20,22 @@
 package org.apache.ranger.tagsync.source.atlas;
 
 
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Provider;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
 import org.apache.atlas.notification.entity.EntityNotification;
-
-import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.kafka.common.TopicPartition;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Properties;
 import java.util.List;
+import java.util.Properties;
 
 public class AtlasTagSource extends AbstractTagSource {
 	private static final Log LOG = LogFactory.getLog(AtlasTagSource.class);
@@ -102,12 +98,7 @@ public class AtlasTagSource extends AbstractTagSource {
 		}
 
 		if (ret) {
-			NotificationModule notificationModule = new NotificationModule();
-
-			Injector injector = Guice.createInjector(notificationModule);
-
-			Provider<NotificationInterface> consumerProvider = injector.getProvider(NotificationInterface.class);
-			NotificationInterface notification = consumerProvider.get();
+			NotificationInterface notification = NotificationProvider.get();
 			List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
 
 			consumerTask = new ConsumerRunnable(iterators.get(0));
@@ -163,15 +154,6 @@ public class AtlasTagSource extends AbstractTagSource {
 			this.consumer = consumer;
 		}
 
-		private boolean hasNext() {
-			boolean ret = false;
-			try {
-				ret = consumer.hasNext();
-			} catch (Exception exception) {
-				LOG.error("EntityNotification consumer threw exception, IGNORING...:", exception);
-			}
-			return ret;
-		}
 
 		@Override
 		public void run() {
@@ -180,8 +162,11 @@ public class AtlasTagSource extends AbstractTagSource {
 			}
 			while (true) {
 				try {
-					if (hasNext()) {
-						EntityNotification notification = consumer.peek();
+					List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
+
+					for (AtlasKafkaMessage<EntityNotification> message :  messages) {
+						EntityNotification notification = message != null ? message.getMessage() : null;
+
 						if (notification != null) {
 							if (LOG.isDebugEnabled()) {
 								LOG.debug("Notification=" + getPrintableEntityNotification(notification));
@@ -191,11 +176,12 @@ public class AtlasTagSource extends AbstractTagSource {
 							if (serviceTags != null) {
 								updateSink(serviceTags);
 							}
+
+							TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+							consumer.commit(partition, message.getOffset());
 						} else {
 							LOG.error("Null entityNotification received from Kafka!! Ignoring..");
 						}
-						// Move iterator forward
-						consumer.next();
 					}
 				} catch (Exception exception) {
 					LOG.error("Caught exception..: ", exception);

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 4e0ae90..239f143 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -23,48 +23,52 @@ import com.google.gson.Gson;
 
 import com.google.gson.GsonBuilder;
 
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.tagsync.model.AbstractTagSource;
 import org.apache.ranger.plugin.util.ServiceTags;
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.process.TagSyncConfig;
 import org.apache.ranger.tagsync.process.TagSynchronizer;
-import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
 import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper;
 import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
 
-import java.util.List;
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
 public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 	private static final Log LOG = LogFactory.getLog(AtlasRESTTagSource.class);
 
-	private long sleepTimeBetweenCycleInMillis;
-
-	private AtlasRESTUtil atlasRESTUtil = null;
-
-	private Thread myThread = null;
+	private long     sleepTimeBetweenCycleInMillis;
+	private String[] restUrls         = null;
+	private boolean  isKerberized     = false;
+	private String[] userNamePassword = null;
+	private Thread   myThread         = null;
 
 	public static void main(String[] args) {
-
-		AtlasRESTTagSource atlasRESTTagSource = new AtlasRESTTagSource();
-
-		TagSyncConfig config = TagSyncConfig.getInstance();
-
-		Properties props = config.getProperties();
+		TagSyncConfig config  = TagSyncConfig.getInstance();
+		Properties    props   = config.getProperties();
+		TagSink       tagSink = TagSynchronizer.initializeTagSink(props);
 
 		TagSynchronizer.printConfigurationProperties(props);
 
-		TagSink tagSink = TagSynchronizer.initializeTagSink(props);
-
 		if (tagSink != null) {
+			AtlasRESTTagSource atlasRESTTagSource = new AtlasRESTTagSource();
 
 			if (atlasRESTTagSource.initialize(props)) {
 				try {
@@ -79,46 +83,45 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 				LOG.error("AtlasRESTTagSource initialized failed, exiting.");
 				System.exit(1);
 			}
-
 		} else {
 			LOG.error("TagSink initialialization failed, exiting.");
 			System.exit(1);
 		}
-
 	}
+
 	@Override
 	public boolean initialize(Properties properties) {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> AtlasRESTTagSource.initialize()");
 		}
 
+		sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties);
+
 		boolean ret = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
 
-		sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties);
-		final boolean isKerberized = TagSyncConfig.getTagsyncKerberosIdentity(properties) != null;
+		String  sslConfigFile = TagSyncConfig.getAtlasRESTSslConfigFile(properties);
+
+		this.isKerberized     = TagSyncConfig.getTagsyncKerberosIdentity(properties) != null;
+		this.userNamePassword = new String[] { TagSyncConfig.getAtlasRESTUserName(properties), TagSyncConfig.getAtlasRESTPassword(properties) };
 
-		String restUrl       = TagSyncConfig.getAtlasRESTEndpoint(properties);
-		String sslConfigFile = TagSyncConfig.getAtlasRESTSslConfigFile(properties);
-		String userName = TagSyncConfig.getAtlasRESTUserName(properties);
-		String password = TagSyncConfig.getAtlasRESTPassword(properties);
+		String restEndpoint = TagSyncConfig.getAtlasRESTEndpoint(properties);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("restUrl=" + restUrl);
+			LOG.debug("restEndpoint=" + restEndpoint);
 			LOG.debug("sslConfigFile=" + sslConfigFile);
-			LOG.debug("userName=" + userName);
+			LOG.debug("userName=" + userNamePassword[0]);
 			LOG.debug("kerberized=" + isKerberized);
 		}
 
-		if (StringUtils.isNotEmpty(restUrl)) {
-			if (!restUrl.endsWith("/")) {
-				restUrl += "/";
-			}
-			RangerRESTClient atlasRESTClient = new RangerRESTClient(restUrl, sslConfigFile);
+		if (StringUtils.isNotEmpty(restEndpoint)) {
+			this.restUrls = restEndpoint.split(",");
 
-			if (!isKerberized) {
-				atlasRESTClient.setBasicAuthInfo(userName, password);
+			for (int i = 0; i < restUrls.length; i++) {
+				if (!restUrls[i].endsWith("/")) {
+					restUrls[i] += "/";
+				}
 			}
-			atlasRESTUtil = new AtlasRESTUtil(atlasRESTClient, isKerberized);
+
 		} else {
 			LOG.info("AtlasEndpoint not specified, Initial download of Atlas-entities cannot be done.");
 			ret = false;
@@ -133,7 +136,6 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 
 	@Override
 	public boolean start() {
-
 		myThread = new Thread(this);
 		myThread.setDaemon(true);
 		myThread.start();
@@ -150,21 +152,17 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 
 	@Override
 	public void run() {
-
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> AtlasRESTTagSource.run()");
 		}
 
 		while (true) {
-
 			synchUp();
 
 			LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
 
 			try {
-
 				Thread.sleep(sleepTimeBetweenCycleInMillis);
-
 			} catch (InterruptedException exception) {
 				LOG.error("Interrupted..: ", exception);
 				return;
@@ -173,17 +171,40 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 	}
 
 	public void synchUp() {
+		SearchParameters           searchParams = new SearchParameters();
+		AtlasTypeRegistry          typeRegistry = new AtlasTypeRegistry();
+		AtlasTransientTypeRegistry tty          = null;
+		AtlasSearchResult          searchResult = null;
+
+		searchParams.setClassification("*");
+		searchParams.setIncludeClassificationAttributes(true);
+		searchParams.setOffset(0);
+		searchParams.setLimit(Integer.MAX_VALUE);
+
+		try {
+			AtlasClientV2 atlasClient = getAtlasClient();
+
+			searchResult = atlasClient.facetedSearch(searchParams);
 
-		List<AtlasEntityWithTraits> atlasEntities = atlasRESTUtil.getAtlasEntities();
+			AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter());
 
-		if (CollectionUtils.isNotEmpty(atlasEntities)) {
+			tty = typeRegistry.lockTypeRegistryForUpdate();
+
+			tty.addTypes(typesDef);
+		} catch (AtlasServiceException | AtlasBaseException | IOException excp) {
+			LOG.error("failed to download tags from Atlas", excp);
+		} finally {
+			if (tty != null) {
+				typeRegistry.releaseTypeRegistryForUpdate(tty, true);
+			}
+		}
+
+		if (searchResult != null) {
 			if (LOG.isDebugEnabled()) {
-				for (AtlasEntityWithTraits element : atlasEntities) {
-					LOG.debug(element);
-				}
+				LOG.debug(AtlasType.toJson(searchResult));
 			}
 
-			Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntities);
+			Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processSearchResult(searchResult, typeRegistry);
 
 			if (MapUtils.isNotEmpty(serviceTagsMap)) {
 				for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) {
@@ -195,6 +216,7 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 
 						LOG.debug("serviceTags=" + serviceTagsString);
 					}
+
 					updateSink(entry.getValue());
 				}
 			}
@@ -202,5 +224,20 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 
 	}
 
+	private AtlasClientV2 getAtlasClient() throws IOException {
+		final AtlasClientV2 ret;
+
+		if (isKerberized) {
+			UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+
+			ugi.checkTGTAndReloginFromKeytab();
+
+			ret = new AtlasClientV2(ugi, ugi.getShortUserName(), restUrls);
+		} else {
+			ret = new AtlasClientV2(restUrls, userNamePassword);
+		}
+
+		return ret;
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
deleted file mode 100644
index 00a101e..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlasrest;
-
-import com.google.gson.Gson;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
-import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
-
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings("unchecked")
-public class AtlasRESTUtil {
-	private static final Logger LOG = Logger.getLogger(AtlasRESTUtil.class);
-
-	private static final String REST_MIME_TYPE_JSON = "application/json";
-	private static final String API_ATLAS_TYPES    = "api/atlas/types";
-	private static final String API_ATLAS_ENTITIES = "api/atlas/entities?type=";
-	private static final String API_ATLAS_ENTITY   = "api/atlas/entities/";
-	private static final String API_ATLAS_TYPE     = "api/atlas/types/";
-
-	private static final String RESULTS_ATTRIBUTE               = "results";
-	private static final String DEFINITION_ATTRIBUTE            = "definition";
-	private static final String VALUES_ATTRIBUTE                = "values";
-	private static final String TRAITS_ATTRIBUTE                = "traits";
-	private static final String TYPE_NAME_ATTRIBUTE             = "typeName";
-	private static final String TRAIT_TYPES_ATTRIBUTE           = "traitTypes";
-	private static final String SUPER_TYPES_ATTRIBUTE           = "superTypes";
-	private static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions";
-	private static final String NAME_ATTRIBUTE                  = "name";
-
-	private final Gson gson = new Gson();
-
-	private final RangerRESTClient atlasRESTClient;
-
-	private final boolean isKerberized;
-
-	public AtlasRESTUtil(RangerRESTClient atlasRESTClient, boolean isKerberized) {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> AtlasRESTUtil()");
-		}
-
-		this.atlasRESTClient = atlasRESTClient;
-
-		this.isKerberized = isKerberized;
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== AtlasRESTUtil()");
-		}
-	}
-
-	public List<AtlasEntityWithTraits> getAtlasEntities() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> getAtlasEntities()");
-		}
-
-		List<AtlasEntityWithTraits> ret = new ArrayList<AtlasEntityWithTraits>();
-
-		Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
-
-		List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class);
-
-		if (CollectionUtils.isNotEmpty(types)) {
-
-			for (String type : types) {
-
-				if (!AtlasResourceMapperUtil.isEntityTypeHandled(type)) {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Not fetching Atlas entities of type: " + type);
-					}
-					continue;
-				}
-
-				Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type);
-
-				List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
-
-				if (CollectionUtils.isEmpty(guids)) {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("No Atlas entities for type: " + type);
-					}
-					continue;
-				}
-
-				for (String guid : guids) {
-
-					Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
-
-					Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
-
-					Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
-
-					List<IStruct> allTraits = new LinkedList<>();
-
-					if (MapUtils.isNotEmpty(traitsAttribute)) {
-
-						for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) {
-
-							Map<String, Object> trait = (Map<String, Object>) entry.getValue();
-
-							Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
-							String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
-
-							if (StringUtils.isEmpty(traitTypeName)) {
-								continue;
-							}
-
-							List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
-
-							Struct trait1 = new Struct(traitTypeName, traitValues);
-
-							allTraits.add(trait1);
-							allTraits.addAll(superTypes);
-						}
-					}
-
-					IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
-
-					if (entity != null) {
-						AtlasEntityWithTraits atlasEntity = new AtlasEntityWithTraits(entity, allTraits);
-						ret.add(atlasEntity);
-					} else {
-						if (LOG.isInfoEnabled()) {
-							LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid);
-						}
-					}
-
-				}
-
-			}
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("<== getAtlasEntities()");
-			}
-		}
-
-		return ret;
-	}
-
-	private Map<String, Object> getTraitType(String traitName) {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> getTraitType(" + traitName + ")");
-		}
-		Map<String, Object> ret = null;
-
-		Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
-
-		Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class);
-
-		List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
-
-		if (CollectionUtils.isNotEmpty(traitTypes)) {
-			ret = (Map<String, Object>) traitTypes.get(0);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== getTraitType(" + traitName + ")");
-		}
-		return ret;
-	}
-
-	private List<IStruct> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values) {
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> getTraitSuperTypes()");
-		}
-		List<IStruct> ret = new LinkedList<>();
-
-		if (traitType != null) {
-
-			List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
-
-			if (CollectionUtils.isNotEmpty(superTypeNames)) {
-				for (String superTypeName : superTypeNames) {
-
-					Map<String, Object> superTraitType = getTraitType(superTypeName);
-
-					if (superTraitType != null) {
-						List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
-
-						Map<String, Object> superTypeValues = new HashMap<>();
-						for (Map<String, Object> attributeDefinition : attributeDefinitions) {
-
-							String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
-							if (values.containsKey(attributeName)) {
-								superTypeValues.put(attributeName, values.get(attributeName));
-							}
-						}
-
-						List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values);
-
-						Struct superTrait = new Struct(superTypeName, superTypeValues);
-
-						ret.add(superTrait);
-						ret.addAll(superTraits);
-					}
-				}
-			}
-		}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== getTraitSuperTypes()");
-		}
-		return ret;
-	}
-
-	private Map<String, Object> atlasAPI(final String endpoint) {
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> atlasAPI(" + endpoint + ")");
-		}
-		Map<String, Object> ret = new HashMap<String, Object>();
-
-		try {
-			UserGroupInformation userGroupInformation = null;
-			if (isKerberized) {
-				userGroupInformation = UserGroupInformation.getLoginUser();
-
-				try {
-					userGroupInformation.checkTGTAndReloginFromKeytab();
-				} catch (IOException ioe) {
-					LOG.error("Error renewing TGT and relogin", ioe);
-					userGroupInformation = null;
-				}
-			}
-			if (userGroupInformation != null) {
-				LOG.debug("Using kerberos authentication");
-				if(LOG.isDebugEnabled()) {
-					LOG.debug("Using Principal = "+ userGroupInformation.getUserName());
-				}
-				ret = userGroupInformation.doAs(new PrivilegedAction<Map<String, Object>>() {
-					@Override
-					public Map<String, Object> run() {
-						try{
-							return executeAtlasAPI(endpoint);
-						}catch (Exception e) {
-							LOG.error("Atlas API failed with message : ", e);
-						}
-						return null;
-					}
-				});
-			} else {
-				LOG.debug("Using basic authentication");
-				ret = executeAtlasAPI(endpoint);
-			}
-		} catch (Exception exception) {
-			LOG.error("Exception when fetching Atlas objects.", exception);
-			ret = null;
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== atlasAPI(" + endpoint + ")");
-		}
-		return ret;
-	}
-
-	private Map<String, Object> executeAtlasAPI(final String endpoint) {
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> executeAtlasAPI(" + endpoint + ")");
-		}
-
-		Map<String, Object> ret = new HashMap<String, Object>();
-
-		try {
-			final WebResource webResource = atlasRESTClient.getResource(endpoint);
-
-			ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).get(ClientResponse.class);
-
-			if (response != null && response.getStatus() == 200) {
-				ret = response.getEntity(ret.getClass());
-			} else {
-				RESTResponse resp = RESTResponse.fromClientResponse(response);
-				LOG.error("Error getting atlas data request=" + webResource.toString()
-						+ ", response=" + resp.toString());
-			}
-		} catch (Exception exception) {
-			LOG.error("Exception when fetching Atlas objects.", exception);
-			ret = null;
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== executeAtlasAPI(" + endpoint + ")");
-		}
-
-		return ret;
-	}
-
-	private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
-		return MapUtils.isNotEmpty(map) ? type.cast(map.get(name)) : null;
-	}
-
-}


Mime
View raw message