ranger-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [1/2] incubator-ranger git commit: RANGER-807: TagSync should support periodic full sync with Apache Atlas
Date Sun, 10 Jan 2016 08:15:44 GMT
Repository: incubator-ranger
Updated Branches:
  refs/heads/master fec84603f -> 5b9c094ff


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 666c2c8..803a8a9 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -23,76 +23,57 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.IStruct;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerServiceResource;
 import org.apache.ranger.plugin.model.RangerTag;
 import org.apache.ranger.plugin.model.RangerTagDef;
 import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
 
 import java.util.*;
 
 public class AtlasNotificationMapper {
 	private static final Log LOG = LogFactory.getLog(AtlasNotificationMapper.class);
 
-	public static final String ENTITY_TYPE_HIVE_DB = "hive_db";
-	public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
-	public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
-
-	public static final String RANGER_TYPE_HIVE_DB = "database";
-	public static final String RANGER_TYPE_HIVE_TABLE = "table";
-	public static final String RANGER_TYPE_HIVE_COLUMN = "column";
-
-	public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
-	public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE = "name";
-	public static final String QUALIFIED_NAME_FORMAT_DELIMITER_STRING = "\\.";
-	public static final String QUALIFIED_NAME_FORMAT_CLUSTER_DELIMITER_STRING = "@";
-
-	private static Properties properties = null;
-
-	public static ServiceTags processEntityNotification(EntityNotification entityNotification, Properties props) {
+	public static ServiceTags processEntityNotification(EntityNotification entityNotification) {
 
 		ServiceTags ret = null;
-		properties = props;
 
-		try {
-			IReferenceableInstance entity = entityNotification.getEntity();
-
-			if (isEntityMappable(entity)) {
-				ret = createServiceTags(entityNotification);
-			} else {
-				if(LOG.isDebugEnabled()) {
-					LOG.debug("Ranger not interested in Entity Notification for entity-type " + entityNotification.getEntity().getTypeName());
+		if (isNotificationHandled(entityNotification)) {
+			try {
+				IReferenceableInstance entity = entityNotification.getEntity();
+
+				if (AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName())) {
+					AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entityNotification.getEntity(), entityNotification.getAllTraits());
+					ret = buildServiceTags(entityWithTraits, 1L, 1L, null);
+				} else {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Ranger not interested in Entity Notification for entity-type " + entityNotification.getEntity().getTypeName());
+					}
 				}
+			} catch (Exception exception) {
+				LOG.error("createServiceTags() failed!! ", exception);
 			}
-		} catch (Exception exception) {
-			LOG.error("createServiceTags() failed!! ", exception);
 		}
 		return ret;
 	}
 
-	static private boolean isEntityMappable(IReferenceableInstance entity) {
-		boolean ret = false;
-
-		String entityTypeName = entity.getTypeName();
+	public static Map<String, ServiceTags> processEntitiesWithTraits(List<AtlasEntityWithTraits> atlasEntities) {
+		Map<String, ServiceTags> ret = null;
 
-		if (StringUtils.isNotBlank(entityTypeName)) {
-			if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB) ||
-					StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE) ||
-					StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) {
-				ret = true;
-			}
+		try {
+			ret = buildServiceTags(atlasEntities);
+		} catch (Exception exception) {
+			LOG.error("Failed to build serviceTags", exception);
 		}
+
 		return ret;
 	}
 
-	static private ServiceTags createServiceTags(EntityNotification entityNotification) throws Exception {
-
-		ServiceTags ret = null;
+	static private boolean isNotificationHandled(EntityNotification entityNotification) {
+		boolean ret = false;
 
 		EntityNotification.OperationType opType = entityNotification.getOperationType();
 
@@ -101,17 +82,10 @@ public class AtlasNotificationMapper {
 				LOG.debug("ENTITY_CREATE notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification");
 				break;
 			}
-			case ENTITY_UPDATE: {
-				ret = getServiceTags(entityNotification);
-				if (MapUtils.isEmpty(ret.getTags())) {
-					LOG.debug("No traits associated with this entity update notification. Ignoring it altogether");
-					ret = null;
-				}
-				break;
-			}
+			case ENTITY_UPDATE:
 			case TRAIT_ADD:
 			case TRAIT_DELETE: {
-				ret = getServiceTags(entityNotification);
+				ret = true;
 				break;
 			}
 			default:
@@ -121,130 +95,116 @@ public class AtlasNotificationMapper {
 		return ret;
 	}
 
-	static private ServiceTags getServiceTags(EntityNotification entityNotification) throws Exception {
-		ServiceTags ret = null;
+	static private Map<String, ServiceTags> buildServiceTags(List<AtlasEntityWithTraits> entitiesWithTraits) throws Exception {
 
-		IReferenceableInstance entity = entityNotification.getEntity();
+		Map<String, ServiceTags> ret = new HashMap<String, ServiceTags>();
 
-		List<RangerServiceResource> serviceResources = new ArrayList<RangerServiceResource>();
+		long serviceResourceIndex = 1L;
+		long tagIndex = 1L;
 
-		RangerServiceResource serviceResource = getServiceResource(entity);
-		serviceResources.add(serviceResource);
+		for (AtlasEntityWithTraits element : entitiesWithTraits) {
 
-		Map<Long, RangerTag> tags = getTags(entityNotification);
+			ServiceTags serviceTags = buildServiceTags(element, serviceResourceIndex, tagIndex, ret);
 
-		Map<Long, RangerTagDef> tagDefs = getTagDefs(tags);
+			serviceResourceIndex++;
 
-		Map<Long, List<Long>> resourceIdToTagIds = null;
+			tagIndex += CollectionUtils.size(serviceTags.getTags());
 
-		resourceIdToTagIds = new HashMap<Long, List<Long>>();
-		List<Long> tagList = new ArrayList<Long>();
+		}
 
+		// Remove duplicate tag definitions
+		for (Map.Entry<String, ServiceTags> serviceTagsMapEntry : ret.entrySet()){
 
-		if (MapUtils.isNotEmpty(tags)) {
-			resourceIdToTagIds = new HashMap<Long, List<Long>>();
+			Map<Long, RangerTagDef> allTagDefs = serviceTagsMapEntry.getValue().getTagDefinitions();
 
-			for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) {
-				tagList.add(entry.getKey());
+			Map<String, String> tagTypeIndex = new HashMap<String, String>();
+			Map<Long, RangerTagDef> uniqueTagDefs = new HashMap<Long, RangerTagDef>();
+
+			for (Map.Entry<Long, RangerTagDef> entry : allTagDefs.entrySet()) {
+				String tagTypeName = entry.getValue().getName();
+
+				if (tagTypeIndex.get(tagTypeName) == null) {
+					tagTypeIndex.put(tagTypeName, tagTypeName);
+					uniqueTagDefs.put(entry.getKey(), entry.getValue());
+				}
 			}
+			serviceTagsMapEntry.getValue().setTagDefinitions(uniqueTagDefs);
 		}
 
-		resourceIdToTagIds.put(1L, tagList);
+		return ret;
+	}
+
+	static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, long index, long tagIndex, Map<String, ServiceTags> serviceTagsMap) throws Exception {
 
+		ServiceTags ret = null;
 
-		ret = new ServiceTags();
+		IReferenceableInstance entity = entityWithTraits.getEntity();
 
-		ret.setOp(ServiceTags.OP_ADD_OR_UPDATE);
-		ret.setTagModel(ServiceTags.TAGMODEL_RESOURCE_PRIVATE);
-		ret.setServiceName(serviceResource.getServiceName());
-		ret.setServiceResources(serviceResources);
-		ret.setTagDefinitions(tagDefs);
-		ret.setTags(tags);
-		ret.setResourceToTagIds(resourceIdToTagIds);
+		RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity);
 
-		return ret;
-	}
+		if (serviceResource != null) {
 
+			serviceResource.setId(index);
 
-	static private RangerServiceResource getServiceResource(IReferenceableInstance entity) throws Exception {
+			String serviceName = serviceResource.getServiceName();
 
-		RangerServiceResource ret = null;
+			Map<Long, RangerTag> tags = getTags(entityWithTraits, tagIndex);
 
-		Map<String, RangerPolicy.RangerPolicyResource> elements = null;
-		String serviceName = null;
+			Map<Long, RangerTagDef> tagDefs = getTagDefs(tags);
 
+			Map<Long, List<Long>> resourceIdToTagIds = null;
 
-		elements = new HashMap<String, RangerPolicy.RangerPolicyResource>();
+			resourceIdToTagIds = new HashMap<Long, List<Long>>();
+			List<Long> tagList = new ArrayList<Long>();
+
+			if (MapUtils.isNotEmpty(tags)) {
+				resourceIdToTagIds = new HashMap<Long, List<Long>>();
 
-		List<String> components = getQualifiedNameComponents(entity);
-		// components should contain qualifiedName, clusterName, dbName, tableName, columnName in that order
+				for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) {
+					tagList.add(entry.getKey());
+				}
+			}
 
-		String entityTypeName = entity.getTypeName();
+			resourceIdToTagIds.put(index, tagList);
 
-		String qualifiedName = components.get(0);
+			ret = createOrGetServiceTags(serviceTagsMap, serviceName);
 
-		String clusterName, dbName, tableName, columnName;
+			ret.getServiceResources().add(serviceResource);
+			ret.getTagDefinitions().putAll(tagDefs);
+			ret.getTags().putAll(tags);
+			ret.getResourceToTagIds().putAll(resourceIdToTagIds);
 
-		if (components.size() > 1) {
-			clusterName = components.get(1);
-			serviceName = getServiceName(clusterName, entityTypeName);
+		} else {
+			LOG.error("AtlasResourceMapper not found for entity-type:" + entity.getTypeName());
 		}
 
-		if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) {
-			if (components.size() > 2) {
-				dbName = components.get(2);
-				RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
-				elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource);
+		return ret;
+	}
+
+	static private ServiceTags createOrGetServiceTags(Map<String, ServiceTags> serviceTagsMap, String serviceName) {
+		ServiceTags ret = serviceTagsMap == null ? null : serviceTagsMap.get(serviceName);
 
-			} else {
-				LOG.error("invalid qualifiedName for HIVE_DB, qualifiedName=" + qualifiedName);
-			}
-		} else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE)) {
-			if (components.size() > 3) {
-				dbName = components.get(2);
-				tableName = components.get(3);
-				RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
-				elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource);
-				RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
-				elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource);
-			} else {
-				LOG.error("invalid qualifiedName for HIVE_TABLE, qualifiedName=" + qualifiedName);
-			}
-		} else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) {
-			if (components.size() > 4) {
-				dbName = components.get(2);
-				tableName = components.get(3);
-				columnName = components.get(4);
-				RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
-				elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource);
-				RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
-				elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource);
-				RangerPolicy.RangerPolicyResource columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName);
-				elements.put(RANGER_TYPE_HIVE_COLUMN, columnPolicyResource);
-			} else {
-				LOG.error("invalid qualifiedName for HIVE_COLUMN, qualifiedName=" + qualifiedName);
+		if (ret == null) {
+			ret = new ServiceTags();
+
+			if (serviceTagsMap != null) {
+				serviceTagsMap.put(serviceName, ret);
 			}
 
+			ret.setOp(ServiceTags.OP_ADD_OR_UPDATE);
+			ret.setTagModel(ServiceTags.TAGMODEL_RESOURCE_PRIVATE);
+			ret.setServiceName(serviceName);
 		}
 
-
-		ret = new RangerServiceResource();
-		ret.setGuid(entity.getId()._getId());
-		ret.setId(1L);
-		ret.setServiceName(serviceName);
-		ret.setResourceElements(elements);
-
 		return ret;
 	}
 
-	static private Map<Long, RangerTag> getTags(EntityNotification entityNotification) {
+	static private Map<Long, RangerTag> getTags(AtlasEntityWithTraits entityWithTraits, long index) {
 		Map<Long, RangerTag> ret = null;
 
 		ret = new HashMap<Long, RangerTag>();
 
-		long index = 1;
-
-		List<IStruct> traits = entityNotification.getAllTraits();
+		List<IStruct> traits = entityWithTraits.getAllTraits();
 
 		for (IStruct trait : traits) {
 
@@ -288,113 +248,18 @@ public class AtlasNotificationMapper {
 
 		if (MapUtils.isNotEmpty(tags)) {
 			ret = new HashMap<Long, RangerTagDef>();
+
 			for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) {
+
 				RangerTagDef tagDef = new RangerTagDef();
 				tagDef.setName(entry.getValue().getType());
 				tagDef.setId(entry.getKey());
 				ret.put(entry.getKey(), tagDef);
-			}
-		}
-
-		return ret;
-	}
-
-	static private String getQualifiedNameAttributeName(String entityTypeName) {
-		String ret = StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE) ?
-				ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE : ENTITY_ATTRIBUTE_QUALIFIED_NAME;
-
-		return ret;
-	}
-
-	static private List<String> getQualifiedNameComponents(IReferenceableInstance entity) throws Exception {
 
-		List<String> ret = null;
-
-		String qualifiedNameAttributeName = getQualifiedNameAttributeName(entity.getTypeName());
-
-		String qualifiedName = getEntityAttribute(entity, qualifiedNameAttributeName, String.class);
-
-		ret = getQualifiedNameComponents(entity.getTypeName(), qualifiedName);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("----- Entity-Id:" + entity.getId()._getId());
-			LOG.debug("----- Entity-Type-Name:" + entity.getTypeName());
-			LOG.debug("----- 	Entity-Components -----");
-			int i = 0;
-			for (String value : ret) {
-				LOG.debug("-----		Index:" + i++ + "	Value:" + value);
 			}
 		}
-		return ret;
-	}
-
-	static public List<String> getQualifiedNameComponents(String entityTypeName, String qualifiedName) throws Exception {
-
-		List<String> ret = null;
-
-		String qualifiedNameAttributeName = getQualifiedNameAttributeName(entityTypeName);
-
-		if (StringUtils.isBlank(qualifiedName)) {
-			throw new Exception("Could not get a valid value for " + qualifiedNameAttributeName + " attribute from entity notification.");
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Received .... " + qualifiedNameAttributeName + "=" + qualifiedName + " for entity type " + entityTypeName);
-		}
-
-		String components[] = qualifiedName.split(QUALIFIED_NAME_FORMAT_CLUSTER_DELIMITER_STRING);
-
-		if (components == null || components.length != 2) {
-			throw new Exception("Qualified Name does not contain cluster-name, qualifiedName=" + qualifiedName);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("name-hierarchy=" + components[0] + ", cluster-name=" + components[1]);
-		}
-
-		String nameHierarchy[] = components[0].split(QUALIFIED_NAME_FORMAT_DELIMITER_STRING);
-
-		int hierarchyLevels = nameHierarchy.length;
-
-		ret = new ArrayList<String>();
-
-		ret.add(qualifiedName);
-		ret.add(components[1]);
-
-		for (int i = 0; i < hierarchyLevels; i++) {
-			ret.add(nameHierarchy[i]);
-		}
 
 		return ret;
 	}
 
-	static private String getServiceName(String clusterName, String entityTypeName) {
-		// Parse entityTypeName to get the Apache-component Name
-		// Assumption: entityTypeName is <componentName>_<component_specific_type_name>
-		// such as hive_table, hadoop_path, hbase_queue, etc.
-		String apacheComponents[] = entityTypeName.split("_");
-		String apacheComponent = null;
-		if (apacheComponents.length > 0) {
-			apacheComponent = apacheComponents[0].toLowerCase();
-		}
-
-		return TagSyncConfig.getServiceName(apacheComponent, clusterName, properties);
-	}
-
-	static private <T> T getEntityAttribute(IReferenceableInstance entity, String name, Class<T> type) {
-		T ret = null;
-
-		try {
-			Map<String, Object> valueMap = entity.getValuesMap();
-			ret = getAttribute(valueMap, name, type);
-		} catch (AtlasException exception) {
-			LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception);
-		}
-
-		return ret;
-	}
-	static private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
-		return type.cast(map.get(name));
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
deleted file mode 100644
index 42ba7c7..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import com.google.gson.Gson;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Provider;
-
-import org.apache.atlas.AtlasException;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-
-import org.apache.ranger.tagsync.model.TagSink;
-import org.apache.ranger.tagsync.model.TagSource;
-import org.apache.ranger.plugin.util.ServiceTags;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-
-public class TagAtlasSource implements TagSource {
-	private static final Log LOG = LogFactory.getLog(TagAtlasSource.class);
-
-	public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "application.properties";
-
-	public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers";
-	public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
-	public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
-
-	private TagSink tagSink;
-	private Properties properties;
-	private ConsumerRunnable consumerTask;
-
-	@Override
-	public boolean initialize(Properties properties) {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagAtlasSource.initialize()");
-		}
-
-		boolean ret = true;
-
-		if (properties == null || MapUtils.isEmpty(properties)) {
-			LOG.error("No properties specified for TagFileSource initialization");
-			this.properties = new Properties();
-		} else {
-			this.properties = properties;
-		}
-
-		Properties atlasProperties = new Properties();
-
-		InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
-
-		if (inputStream != null) {
-			try {
-				atlasProperties.load(inputStream);
-			} catch (Exception exception) {
-				ret = false;
-				LOG.error("Cannot load Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, exception);
-			} finally {
-				try {
-					inputStream.close();
-				} catch (IOException ioException) {
-					LOG.error("Cannot close Atlas application properties file, file-name:\" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException);
-				}
-			}
-		} else {
-			ret = false;
-			LOG.error("Cannot find Atlas application properties file");
-		}
-
-		if (ret) {
-			if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS))) {
-				ret = false;
-				LOG.error("Value of property '" + TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!");
-			}
-			if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT))) {
-				ret = false;
-				LOG.error("Value of property '" + TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!");
-			}
-			if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP))) {
-				ret = false;
-				LOG.error("Value of property '" + TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!");
-			}
-		}
-
-		if (ret) {
-			NotificationModule notificationModule = new NotificationModule();
-
-			Injector injector = Guice.createInjector(notificationModule);
-
-			Provider<NotificationInterface> consumerProvider = injector.getProvider(NotificationInterface.class);
-			NotificationInterface notification = consumerProvider.get();
-			List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
-
-			consumerTask = new ConsumerRunnable(iterators.get(0));
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagAtlasSource.initialize(), result=" + ret);
-		}
-		return ret;
-	}
-
-	@Override
-	public void setTagSink(TagSink sink) {
-		if (sink == null) {
-			LOG.error("Sink is null!!!");
-		} else {
-			this.tagSink = sink;
-		}
-	}
-
-	@Override
-	public Thread start() {
-		Thread consumerThread = null;
-		if (consumerTask == null) {
-			LOG.error("No consumerTask!!!");
-		} else {
-			consumerThread = new Thread(consumerTask);
-			consumerThread.setDaemon(true);
-			consumerThread.start();
-		}
-		return consumerThread;
-	}
-
-	@Override
-	public void updateSink() throws Exception {
-	}
-
-	@Override
-	public 	boolean isChanged() {
-		return true;
-	}
-
-	// ----- inner class : ConsumerRunnable ------------------------------------
-
-	private class ConsumerRunnable implements Runnable {
-
-		private final NotificationConsumer<EntityNotification> consumerIterator;
-
-		private ConsumerRunnable(NotificationConsumer<EntityNotification> consumerIterator) {
-			this.consumerIterator = consumerIterator;
-		}
-
-		// ----- Runnable --------------------------------------------------------
-
-		@Override
-		public void run() {
-			while (consumerIterator.hasNext()) {
-				try {
-					EntityNotification notification = consumerIterator.next();
-					if (notification != null) {
-						printNotification(notification);
-						ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification, properties);
-						if (serviceTags == null) {
-							if(LOG.isDebugEnabled()) {
-								LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType());
-							}
-						} else {
-							if (LOG.isDebugEnabled()) {
-								String serviceTagsJSON = new Gson().toJson(serviceTags);
-								LOG.debug("Atlas notification mapped to serviceTags=" + serviceTagsJSON);
-							}
-
-							try {
-								tagSink.uploadServiceTags(serviceTags);
-							} catch (Exception exception) {
-								LOG.error("uploadServiceTags() failed..", exception);
-							}
-						}
-					}
-				} catch(Exception e){
-					LOG.error("Exception encountered when processing notification:", e);
-				}
-			}
-		}
-
-		public void printNotification(EntityNotification notification) {
-			IReferenceableInstance entity = notification.getEntity();
-			if (LOG.isDebugEnabled()) {
-				try {
-					LOG.debug("Notification-Type: " + notification.getOperationType());
-					LOG.debug("Entity-Id: " + entity.getId()._getId());
-					LOG.debug("Entity-Type: " + entity.getTypeName());
-
-					LOG.debug("----------- Entity Values ----------");
-
-
-					for (Map.Entry<String, Object> entry : entity.getValuesMap().entrySet()) {
-						LOG.debug("		Name:" + entry.getKey());
-						Object value = entry.getValue();
-						LOG.debug("		Value:" + value);
-					}
-
-					LOG.debug("----------- Entity Traits ----------");
-
-					List<IStruct> traits = notification.getAllTraits();
-
-					for (IStruct trait : traits) {
-						LOG.debug("			Trait-Type-Name:" + trait.getTypeName());
-						Map<String, Object> traitValues = trait.getValuesMap();
-						for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) {
-							LOG.debug("				Trait-Value-Name:" + valueEntry.getKey());
-							LOG.debug("				Trait-Value:" + valueEntry.getValue());
-						}
-					}
-				} catch (AtlasException exception) {
-					LOG.error("Cannot print notification - ", exception);
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
new file mode 100644
index 0000000..c8ed948
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlasrest;
+
+import com.google.gson.Gson;
+
+import com.google.gson.GsonBuilder;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
+import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper;
+import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
+
+import java.util.*;
+
+public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
+	private static final Log LOG = LogFactory.getLog(AtlasRESTTagSource.class);
+
+	private String atlasEndpoint;
+	private long sleepTimeBetweenCycleInMillis;
+
+	@Override
+	public boolean initialize(Properties properties) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> AtlasRESTTagSource.initialize()");
+		}
+
+		boolean ret = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
+
+		atlasEndpoint = TagSyncConfig.getAtlasEndpoint(properties);
+		sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties);
+
+		if (StringUtils.isEmpty(atlasEndpoint)) {
+			LOG.info("No AtlasEndpoint specified, Initial download of Atlas-entities cannot be done.");
+			ret = false;
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== AtlasRESTTagSource.initialize(), result=" + ret);
+		}
+
+		return ret;
+	}
+
+	@Override
+	public boolean start() {
+
+		Thread atlasRESTInvokerThread = new Thread(this);
+		atlasRESTInvokerThread.setDaemon(true);
+		atlasRESTInvokerThread.start();
+
+		return atlasRESTInvokerThread != null;
+	}
+
+	@Override
+	public void run() {
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> AtlasRESTTagSource.run()");
+		}
+
+		while (!shutdown) {
+
+			synchUp();
+
+			LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
+
+			try {
+
+				Thread.sleep(sleepTimeBetweenCycleInMillis);
+
+			} catch (InterruptedException exception) {
+				LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to tagFileSource", exception);
+			}
+		}
+		LOG.info("Shutting down the Tag-Atlasrest-source thread");
+	}
+
+	@Override
+	public boolean isChanged() {
+		return true;
+	}
+
+	@Override
+	public void synchUp() {
+
+		AtlasRESTUtil atlasRESTUtil = new AtlasRESTUtil(atlasEndpoint);
+
+		List<AtlasEntityWithTraits> atlasEntitiesWithTraits = atlasRESTUtil.getEntitiesWithTraits();
+
+		if (CollectionUtils.isNotEmpty(atlasEntitiesWithTraits)) {
+			if (LOG.isDebugEnabled()) {
+				for (AtlasEntityWithTraits element : atlasEntitiesWithTraits) {
+					LOG.debug(element);
+				}
+			}
+
+			Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processEntitiesWithTraits(atlasEntitiesWithTraits);
+
+			if (MapUtils.isNotEmpty(serviceTagsMap)) {
+				for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) {
+					if (LOG.isDebugEnabled()) {
+						Gson gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z")
+								.setPrettyPrinting()
+								.create();
+						String serviceTagsString = gsonBuilder.toJson(entry.getValue());
+
+						LOG.debug("serviceTags=" + serviceTagsString);
+					}
+					updateSink(entry.getValue());
+				}
+			}
+		}
+
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
new file mode 100644
index 0000000..7f4676a
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlasrest;
+
+import com.google.gson.Gson;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.log4j.Logger;
+import org.apache.ranger.admin.client.datatype.RESTResponse;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
+import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
+
+import java.util.*;
+
+@SuppressWarnings("unchecked")
+public class AtlasRESTUtil {
+	private static final Logger LOG = Logger.getLogger(AtlasRESTUtil.class);
+
+	private static final String REST_MIME_TYPE_JSON = "application/json" ;
+	private static final String API_ATLAS_TYPES    = "api/atlas/types";
+	private static final String API_ATLAS_ENTITIES = "api/atlas/entities?type=";
+	private static final String API_ATLAS_ENTITY   = "api/atlas/entities/";
+	private static final String API_ATLAS_TYPE     = "api/atlas/types/";
+
+	private static final String RESULTS_ATTRIBUTE               = "results";
+	private static final String DEFINITION_ATTRIBUTE            = "definition";
+	private static final String VALUES_ATTRIBUTE                = "values";
+	private static final String TRAITS_ATTRIBUTE                = "traits";
+	private static final String TYPE_NAME_ATTRIBUTE             = "typeName";
+	private static final String TRAIT_TYPES_ATTRIBUTE           = "traitTypes";
+	private static final String SUPER_TYPES_ATTRIBUTE           = "superTypes";
+	private static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions";
+	private static final String NAME_ATTRIBUTE                  = "name";
+
+	private final Gson gson = new Gson();
+
+	private RangerRESTClient atlasRESTClient;
+
+	/**
+	 * Creates a helper bound to one Atlas server.
+	 *
+	 * @param atlasEndpoint base URL of the Atlas server; a trailing "/" is appended when missing
+	 */
+	public AtlasRESTUtil(String atlasEndpoint) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> AtlasRESTUtil(" + atlasEndpoint + ")");
+		}
+
+		String baseUrl = atlasEndpoint.endsWith("/") ? atlasEndpoint : atlasEndpoint + "/";
+
+		// RangerRESTClient is reused to talk to Atlas; per the original note this only works for http URLs
+		RangerRESTClient client = new RangerRESTClient();
+		client.setUrl(baseUrl);
+		atlasRESTClient = client;
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== AtlasRESTUtil(" + baseUrl + ")");
+		}
+	}
+
+	public List<AtlasEntityWithTraits> getEntitiesWithTraits() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> getEntriesWithTraits()");
+		}
+
+		List<AtlasEntityWithTraits> ret = new ArrayList<AtlasEntityWithTraits>();
+
+		Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
+
+		List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class);
+
+		if (CollectionUtils.isNotEmpty(types)) {
+
+			for (String type : types) {
+
+				if (!AtlasResourceMapperUtil.isEntityTypeHandled(type)) {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Not fetching Atlas entities of type:" + type);
+					}
+					continue;
+				}
+
+				Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type);
+
+				List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
+
+				if (CollectionUtils.isEmpty(guids)) {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("No Atlas entities for type:" + type);
+					}
+					continue;
+				}
+
+				for (String guid : guids) {
+
+					Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
+
+					if (MapUtils.isNotEmpty(entityResponse) && entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+
+						Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
+						Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
+
+						if (MapUtils.isNotEmpty(traitsAttribute)) {
+
+							List<IStruct> allTraits = new LinkedList<>();
+
+							for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) {
+
+								Map<String, Object> trait = (Map<String, Object>) entry.getValue();
+
+								Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
+								String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
+
+								List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
+
+								Struct trait1 = new Struct(traitTypeName, traitValues);
+
+								allTraits.add(trait1);
+								allTraits.addAll(superTypes);
+							}
+
+							IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
+
+							if (entity != null) {
+								AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, allTraits);
+								ret.add(entityWithTraits);
+							} else {
+								if (LOG.isInfoEnabled()) {
+									LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid);
+								}
+							}
+
+						}
+					}
+				}
+			}
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("<== getEntriesWithTraits()");
+			}
+		}
+
+		return ret;
+	}
+
+	/**
+	 * Fetches the type definition of a trait from Atlas.
+	 *
+	 * @param traitName name of the trait type to look up
+	 * @return the first element of the definition's "traitTypes" list, or {@code null}
+	 *         when the request failed or the response does not contain one
+	 */
+	private Map<String, Object> getTraitType(String traitName) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> getTraitType(" + traitName + ")");
+		}
+		Map<String, Object> ret = null;
+
+		Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
+
+		// getAttribute() tolerates a null or empty map, so a failed request or a
+		// response without the expected keys simply yields null here instead of an NPE
+		Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class);
+
+		List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
+
+		if (CollectionUtils.isNotEmpty(traitTypes)) {
+			ret = (Map<String, Object>) traitTypes.get(0);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== getTraitType(" + traitName + ")");
+		}
+		return ret;
+	}
+
+	/**
+	 * Builds one struct per super-type of a trait, recursively up the type hierarchy.
+	 * Each super-type struct receives only those entries of {@code values} whose
+	 * names appear in that super-type's attribute definitions.
+	 *
+	 * @param traitType type definition of the trait, as returned by {@code getTraitType()}; may be {@code null}
+	 * @param values    attribute values of the trait instance; may be {@code null}
+	 * @return the super-type structs, each followed by the structs of its own super-types; never {@code null}
+	 */
+	private List<IStruct> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values) {
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> getTraitSuperTypes()");
+		}
+		List<IStruct> ret = new LinkedList<>();
+
+		// getAttribute() returns null for a null/empty type or a missing "superTypes" entry
+		List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
+
+		if (CollectionUtils.isNotEmpty(superTypeNames)) {
+
+			for (String superTypeName : superTypeNames) {
+
+				Map<String, Object> superTraitType = getTraitType(superTypeName);
+
+				if (superTraitType == null) {
+					continue;
+				}
+
+				List<Map<String, Object>> attributeDefinitions = getAttribute(superTraitType, ATTRIBUTE_DEFINITIONS_ATTRIBUTE, List.class);
+
+				Map<String, Object> superTypeValues = new HashMap<>();
+
+				if (attributeDefinitions != null && MapUtils.isNotEmpty(values)) {
+					for (Map<String, Object> attributeDefinition : attributeDefinitions) {
+
+						Object attributeName = attributeDefinition == null ? null : attributeDefinition.get(NAME_ATTRIBUTE);
+
+						if (attributeName != null && values.containsKey(attributeName.toString())) {
+							superTypeValues.put(attributeName.toString(), values.get(attributeName.toString()));
+						}
+					}
+				}
+
+				// Reuse the definition fetched above instead of issuing a second REST call for it
+				List<IStruct> superTraits = getTraitSuperTypes(superTraitType, values);
+
+				Struct superTrait = new Struct(superTypeName, superTypeValues);
+
+				ret.add(superTrait);
+				ret.addAll(superTraits);
+			}
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== getTraitSuperTypes()");
+		}
+		return ret;
+	}
+
+	/**
+	 * Issues a JSON GET request against Atlas and returns the decoded response body.
+	 *
+	 * Failures are logged rather than thrown: a missing response, a status other than 200,
+	 * an empty body or an exception all result in an empty map.
+	 *
+	 * @param endpoint path relative to the configured Atlas URL
+	 * @return the response body as a map; never {@code null}, empty on any failure
+	 */
+	private Map<String, Object> atlasAPI(String endpoint) {
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> atlasAPI(" + endpoint + ")");
+		}
+		Map<String, Object> ret = new HashMap<String, Object>();
+
+		try {
+			WebResource webResource = atlasRESTClient.getResource(endpoint);
+			ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).get(ClientResponse.class);
+
+			if (response == null) {
+				// Nothing to convert into a RESTResponse; report the missing response directly
+				LOG.error("Error getting atlas data request=" + webResource.toString()
+						+ ", response=null");
+			} else if (response.getStatus() == 200) {
+				Map<String, Object> entity = response.getEntity(ret.getClass());
+
+				// Keep the empty map when the body could not be decoded into a map
+				if (entity != null) {
+					ret = entity;
+				}
+			} else {
+				RESTResponse resp = RESTResponse.fromClientResponse(response);
+				LOG.error("Error getting atlas data request=" + webResource.toString()
+						+ ", response=" + resp.toString());
+			}
+		} catch (Exception exception) {
+			// ret is still the empty map here; callers never have to deal with null
+			LOG.error("Exception when fetching Atlas objects.", exception);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== atlasAPI(" + endpoint + ")");
+		}
+		return ret;
+	}
+
+	/**
+	 * Looks up {@code name} in {@code map} and casts the value to {@code type}.
+	 *
+	 * @return the cast value, or {@code null} when the map is null or empty or holds no such key
+	 * @throws ClassCastException when the stored value is not an instance of {@code type}
+	 */
+	private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
+		if (MapUtils.isEmpty(map)) {
+			return null;
+		}
+
+		Object value = map.get(name);
+
+		return type.cast(value);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
new file mode 100644
index 0000000..43eb3b5
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.file;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.io.*;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Date;
+import java.util.Properties;
+
+public class FileTagSource extends AbstractTagSource implements Runnable {
+	private static final Log LOG = LogFactory.getLog(FileTagSource.class);
+
+	private String serviceTagsFileName;
+	private URL serviceTagsFileURL;
+	private long lastModifiedTimeInMillis = -1L;
+
+	private Gson gsonBuilder;
+	private Properties properties;
+	private long fileModTimeCheckIntervalInMs;
+
+	/**
+	 * Configures this source from the given properties and locates the tags file.
+	 *
+	 * The file name is looked up first as a readable file on the file system, then as a
+	 * resource relative to this class, then as a resource of the system class loader.
+	 * The URL of the first match is remembered in {@code serviceTagsFileURL}.
+	 *
+	 * @param props tag-sync configuration; when null or empty an empty {@code Properties} is used
+	 * @return {@code false} only when no file name is configured; failing to locate or
+	 *         open the file is logged but still yields {@code true}
+	 */
+	@Override
+	public boolean initialize(Properties props) {
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> FileTagSource.initialize()");
+		}
+
+		if (props == null || MapUtils.isEmpty(props)) {
+			LOG.error("No properties specified for FileTagSource initialization");
+			this.properties = new Properties();
+		} else {
+			this.properties = props;
+		}
+
+		gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
+
+		boolean ret = true;
+
+		serviceTagsFileName = TagSyncConfig.getTagSourceFileName(properties);
+
+		if (StringUtils.isBlank(serviceTagsFileName)) {
+			ret = false;
+			LOG.error("value of property 'ranger.tagsync.source.impl.class' is file and no value specified for property 'ranger.tagsync.filesource.filename'!");
+		}
+
+		if (ret) {
+
+			fileModTimeCheckIntervalInMs = TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Provided serviceTagsFileName=" + serviceTagsFileName);
+				LOG.debug("'ranger.tagsync.filesource.modtime.check.interval':" + fileModTimeCheckIntervalInMs + "ms");
+			}
+
+			// Opened only to verify that the source can be read; closed below without reading from it
+			InputStream serviceTagsFileStream = null;
+
+			File f = new File(serviceTagsFileName);
+
+			// First choice: a readable regular file on the file system
+			if (f.exists() && f.isFile() && f.canRead()) {
+				try {
+					serviceTagsFileStream = new FileInputStream(f);
+					serviceTagsFileURL = f.toURI().toURL();
+				} catch (FileNotFoundException exception) {
+					LOG.error("Error processing input file:" + serviceTagsFileName + " or no privilege for reading file " + serviceTagsFileName, exception);
+				} catch (MalformedURLException malformedException) {
+					LOG.error("Error processing input file:" + serviceTagsFileName + " cannot be converted to URL " + serviceTagsFileName, malformedException);
+				}
+			} else {
+
+				// Second choice: a resource resolved through this class, with and without a leading "/"
+				URL fileURL = getClass().getResource(serviceTagsFileName);
+				if (fileURL == null) {
+					if (!serviceTagsFileName.startsWith("/")) {
+						fileURL = getClass().getResource("/" + serviceTagsFileName);
+					}
+				}
+
+				// Last choice: a resource resolved through the system class loader
+				if (fileURL == null) {
+					fileURL = ClassLoader.getSystemClassLoader().getResource(serviceTagsFileName);
+					if (fileURL == null) {
+						if (!serviceTagsFileName.startsWith("/")) {
+							fileURL = ClassLoader.getSystemClassLoader().getResource("/" + serviceTagsFileName);
+						}
+					}
+				}
+
+				if (fileURL != null) {
+
+					try {
+						serviceTagsFileStream = fileURL.openStream();
+						serviceTagsFileURL = fileURL;
+					} catch (Exception exception) {
+						LOG.error(serviceTagsFileName + " is not a file", exception);
+					}
+				} else {
+					// ret is left unchanged: an unresolved file is only warned about here
+					LOG.warn("Error processing input file: URL not found for " + serviceTagsFileName + " or no privilege for reading file " + serviceTagsFileName);
+				}
+			}
+
+			if (serviceTagsFileStream != null) {
+				try {
+					serviceTagsFileStream.close();
+				} catch (Exception e) {
+					// Ignore
+				}
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== FileTagSource.initialize(): sourceFileName=" + serviceTagsFileName + ", result=" + ret);
+		}
+
+		return ret;
+	}
+
+	/**
+	 * Launches a daemon thread that executes this object's {@code run()} method.
+	 *
+	 * @return always {@code true}
+	 */
+	@Override
+	public boolean start() {
+
+		Thread monitor = new Thread(this);
+		monitor.setDaemon(true);
+		monitor.start();
+
+		// A reference obtained from "new" is never null, so the result is constant
+		return true;
+	}
+
+	/**
+	 * Polling loop of the file source: synchronizes, then sleeps for
+	 * {@code fileModTimeCheckIntervalInMs} milliseconds, until {@code shutdown} is set
+	 * or the thread is interrupted.
+	 *
+	 * Errors raised by {@code synchUp()} are logged and do not end the loop; the sleep
+	 * still takes place afterwards so that a persistent failure cannot turn the loop
+	 * into a busy spin.
+	 */
+	@Override
+	public void run() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> FileTagSource.run()");
+		}
+
+		while (!shutdown) {
+
+			try {
+				synchUp();
+			}
+			catch (Throwable t) {
+				LOG.error("tag-sync thread got an error", t);
+			}
+
+			try {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Sleeping for [" + fileModTimeCheckIntervalInMs + "] milliSeconds");
+				}
+
+				Thread.sleep(fileModTimeCheckIntervalInMs);
+			}
+			catch (InterruptedException e) {
+				LOG.error("Failed to wait for [" + fileModTimeCheckIntervalInMs + "] milliseconds before checking for update to tagFileSource", e);
+
+				// Restore the interrupt status for whoever inspects this thread and stop
+				// polling: an interrupt is a request to terminate
+				Thread.currentThread().interrupt();
+				break;
+			}
+		}
+
+		LOG.info("Shutting down the Tag-file-source thread");
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== FileTagSource.run()");
+		}
+	}
+
+	/**
+	 * Compares the file's current modification time with the one seen by the previous
+	 * call and remembers the newer value.
+	 *
+	 * @return {@code true} when the modification time is later than the remembered one
+	 */
+	@Override
+	public boolean isChanged() {
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> FileTagSource.isChanged()");
+		}
+
+		long currentModTime = getModificationTime();
+		boolean changed = currentModTime > lastModifiedTimeInMillis;
+
+		if (changed) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("File modified at " + new Date(currentModTime) + "last-modified at " + new Date(lastModifiedTimeInMillis));
+			}
+			// Remember the new timestamp so that the next call reports a change only for a later one
+			lastModifiedTimeInMillis = currentModTime;
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== FileTagSource.isChanged(): result=" + changed);
+		}
+		return changed;
+	}
+
+	@Override
+	public void synchUp() {
+		if (isChanged()) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Begin: update tags from source==>sink");
+			}
+
+			ServiceTags serviceTags = readFromFile();
+			updateSink(serviceTags);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("End: update tags from source==>sink");
+			}
+
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("FileTagSource: no change found for synchronization.");
+			}
+		}
+	}
+	/**
+	 * Parses the JSON content behind {@code serviceTagsFileURL}, decoded as UTF-8,
+	 * into a {@code ServiceTags} object.
+	 *
+	 * @return the parsed object, or {@code null} when no URL is known or an
+	 *         {@code IOException} occurs while reading
+	 */
+	private ServiceTags readFromFile() {
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> FileTagSource.readFromFile(): sourceFileName=" + serviceTagsFileName);
+		}
+
+		ServiceTags loaded = null;
+
+		if (serviceTagsFileURL == null) {
+			LOG.error("Error reading file: " + serviceTagsFileName);
+		} else {
+			try (
+					InputStream in = serviceTagsFileURL.openStream();
+					Reader jsonReader = new InputStreamReader(in, Charset.forName("UTF-8"))
+			) {
+
+				loaded = gsonBuilder.fromJson(jsonReader, ServiceTags.class);
+
+			} catch (IOException ioe) {
+				LOG.warn("Error processing input file: or no privilege for reading file " + serviceTagsFileName, ioe);
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== FileTagSource.readFromFile(): sourceFileName=" + serviceTagsFileName);
+		}
+
+		return loaded;
+	}
+
+	/**
+	 * Returns the last-modified time of the tags file.
+	 *
+	 * @return the time in milliseconds, or {@code 0} when {@code serviceTagsFileName}
+	 *         does not denote an existing, readable regular file
+	 */
+	private long getModificationTime() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> FileTagSource.getLastModificationTime(): sourceFileName=" + serviceTagsFileName);
+		}
+
+		File tagsFile = new File(serviceTagsFileName);
+		boolean readableFile = tagsFile.exists() && tagsFile.isFile() && tagsFile.canRead();
+		long modTime = readableFile ? tagsFile.lastModified() : 0L;
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== FileTagSource.lastModificationTime(): serviceTagsFileName=" + serviceTagsFileName + " result=" + new Date(modTime));
+		}
+
+		return modTime;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
deleted file mode 100644
index 92f24b2..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.file;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.tagsync.model.TagSink;
-import org.apache.ranger.tagsync.model.TagSource;
-import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-
-import java.io.*;
-import java.util.Date;
-import java.util.Properties;
-
-public class TagFileSource implements TagSource, Runnable {
-	private static final Log LOG = LogFactory.getLog(TagFileSource.class);
-
-	private String sourceFileName;
-	private long lastModifiedTimeInMillis = 0L;
-
-	private Gson gson;
-	private TagSink tagSink;
-	private Properties properties;
-
-	@Override
-	public boolean initialize(Properties props) {
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagFileSource.initialize()");
-		}
-
-		if (props == null || MapUtils.isEmpty(props)) {
-			LOG.error("No properties specified for TagFileSource initialization");
-			this.properties = new Properties();
-		} else {
-			this.properties = props;
-		}
-
-		boolean ret = true;
-
-		if (StringUtils.isBlank(TagSyncConfig.getTagSourceFileName(properties))) {
-			ret = false;
-			LOG.error("value of property 'ranger.tagsync.source.impl.class' is file and no value specified for property 'ranger.tagsync.filesource.filename'!");
-		}
-
-		if (ret) {
-
-			long fileModTimeCheckIntervalInMs = TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
-
-			if (fileModTimeCheckIntervalInMs <= 0L) {
-				LOG.info("'ranger.tagsync.filesource.modtime.check.interval' is zero or negative! 'ranger.tagsync.filesource.modtime.check.interval'=" + fileModTimeCheckIntervalInMs + "ms");
-				LOG.info("Setting 'ranger.tagsync.filesource.modtime.check.interval' to 60 seconds");
-				fileModTimeCheckIntervalInMs = 60*1000;
-			} else {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("'ranger.tagsync.filesource.modtime.check.interval':" + fileModTimeCheckIntervalInMs + "ms");
-				}
-			}
-			sourceFileName = TagSyncConfig.getTagSourceFileName(properties);
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Provided sourceFileName=" + sourceFileName);
-			}
-
-			String realFileName = TagSyncConfig.getResourceFileName(sourceFileName);
-			if (realFileName != null) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Real sourceFileName=" + realFileName);
-				}
-				sourceFileName = realFileName;
-			} else {
-				LOG.error(sourceFileName + " is not a file or is not readable");
-				ret = false;
-			}
-		}
-
-		if (ret) {
-			try {
-				gson = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
-			} catch (Throwable excp) {
-				LOG.fatal("failed to create GsonBuilder object", excp);
-				ret = false;
-			}
-		}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagFileSource.initialize(): sourceFileName=" + sourceFileName + ", result=" + ret);
-		}
-
-		return ret;
-	}
-
-	@Override
-	public void setTagSink(TagSink sink) {
-		if (sink == null) {
-			LOG.error("Sink is null!!!");
-		} else {
-			this.tagSink = sink;
-		}
-	}
-
-	@Override
-	public Thread start() {
-
-		Thread fileMonitoringThread = null;
-
-		fileMonitoringThread = new Thread(this);
-		fileMonitoringThread.setDaemon(true);
-		fileMonitoringThread.start();
-
-		return fileMonitoringThread;
-	}
-
-	@Override
-	public void run() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagFileSource.run()");
-		}
-		long sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
-		boolean shutdownFlag = false;
-
-		while (!shutdownFlag) {
-
-			try {
-				if (isChanged()) {
-					LOG.info("Begin: update tags from source==>sink");
-					if (TagSyncConfig.isTagSyncEnabled(properties)) {
-						updateSink();
-						LOG.info("End: update tags from source==>sink");
-					} else {
-						LOG.info("Tag-sync is not enabled.");
-					}
-				} else {
-					LOG.debug("TagFileSource: no change found for synchronization.");
-				}
-
-				LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
-
-				Thread.sleep(sleepTimeBetweenCycleInMillis);
-			}
-			catch (InterruptedException e) {
-				LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to tagFileSource", e);
-				shutdownFlag = true;
-			}
-			catch (Throwable t) {
-				LOG.error("tag-sync thread got an error", t);
-			}
-		}
-
-		LOG.info("Shutting down the Tag-file-source thread");
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagFileSource.run()");
-		}
-	}
-
-	@Override
-	public void updateSink() throws Exception {
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagFileSource.updateSink()");
-		}
-		ServiceTags serviceTags = readFromFile();
-
-		if (serviceTags != null) {
-			tagSink.uploadServiceTags(serviceTags);
-		} else {
-			LOG.error("Could not read ServiceTags from file");
-		}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagFileSource.updateSink()");
-		}
-	}
-
-	@Override
-	public 	boolean isChanged() {
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagFileSource.isChanged()");
-		}
-		boolean ret = false;
-
-		long modificationTime = getModificationTime();
-
-		if (modificationTime > lastModifiedTimeInMillis) {
-			if (LOG.isDebugEnabled()) {
-				Date modifiedDate = new Date(modificationTime);
-				Date lastModifiedDate = new Date(lastModifiedTimeInMillis);
-				LOG.debug("File modified at " + modifiedDate + "last-modified at " + lastModifiedDate);
-			}
-			lastModifiedTimeInMillis = modificationTime;
-			ret = true;
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagFileSource.isChanged(): result=" + ret);
-		}
-		return ret;
-	}
-
-	private ServiceTags readFromFile() {
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagFileSource.readFromFile(): sourceFileName=" + sourceFileName);
-		}
-
-		ServiceTags ret = null;
-
-		Reader reader = null;
-		try {
-
-			reader = new InputStreamReader(TagSyncConfig.getFileInputStream(sourceFileName));
-
-			ret = gson.fromJson(reader, ServiceTags.class);
-
-		}
-		catch (FileNotFoundException exception) {
-			LOG.warn("Tag-source file does not exist or not readble '" + sourceFileName + "'");
-		}
-		catch (Exception excp) {
-			LOG.error("failed to load service-tags from Tag-source file " + sourceFileName, excp);
-		}
-		finally {
-			if (reader != null) {
-				try {
-					reader.close();
-				} catch (Exception excp) {
-					LOG.error("error while closing opened Tag-source file " + sourceFileName, excp);
-				}
-			}
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagFileSource.readFromFile(): sourceFileName=" + sourceFileName);
-		}
-
-		return ret;
-	}
-
-	private long getModificationTime() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagFileSource.getLastModificationTime(): sourceFileName=" + sourceFileName);
-		}
-		long ret = 0L;
-
-		File sourceFile = new File(sourceFileName);
-
-		if (sourceFile.exists() && sourceFile.isFile() && sourceFile.canRead()) {
-			ret = sourceFile.lastModified();
-		} else {
-			ret = new Date().getTime();
-		}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagFileSource.lastModificationTime(): sourceFileName=" + sourceFileName + " result=" + new Date(ret));
-		}
-
-		return ret;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/resources/etc/ranger/data/tags.json
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/etc/ranger/data/tags.json b/tagsync/src/main/resources/etc/ranger/data/tags.json
index 274cf69..b4cd736 100644
--- a/tagsync/src/main/resources/etc/ranger/data/tags.json
+++ b/tagsync/src/main/resources/etc/ranger/data/tags.json
@@ -1,9 +1,7 @@
 {
     "op":"add_or_update",
-    "tagModel":"shared",
+    "tagModel": "resource_private",
     "serviceName": "cl1_hive",
-    "tagVersion": 24,
-    "tagUpdateTime": "20150901-20:03:17.000-+0000",
     "tagDefinitions": {
       "1": {
         "name": "EXPIRES_ON",
@@ -15,12 +13,7 @@
           }
         ],
         "id": 1,
-        "guid": "1441137512654_323_77",
-        "isEnabled": true,
-        "createdBy": "Admin",
-        "updatedBy": "Admin",
-        "createTime": "20150901-19:58:33.000-+0000",
-        "updateTime": "20150901-19:58:33.000-+0000"
+        "guid": "tagdefinition-1-guid"
       }
     },
     "tags": {
@@ -30,12 +23,7 @@
           "expiry_date": "2014/12/31"
         },
         "id": 1,
-        "guid": "1441137512698_844_80",
-        "isEnabled": true,
-        "createdBy": "Admin",
-        "updatedBy": "Admin",
-        "createTime": "20150901-19:58:33.000-+0000",
-        "updateTime": "20150901-20:03:17.000-+0000"
+        "guid": "tag-1-guid"
       }
     },
     "serviceResources": [
@@ -57,14 +45,8 @@
             "isRecursive": false
           }
         },
-        "resourceSignature": "c1114679b35a65d28e0dca4fdffc27d6",
         "id": 1,
-        "guid": "1441137512756_887_83",
-        "isEnabled": true,
-        "createdBy": "Admin",
-        "updatedBy": "Admin",
-        "createTime": "20150901-19:58:33.000-+0000",
-        "updateTime": "20150901-19:58:33.000-+0000"
+        "guid": "serviceresource-1-guid"
       }
     ],
     "resourceToTagIds": {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/resources/ranger-tagsync-default.xml
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml b/tagsync/src/main/resources/ranger-tagsync-default.xml
index b917d29..3784df8 100644
--- a/tagsync/src/main/resources/ranger-tagsync-default.xml
+++ b/tagsync/src/main/resources/ranger-tagsync-default.xml
@@ -55,4 +55,9 @@
 		<value>tagadmin</value>
 		<description></description>
 	</property>
+	<property>
+		<name>ranger.tagsync.atlasrestsource.download.interval</name>
+		<value>600000</value>
+		<description></description>
+	</property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
index 59d521c..43c22d0 100644
--- a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
+++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
@@ -20,12 +20,7 @@
 package org.apache.ranger.tagsync.process;
 
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.ranger.tagsync.model.TagSource;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-import org.apache.ranger.tagsync.process.TagSynchronizer;
-import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper;
-import org.apache.ranger.tagsync.source.atlas.TagAtlasSource;
+import org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -34,8 +29,6 @@ import java.io.*;
 import java.util.List;
 import java.util.Properties;
 
-import static org.junit.Assert.*;
-
 
 public class TestTagSynchronizer {
 
@@ -79,7 +72,7 @@ public class TestTagSynchronizer {
 	@Test
 	public void testTagDownload() {
 
-		boolean initDone = false;
+		boolean initDone = true;
 
 		/* For tagSynchronizer.initialize() to succeed, edit ranger-tagsync-site.xml file to contain correct
 		values of the following properties:
@@ -97,10 +90,12 @@ public class TestTagSynchronizer {
 		*/
 
 
-		//initDone = tagSynchronizer.initialize();
+//		initDone = tagSynchronizer.initialize(null);
 
 		System.out.println("TagSynchronizer initialization result=" + initDone);
 
+		assert(initDone);
+
 		System.out.println("Exiting testTagDownload()");
 	}
 
@@ -108,14 +103,15 @@ public class TestTagSynchronizer {
 	public void testQualifiedNames() {
 
 		List<String> components;
+		AtlasHiveResourceMapper hiveResourceBuilder = new AtlasHiveResourceMapper();
 		try {
-			components = AtlasNotificationMapper.getQualifiedNameComponents("hive_db", "database@cluster");
+			components = hiveResourceBuilder.getQualifiedNameComponents("hive_db", "database@cluster");
 			printComponents(components);
 
-			components = AtlasNotificationMapper.getQualifiedNameComponents("hive_table", "database.table@cluster");
+			components = hiveResourceBuilder.getQualifiedNameComponents("hive_table", "database.table@cluster");
 			printComponents(components);
 
-			components = AtlasNotificationMapper.getQualifiedNameComponents("hive_column", "database.table.column@cluster");
+			components = hiveResourceBuilder.getQualifiedNameComponents("hive_column", "database.table.column@cluster");
 			printComponents(components);
 
 			assert(true);


Mime
View raw message