ranger-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [49/50] [abbrv] incubator-ranger git commit: RANGER-660: ranger-tagsync configuration validation
Date Wed, 28 Oct 2015 16:00:25 GMT
RANGER-660: ranger-tagsync configuration validation

Signed-off-by: Madhan Neethiraj <madhan@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/757d1eb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/757d1eb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/757d1eb2

Branch: refs/heads/master
Commit: 757d1eb211922be7f09cfb773d839f23ef98f147
Parents: 22859f5
Author: Abhay Kulkarni <akulkarni@hortonworks.com>
Authored: Wed Oct 21 14:54:39 2015 -0700
Committer: Madhan Neethiraj <madhan@apache.org>
Committed: Tue Oct 27 17:03:51 2015 -0700

----------------------------------------------------------------------
 .../conf/templates/installprop2xml.properties   |   2 +-
 .../conf/templates/ranger-tagsync-template.xml  |   2 +-
 tagsync/scripts/install.properties              |   3 +-
 tagsync/scripts/ranger-tagsync-services.sh      |   6 +
 tagsync/scripts/setup.py                        |   1 -
 .../apache/ranger/tagsync/model/TagSink.java    |   3 +-
 .../ranger/tagsync/process/TagSyncConfig.java   |  23 +-
 .../ranger/tagsync/process/TagSynchronizer.java | 207 +++++----
 .../tagsync/sink/tagadmin/TagRESTSink.java      | 412 ++---------------
 .../tagsync/source/atlas/AtlasUtility.java      | 404 +++++++++++++++++
 .../tagsync/source/atlas/TagAtlasSource.java    | 442 ++-----------------
 .../tagsync/source/file/TagFileSource.java      |  21 +-
 .../main/resources/ranger-tagsync-default.xml   |  18 +-
 .../src/main/resources/ranger-tagsync-site.xml  |  57 +++
 .../tagsync/process/TestTagSynchronizer.java    |  29 +-
 15 files changed, 693 insertions(+), 937 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/conf/templates/installprop2xml.properties
----------------------------------------------------------------------
diff --git a/tagsync/conf/templates/installprop2xml.properties b/tagsync/conf/templates/installprop2xml.properties
index 94618fc..5b63835 100644
--- a/tagsync/conf/templates/installprop2xml.properties
+++ b/tagsync/conf/templates/installprop2xml.properties
@@ -30,7 +30,7 @@ TAGADMIN_SSL_CONFIG_FILENAME = ranger.tagsync.tagadmin.rest.ssl.config.file
 TAGSYNC_KEYSTORE_FILENAME = ranger.tagsync.tagadmin.keystore
 
 
-SYNC_INTERVAL = ranger.tagsync.sleeptimeinmillisbetweensynccycle
+TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = ranger.tagsync.filesource.modtime.check.interval
 
 TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/conf/templates/ranger-tagsync-template.xml
----------------------------------------------------------------------
diff --git a/tagsync/conf/templates/ranger-tagsync-template.xml b/tagsync/conf/templates/ranger-tagsync-template.xml
index ebee22d..9a88681 100644
--- a/tagsync/conf/templates/ranger-tagsync-template.xml
+++ b/tagsync/conf/templates/ranger-tagsync-template.xml
@@ -28,7 +28,7 @@
 		<value></value>
 	</property>
 	<property>
-		<name>ranger.tagsync.sleeptimeinmillisbetweensynccycle</name>
+		<name>ranger.tagsync.filesource.modtime.check.interval</name>
 		<value></value>
 	</property>
 	<property>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/scripts/install.properties
----------------------------------------------------------------------
diff --git a/tagsync/scripts/install.properties b/tagsync/scripts/install.properties
index 6b36846..f7de6e3 100644
--- a/tagsync/scripts/install.properties
+++ b/tagsync/scripts/install.properties
@@ -47,8 +47,7 @@ TAGSYNC_FILESOURCE_FILENAME = /etc/ranger/data/tags.json
 
 
 # Interval for checking the source for any changes in case of TAG_SOURCE = file
-# Also used for periodicity of checking if the process needs to be shut down
-SYNC_INTERVAL = 60000
+TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = 60000
 
 # Endpoint specifications needed by Atlas
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/scripts/ranger-tagsync-services.sh
----------------------------------------------------------------------
diff --git a/tagsync/scripts/ranger-tagsync-services.sh b/tagsync/scripts/ranger-tagsync-services.sh
index e818d0d..ca82ead 100755
--- a/tagsync/scripts/ranger-tagsync-services.sh
+++ b/tagsync/scripts/ranger-tagsync-services.sh
@@ -82,10 +82,16 @@ if [ "${action}" == "START" ]; then
 
 	if [ "${pid}" != "" ]
 	then
+		if [ -z "`ps axf | grep ${pid} | grep -v grep`" ]; then
+			rm -f ${pidf}
+			echo "Ranger Tagsync Service failed to start. Please refer to log files under ${logdir} for further details."
+		else
         	echo "Ranger Tagsync Service has started successfully."
+        fi
 	else
         	echo "Ranger Tagsync Service failed to start. Please refer to log files under ${logdir} for further details."
 	fi
+
 	exit;
 
 elif [ "${action}" == "STOP" ]; then

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/scripts/setup.py
----------------------------------------------------------------------
diff --git a/tagsync/scripts/setup.py b/tagsync/scripts/setup.py
index 2721186..e4b2433 100755
--- a/tagsync/scripts/setup.py
+++ b/tagsync/scripts/setup.py
@@ -69,7 +69,6 @@ rootOwnerId = 0
 initPrefixList = ['S99', 'K00']
 
 TAG_SOURCE_KEY  = 'TAG_SOURCE'
-SYNC_INTERVAL_NEW_KEY = 'ranger.tagsync.sleeptimeinmillisbetweensynccycle'
 TAGSYNC_ATLAS_KAFKA_ENDPOINTS_KEY = 'TAGSYNC_ATLAS_KAFKA_ENDPOINTS'
 TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT_KEY = 'TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT'
 TAGSYNC_ATLAS_CONSUMER_GROUP_KEY = 'TAGSYNC_ATLAS_CONSUMER_GROUP'

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
index 6565159..9eb5319 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
@@ -19,13 +19,12 @@
 
 package org.apache.ranger.tagsync.model;
 
-import org.apache.ranger.plugin.store.TagStore;
 import org.apache.ranger.plugin.util.ServiceTags;
 
 import java.util.Properties;
 
 
-public interface TagSink extends TagStore {
+public interface TagSink {
 	boolean initialize(Properties properties);
 	void uploadServiceTags(ServiceTags serviceTags) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index bfd1b8b..e1b5130 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -45,7 +45,7 @@ public class TagSyncConfig extends Configuration {
 
 	private static final String TAGSYNC_FILESOURCE_FILENAME_PROP = "ranger.tagsync.filesource.filename";
 
-	private static final String TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP = "ranger.tagsync.sleeptimeinmillisbetweensynccycle";
+	private static final String TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL_PROP = "ranger.tagsync.filesource.modtime.check.interval";
 
 	private static final String TAGSYNC_SOURCE_CLASS_PROP = "ranger.tagsync.source.impl.class";
 
@@ -64,20 +64,7 @@ public class TagSyncConfig extends Configuration {
 	private static final String TAGSYNC_TAGADMIN_PASSWORD_PROP = "ranger.tagsync.tagadmin.password";
 	private static final String DEFAULT_TAGADMIN_USERNAME = "rangertagsync";
 
-	private static volatile TagSyncConfig instance = null;
-
 	public static TagSyncConfig getInstance() {
-	/*
-		TagSyncConfig ret = instance;
-		if (ret == null) {
-			synchronized(TagSyncConfig.class) {
-				if (ret == null) {
-					ret = instance = new TagSyncConfig();
-					LOG.debug("TagSyncConfig = {" + ret + "}");
-				}
-			}
-		}
-	*/
 		TagSyncConfig newConfig = new TagSyncConfig();
 		return newConfig;
 	}
@@ -179,8 +166,8 @@ public class TagSyncConfig extends Configuration {
 		return val;
 	}
 
-	static public long getSleepTimeInMillisBetweenCycle(Properties prop) {
-		String val = prop.getProperty(TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP);
+	static public long getTagSourceFileModTimeCheckIntervalInMillis(Properties prop) {
+		String val = prop.getProperty(TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL_PROP);
 		return Long.valueOf(val);
 	}
 
@@ -194,6 +181,10 @@ public class TagSyncConfig extends Configuration {
 			return val;
 	}
 
+	static public String getTagSource(Properties prop) {
+		return prop.getProperty(TAGSYNC_SOURCE_CLASS_PROP);
+	}
+
 	static public String getTagSinkClassName(Properties prop) {
 		String val = prop.getProperty(TAGSYNC_SINK_CLASS_PROP);
 		if (StringUtils.equalsIgnoreCase(val, "tagadmin")) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 0235581..7bae973 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -20,6 +20,7 @@
 package org.apache.ranger.tagsync.process;
 
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.model.TagSource;
@@ -27,165 +28,163 @@ import org.apache.ranger.tagsync.model.TagSource;
 import java.util.Map;
 import java.util.Properties;
 
-public class TagSynchronizer implements Runnable {
+public class TagSynchronizer {
 
 	private static final Logger LOG = Logger.getLogger(TagSynchronizer.class);
 
-	private final static int MAX_INIT_RETRIES = 5;
-
 	private boolean shutdownFlag = false;
-	private TagSink tagSink = null;
 	private TagSource tagSource = null;
 	private Properties properties = null;
 
-
 	public static void main(String[] args) {
 
-		TagSyncConfig config = TagSyncConfig.getInstance();
-		Properties props = config.getProperties();
+		boolean tagSynchronizerInitialized = false;
+		TagSynchronizer tagSynchronizer = new TagSynchronizer();
+
+		while (!tagSynchronizerInitialized) {
+
+			TagSyncConfig config = TagSyncConfig.getInstance();
+			Properties props = config.getProperties();
+
+			tagSynchronizer.setProperties(props);
+			tagSynchronizerInitialized = tagSynchronizer.initialize();
 
-		TagSynchronizer tagSynchronizer = new TagSynchronizer(props);
+			if (!tagSynchronizerInitialized) {
+				LOG.error("TagSynchronizer failed to initialize correctly");
+
+				try {
+					LOG.error("Sleeping for [60] seconds before attempting to re-read configuration XML files");
+					Thread.sleep(60 * 1000);
+				} catch (InterruptedException e) {
+					LOG.error("Failed to wait for [60] seconds", e);
+				}
+			}
+		}
 
 		tagSynchronizer.run();
 	}
 
+	public TagSynchronizer() {
+		setProperties(null);
+	}
+
 	public TagSynchronizer(Properties properties) {
+		setProperties(properties);
+	}
+
+	public void setProperties(Properties properties) {
 		if (properties == null || MapUtils.isEmpty(properties)) {
-			LOG.error("TagSynchronizer initialized with null properties!");
 			this.properties = new Properties();
 		} else {
 			this.properties = properties;
 		}
 	}
 
-	public TagSink getTagSink() {
-		return tagSink;
-	}
-
-	public TagSource getTagSource() {
-		return tagSource;
-	}
+	public boolean initialize() {
 
-	@Override
-	public void run() {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagSynchronizer.run()");
+			LOG.debug("==> TagSynchronizer.initialize()");
 		}
-		try {
-			long sleepTimeBetweenCycleInMillis = TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
-
-			boolean initDone = initLoop();
 
-			if (initDone) {
+		printConfigurationProperties();
 
-				Thread tagSourceThread = tagSource.start();
+		boolean ret = true;
 
-				if (tagSourceThread != null) {
-					while (!shutdownFlag) {
-						try {
-							LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
-							Thread.sleep(sleepTimeBetweenCycleInMillis);
-						} catch (InterruptedException e) {
-							LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to synchronize tag information", e);
-						}
-					}
-					if (shutdownFlag) {
-						LOG.info("Interrupting tagSourceThread...");
-						tagSourceThread.interrupt();
-						try {
-							tagSourceThread.join();
-						} catch (InterruptedException interruptedException) {
-							LOG.error("tagSourceThread.join() was interrupted");
-						}
-					}
-				} else {
-					LOG.error("Could not start tagSource monitoring thread");
-				}
-			} else {
-				LOG.error("Failed to initialize TagSynchonizer after " + MAX_INIT_RETRIES + " retries. Exiting thread");
-			}
+		String tagSourceName = TagSyncConfig.getTagSource(properties);
 
-		} catch (Throwable t) {
-			LOG.error("tag-sync thread got an error", t);
-		} finally {
-			LOG.error("Shutting down the tag-sync thread");
+		if (StringUtils.isBlank(tagSourceName) ||
+				(!StringUtils.equalsIgnoreCase(tagSourceName, "file") && !StringUtils.equalsIgnoreCase(tagSourceName, "atlas"))) {
+			ret = false;
+			LOG.error("'ranger.tagsync.source.impl.class' value is invalid!, 'ranger.tagsync.source.impl.class'=" + tagSourceName + ". Supported 'ranger.tagsync.source.impl.class' values are : file, atlas");
 		}
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagSynchronizer.run()");
-		}
-	}
+		if (ret) {
 
-	public boolean initLoop() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagSynchronizer.initLoop()");
-		}
-		boolean ret = false;
+			try {
+				LOG.info("Initializing TAG source and sink");
+				// Initialize tagSink and tagSource
+				String tagSourceClassName = TagSyncConfig.getTagSourceClassName(properties);
+				String tagSinkClassName = TagSyncConfig.getTagSinkClassName(properties);
 
-		long sleepTimeBetweenCycleInMillis = TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("tagSourceClassName=" + tagSourceClassName + ", tagSinkClassName=" + tagSinkClassName);
+				}
 
-		for (int initRetries = 0; initRetries < MAX_INIT_RETRIES && !ret; initRetries++) {
+				@SuppressWarnings("unchecked")
+				Class<TagSource> tagSourceClass = (Class<TagSource>) Class.forName(tagSourceClassName);
 
-			printConfigurationProperties();
+				@SuppressWarnings("unchecked")
+				Class<TagSink> tagSinkClass = (Class<TagSink>) Class.forName(tagSinkClassName);
 
-			ret = init();
+				TagSink tagSink = tagSinkClass.newInstance();
+				tagSource = tagSourceClass.newInstance();
 
-			if (!ret) {
-				LOG.error("Failed to initialize TAG source/sink. Will retry after " + sleepTimeBetweenCycleInMillis + " milliseconds.");
-				try {
-					LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
-					Thread.sleep(sleepTimeBetweenCycleInMillis);
-					properties = TagSyncConfig.getInstance().getProperties();
-				} catch (Exception e) {
-					LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to initialize tag source/sink", e);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Created instance of " + tagSourceClassName + ", " + tagSinkClassName);
 				}
+
+				ret = tagSink.initialize(properties) && tagSource.initialize(properties);
+
+				if (ret) {
+					tagSource.setTagSink(tagSink);
+				}
+
+				LOG.info("Done initializing TAG source and sink");
+			} catch (Throwable t) {
+				LOG.error("Failed to initialize TAG source/sink. Error details: ", t);
+				ret = false;
 			}
 		}
+
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagSynchronizer.initLoop()");
+			LOG.debug("<== TagSynchronizer.initialize(), result=" + ret);
 		}
+
 		return ret;
 	}
 
-	public boolean init() {
-
+	public void run() {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagSynchronizer.init()");
+			LOG.debug("==> TagSynchronizer.run()");
 		}
-		boolean ret = false;
-		try {
-			LOG.info("Initializing TAG source and sink");
-			// Initialize tagSink and tagSource
-			String tagSourceClassName = TagSyncConfig.getTagSourceClassName(properties);
-			String tagSinkClassName = TagSyncConfig.getTagSinkClassName(properties);
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("tagSourceClassName=" + tagSourceClassName + ", tagSinkClassName=" + tagSinkClassName);
-			}
 
-			Class<TagSource> tagSourceClass = (Class<TagSource>) Class.forName(tagSourceClassName);
-			Class<TagSink> tagSinkClass = (Class<TagSink>) Class.forName(tagSinkClassName);
+		long shutdownCheckIntervalInMs = 60*1000;
 
-			tagSink = tagSinkClass.newInstance();
-			tagSource = tagSourceClass.newInstance();
+		Thread tagSourceThread = null;
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Created instance of " + tagSourceClassName + ", " + tagSinkClassName);
+		try {
+			tagSourceThread = tagSource.start();
+
+			if (tagSourceThread != null) {
+				while (!shutdownFlag) {
+					try {
+						LOG.debug("Sleeping for [" + shutdownCheckIntervalInMs + "] milliSeconds");
+						Thread.sleep(shutdownCheckIntervalInMs);
+					} catch (InterruptedException e) {
+						LOG.error("Failed to wait for [" + shutdownCheckIntervalInMs + "] milliseconds before attempting to synchronize tag information", e);
+					}
+				}
 			}
-
-			ret = tagSink.initialize(properties) && tagSource.initialize(properties);
-
-			tagSource.setTagSink(tagSink);
-
-			LOG.info("Done initializing TAG source and sink");
 		} catch (Throwable t) {
-			LOG.error("Failed to initialize TAG source/sink. Error details: ", t);
+			LOG.error("tag-sync thread got an error", t);
+		} finally {
+			if (tagSourceThread != null) {
+				LOG.error("Shutting down the tag-sync thread");
+				LOG.info("Interrupting tagSourceThread...");
+				tagSourceThread.interrupt();
+				try {
+					tagSourceThread.join();
+				} catch (InterruptedException interruptedException) {
+					LOG.error("tagSourceThread.join() was interrupted");
+				}
+			} else {
+				LOG.error("Could not start tagSource monitoring thread");
+			}
 		}
+
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagSynchronizer.init(), result=" + ret);
+			LOG.debug("<== TagSynchronizer.run()");
 		}
-
-		return ret;
 	}
 
 	public void shutdown(String reason) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
index 76bb62d..41085d0 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
@@ -28,15 +28,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.admin.client.datatype.RESTResponse;
 import org.apache.ranger.tagsync.model.TagSink;
-import org.apache.ranger.plugin.model.*;
-import org.apache.ranger.plugin.store.PList;
-import org.apache.ranger.plugin.store.ServiceStore;
 import org.apache.ranger.plugin.util.RangerRESTClient;
 import org.apache.ranger.plugin.util.SearchFilter;
 import org.apache.ranger.plugin.util.ServiceTags;
 import org.apache.ranger.tagsync.process.TagSyncConfig;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -47,27 +43,18 @@ public class TagRESTSink implements TagSink {
 	private static final String MODULE_PREFIX = "/tags";
 
 	private static final String REST_MIME_TYPE_JSON = "application/json" ;
-	private static final String REST_URL_TAGDEFS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tagdefs/" ;
-	private static final String REST_URL_TAGDEF_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tagdef/" ;
-	private static final String REST_URL_SERVICERESOURCES_RESOURCE = REST_PREFIX + MODULE_PREFIX + "resources/" ;
-	private static final String REST_URL_SERVICERESOURCE_RESOURCE = REST_PREFIX + MODULE_PREFIX + "resource/" ;
-	private static final String REST_URL_TAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tags/" ;
-	private static final String REST_URL_TAG_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tag/" ;
-	private static final String REST_URL_TAGRESOURCEMAP_IDS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tagresourcemapids/";
+
 	private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/importservicetags/";
 
 	private RangerRESTClient tagRESTClient = null;
 
 	@Override
-	public void init() {}
-
-	@Override
 	public boolean initialize(Properties properties) {
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("==> TagRESTSink.initialize()");
 		}
 
-		boolean ret = false;
+		boolean ret = true;
 
 		String restUrl       = TagSyncConfig.getTagAdminRESTUrl(properties);
 		String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
@@ -78,340 +65,59 @@ public class TagRESTSink implements TagSink {
 			LOG.debug("restUrl=" + restUrl);
 			LOG.debug("sslConfigFile=" + sslConfigFile);
 			LOG.debug("userName=" + userName);
-			LOG.debug("password=" + password);
-		}
-		tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile);
-		if (tagRESTClient != null) {
-			tagRESTClient.setBasicAuthInfo(userName, password);
-			ret = true;
-		} else {
-			LOG.error("Could not create RangerRESTClient");
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== TagRESTSink.initialize(), result=" + ret);
-		}
-		return ret;
-	}
-
-	@Override
-	public void setServiceStore(ServiceStore svcStore) {
-
-	}
-
-	@Override
-	public RangerTagDef createTagDef(RangerTagDef tagDef) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> createTagDef(" + tagDef + ")");
-		}
-
-		RangerTagDef ret = null;
-
-		WebResource webResource = createWebResource(REST_URL_TAGDEFS_RESOURCE);
-		ClientResponse response    = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class, tagRESTClient.toJson(tagDef));
-
-		if(response != null && response.getStatus() == 200) {
-			ret = response.getEntity(RangerTagDef.class);
-		} else {
-			LOG.error("RangerAdmin REST call returned with response={" + response +"}");
-			RESTResponse resp = RESTResponse.fromClientResponse(response);
-
-			throw new Exception(resp.getMessage());
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== createTagDef(" + tagDef + "): " + ret);
-		}
-
-		return ret;
-	}
-
-	@Override
-	public RangerTagDef updateTagDef(RangerTagDef TagDef) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public void deleteTagDefByName(String name) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public void deleteTagDef(Long id) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> deleteTagDef(" + id  + ")");
-		}
-		WebResource webResource = createWebResource(REST_URL_TAGDEF_RESOURCE + Long.toString(id));
-
-		ClientResponse response    = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).delete(ClientResponse.class);
-
-		if(response != null && response.getStatus() == 204) {
-		} else {
-			LOG.error("RangerAdmin REST call returned with response={" + response + "}");
-
-			RESTResponse resp = RESTResponse.fromClientResponse(response);
-
-			throw new Exception(resp.getMessage());
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== deleteTagDef(" + id + ")");
-		}
-	}
-
-	@Override
-	public RangerTagDef getTagDef(Long id) throws Exception {
-		throw new Exception("Not implemented");
-
-	}
-
-	@Override
-	public RangerTagDef getTagDefByGuid(String guid) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public RangerTagDef getTagDefByName(String name) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTagDef> getTagDefs(SearchFilter filter) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public PList<RangerTagDef> getPaginatedTagDefs(SearchFilter filter) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<String> getTagTypes() throws Exception {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-
-	@Override
-	public RangerTag createTag(RangerTag tag) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> createTag(" + tag + ")");
-		}
-
-		RangerTag ret = null;
-
-		WebResource webResource = createWebResource(REST_URL_TAGS_RESOURCE);
-		ClientResponse response    = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class, tagRESTClient.toJson(tag));
-
-		if(response != null && response.getStatus() == 200) {
-			ret = response.getEntity(RangerTag.class);
-		} else {
-			LOG.error("RangerAdmin REST call returned with response={" + response +"}");
-			RESTResponse resp = RESTResponse.fromClientResponse(response);
-
-			throw new Exception(resp.getMessage());
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== createTag(" + tag + "): " + ret);
 		}
 
-		return ret;
-	}
-
-	@Override
-	public RangerTag updateTag(RangerTag tag) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public void deleteTag(Long id) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> deleteTag(" + id  + ")");
-		}
-		WebResource webResource = createWebResource(REST_URL_TAG_RESOURCE + Long.toString(id));
-
-		ClientResponse response    = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).delete(ClientResponse.class);
-
-		if(response != null && response.getStatus() == 204) {
+		if (StringUtils.isBlank(restUrl)) {
+			ret = false;
+			LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!");
 		} else {
-			LOG.error("RangerAdmin REST call returned with response={" + response + "}");
-
-			RESTResponse resp = RESTResponse.fromClientResponse(response);
-
-			throw new Exception(resp.getMessage());
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== deleteTag(" + id + ")");
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("ranger.tagsync.tagadmin.rest.url:" + restUrl);
+			}
 		}
-	}
-
-	@Override
-	public RangerTag getTag(Long id) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public RangerTag getTagByGuid(String guid) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTag> getTagsByType(String name) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<Long> getTagIdsForResourceId(Long resourceId) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTag> getTagsForResourceId(Long resourceId) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTag> getTagsForResourceGuid(String resourceGuid) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTag> getTags(SearchFilter filter) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public PList<RangerTag> getPaginatedTags(SearchFilter filter) throws Exception {
-		throw new Exception("Not implemented");
-	}
 
+		if (ret) {
+			tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile);
+			tagRESTClient.setBasicAuthInfo(userName, password);
 
-	@Override
-	public RangerServiceResource createServiceResource(RangerServiceResource resource) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> createServiceResource(" + resource + ")");
+			ret = testConnection();
 		}
 
-		RangerServiceResource ret = null;
-
-		WebResource webResource = createWebResource(REST_URL_SERVICERESOURCES_RESOURCE);
-		ClientResponse response    = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class, tagRESTClient.toJson(resource));
-
-		if(response != null && response.getStatus() == 200) {
-			ret = response.getEntity(RangerServiceResource.class);
-		} else {
-			LOG.error("RangerAdmin REST call returned with response={" + response +"}");
-
-			RESTResponse resp = RESTResponse.fromClientResponse(response);
-
-			throw new Exception(resp.getMessage());
+		if (!ret) {
+			LOG.error("Cannot connect to Tag Admin. Please recheck configuration properties and/or check if Tag Admin is running and responsive");
 		}
 
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== createServiceResource(" + resource + "): " + ret);
+			LOG.debug("<== TagRESTSink.initialize(), result=" + ret);
 		}
 
 		return ret;
 	}
 
-	@Override
-	public RangerServiceResource updateServiceResource(RangerServiceResource resource) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public void deleteServiceResource(Long id) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> deleteServiceResource(" + id  + ")");
-		}
-		WebResource webResource = createWebResource(REST_URL_SERVICERESOURCE_RESOURCE + Long.toString(id));
-
-		ClientResponse response    = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).delete(ClientResponse.class);
-
-		if(response != null && response.getStatus() == 204) {
-		} else {
-			LOG.error("RangerAdmin REST call returned with response={" + response + "}");
-
-			RESTResponse resp = RESTResponse.fromClientResponse(response);
-
-			throw new Exception(resp.getMessage());
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== deleteServiceResource(" + id + ")");
-		}
-	}
-
-	@Override
-	public RangerServiceResource getServiceResource(Long id) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public RangerServiceResource getServiceResourceByGuid(String guid) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerServiceResource> getServiceResourcesByService(String serviceName) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public RangerServiceResource getServiceResourceByResourceSignature(String resourceSignature) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerServiceResource> getServiceResources(SearchFilter filter) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public PList<RangerServiceResource> getPaginatedServiceResources(SearchFilter filter) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-
-	@Override
-	public RangerTagResourceMap createTagResourceMap(RangerTagResourceMap tagResourceMap) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> createTagResourceMap(" + tagResourceMap + ")");
+	public boolean testConnection() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagRESTSink.testConnection()");
 		}
 
-		RangerTagResourceMap ret = null;
-
-		WebResource webResource = createWebResource(REST_URL_TAGRESOURCEMAP_IDS_RESOURCE)
-				.queryParam("tag-id", Long.toString(tagResourceMap.getTagId()))
-				.queryParam("resource-id", Long.toString(tagResourceMap.getResourceId()));
-
-		ClientResponse response    = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class);
+		boolean ret = true;
 
-		if(response != null && response.getStatus() == 200) {
-			ret = response.getEntity(RangerTagResourceMap.class);
-		} else {
-			LOG.error("RangerAdmin REST call returned with response={" + response +"}");
-
-			RESTResponse resp = RESTResponse.fromClientResponse(response);
-
-			throw new Exception(resp.getMessage());
+		try {
+			// build a dummy serviceTags structure and upload it to test connectivity
+			ServiceTags serviceTags = new ServiceTags();
+			serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE);
+			uploadServiceTags(serviceTags);
+		} catch (Exception exception) {
+			LOG.error("test-upload of serviceTags failed.", exception);
+			ret = false;
 		}
 
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== createTagResourceMap(" + tagResourceMap + "): " + ret);
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagRESTSink.testConnection(), result=" + ret);
 		}
-
 		return ret;
 	}
 
 	@Override
-	public void deleteTagResourceMap(Long id) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
 	public void uploadServiceTags(ServiceTags serviceTags) throws Exception {
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("==> uploadServiceTags()");
@@ -420,8 +126,7 @@ public class TagRESTSink implements TagSink {
 
 		ClientResponse response    = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class, tagRESTClient.toJson(serviceTags));
 
-		if(response != null && response.getStatus() == 204) {
-		} else {
+		if(response == null || response.getStatus() != 204) {
 			LOG.error("RangerAdmin REST call returned with response={" + response + "}");
 
 			RESTResponse resp = RESTResponse.fromClientResponse(response);
@@ -434,63 +139,6 @@ public class TagRESTSink implements TagSink {
 		}
 	}
 
-	@Override
-	public RangerTagResourceMap getTagResourceMap(Long id) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public RangerTagResourceMap getTagResourceMapByGuid(String guid) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTagResourceMap> getTagResourceMapsForTagId(Long tagId) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTagResourceMap> getTagResourceMapsForTagGuid(String tagGuid) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTagResourceMap> getTagResourceMapsForResourceId(Long resourceId) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTagResourceMap> getTagResourceMapsForResourceGuid(String resourceGuid) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public RangerTagResourceMap getTagResourceMapForTagAndResourceId(Long tagId, Long resourceId) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-
-	@Override
-	public RangerTagResourceMap getTagResourceMapForTagAndResourceGuid(String tagGuid, String resourceGuid) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public List<RangerTagResourceMap> getTagResourceMaps(SearchFilter filter) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-	@Override
-	public PList<RangerTagResourceMap> getPaginatedTagResourceMaps(SearchFilter filter) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
-
-	@Override
-	public ServiceTags getServiceTagsIfUpdated(String serviceName, Long lastKnownVersion) throws Exception {
-		throw new Exception("Not implemented");
-	}
-
 	private WebResource createWebResource(String url) {
 		return createWebResource(url, null);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
new file mode 100644
index 0000000..2548c36
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.typesystem.EntityImpl;
+import org.apache.atlas.typesystem.IdImpl;
+import org.apache.atlas.typesystem.TraitImpl;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.atlas.typesystem.api.Entity;
+import org.apache.atlas.typesystem.api.Trait;
+import org.apache.ranger.admin.client.datatype.RESTResponse;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.ranger.plugin.util.RangerRESTUtils;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.*;
+
+
+// class AtlasUtil
+
+@SuppressWarnings("unchecked")
+public class AtlasUtility {
+
+	private static final Log LOG = LogFactory.getLog(AtlasUtility.class);
+
+	// Atlas APIs
+
+	public static final String API_ATLAS_TYPES = "api/atlas/types";
+	public static final String API_ATLAS_ENTITIES = "api/atlas/entities?type=";
+	public static final String API_ATLAS_ENTITY = "api/atlas/entities/";
+	public static final String API_ATLAS_TYPE = "api/atlas/types/";
+
+	public static final String RESULTS_ATTRIBUTE = "results";
+	public static final String DEFINITION_ATTRIBUTE = "definition";
+	public static final String VALUES_ATTRIBUTE = "values";
+	public static final String TRAITS_ATTRIBUTE = "traits";
+	public static final String TYPE_NAME_ATTRIBUTE = "typeName";
+	public static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes";
+	public static final String SUPER_TYPES_ATTRIBUTE = "superTypes";
+	public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions";
+	public static final String NAME_ATTRIBUTE = "name";
+
+	private Type mapType = new TypeToken<Map<String, Object>>() {
+	}.getType();
+
+	private RangerRESTClient restClient;
+	private Map<String, Entity> entities = new LinkedHashMap<>();
+
+
+	// ----- Constructor ------------------------------------------------------
+
+	/**
+	 * Creates the utility together with the REST client it uses to query Atlas.
+	 *
+	 * @param properties tagsync configuration; the Atlas endpoint and the SSL
+	 *                   configuration file name are read from it through TagSyncConfig
+	 */
+	public AtlasUtility(Properties properties) {
+
+		String url = TagSyncConfig.getAtlasEndpoint(properties);
+		String sslConfigFileName = TagSyncConfig.getAtlasSslConfigFileName(properties);
+
+		if (LOG.isDebugEnabled()) {
+			// same format as the "Initialized" message below (the '=' after sslConfigFileName was missing)
+			LOG.debug("Initializing RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
+		}
+
+		restClient = new RangerRESTClient(url, sslConfigFileName);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Initialized RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
+		}
+	}
+
+	// update the set of entities with current from Atlas
+	public void refreshAllEntities() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagAtlasSource.refreshAllEntities()");
+		}
+
+		try {
+			entities.clear();
+			entities.putAll(getAllEntities());
+		} catch (IOException e) {
+			LOG.error("getAllEntities() failed", e);
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagAtlasSource.refreshAllEntities()");
+		}
+	}
+
+	// ----- AtlasUtility ------------------------------------------------------
+
+	/**
+	 * Retrieves the entities known to Atlas.
+	 * <p>
+	 * Lists the types, then the entity GUIDs of each type, then fetches and parses
+	 * the definition of each entity together with its traits and their super types.
+	 * A response without the expected attribute (atlasAPI() returns an empty map
+	 * when a call fails) is skipped rather than causing a NullPointerException.
+	 *
+	 * @return the retrieved entities keyed by GUID, in retrieval order
+	 * @throws IOException declared by the trait-type lookups used while building traits
+	 */
+	public Map<String, Entity> getAllEntities() throws IOException {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagAtlasSource.getAllEntities()");
+		}
+		Map<String, Entity> entities = new LinkedHashMap<>();
+
+		Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
+
+		List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class);
+
+		if (types == null) {
+			// no "results" attribute in the response; nothing to enumerate
+			types = Collections.emptyList();
+		}
+
+		for (String type : types) {
+
+			Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type);
+
+			List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
+
+			if (guids == null) {
+				continue;
+			}
+
+			for (String guid : guids) {
+
+				if (StringUtils.isBlank(guid)) {
+					continue;
+				}
+
+				Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
+
+				if (!entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+					continue;
+				}
+
+				String definitionJSON = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
+
+				LOG.info("{");
+				LOG.info("	\"entity-id\":" + guid + ",");
+				LOG.info("	\"entity-definition\":" + definitionJSON);
+				LOG.info("}");
+
+				Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
+
+				if (definition == null) {
+					// Gson yields null for a null or empty JSON document
+					LOG.warn("Ignoring entity with empty definition, id=" + guid);
+					continue;
+				}
+
+				Map<String, Object> values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
+				Map<String, Object> traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
+				String typeName = getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
+
+				LOG.info("Received entity(typeName=" + typeName + ", id=" + guid + ")");
+
+				Map<String, TraitImpl> traitMap = new HashMap<>();
+
+				if (MapUtils.isNotEmpty(traits)) {
+
+					LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ ");
+
+					for (Map.Entry<String, Object> entry : traits.entrySet()) {
+
+						Map<String, Object> trait = (Map<String, Object>) entry.getValue();
+
+						Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
+						String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
+
+						Map<String, TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
+
+						traitMap.put(entry.getKey(), new TraitImpl(traitTypeName, traitValues, superTypes));
+
+						LOG.info("			Trait(typeName=" + traitTypeName + ")");
+					}
+				} else {
+					LOG.info("No traits for entity(typeName=" + typeName + ", id=" + guid + ")");
+				}
+				EntityImpl entity = new EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
+
+				showEntity(entity);
+
+				entities.put(guid, entity);
+			}
+		}
+		if (LOG.isDebugEnabled()) {
+			// exit trace: "<==" (the original printed the entry marker "==>" here)
+			LOG.debug("<== TagAtlasSource.getAllEntities()");
+		}
+		return entities;
+	}
+
+
+	// ----- helper methods ----------------------------------------------------
+
+	/**
+	 * Looks up the definition of the named trait type in Atlas.
+	 *
+	 * @param traitName name of the trait type to look up
+	 * @return the first entry of the "traitTypes" list of the type definition, or
+	 *         {@code null} when the response carries no definition, the definition
+	 *         cannot be parsed into a map, or it lists no trait types
+	 * @throws IOException part of the signature; nothing in this method throws it directly
+	 */
+	private Map<String, Object> getTraitType(String traitName)
+			throws IOException {
+
+		Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
+
+		if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+			String definitionJSON = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class);
+
+			Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
+
+			// Gson yields null for a null or empty JSON document
+			List<?> traitTypes = definition == null ? null : getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
+
+			if (traitTypes != null && !traitTypes.isEmpty()) {
+				return (Map<String, Object>) traitTypes.get(0);
+			}
+		}
+		return null;
+	}
+
+	/**
+	 * Builds the super-type traits of a trait type, recursively.
+	 * <p>
+	 * For every super type named by {@code traitType}, the values whose names match
+	 * the super type's attribute definitions are copied from {@code values} into a
+	 * new trait; that trait's own super types are built the same way.
+	 *
+	 * @param traitType trait type definition as returned by getTraitType(); may be {@code null}
+	 * @param values    attribute values of the trait being processed; may be {@code null}
+	 * @return super-type traits keyed by super-type name; empty when {@code traitType}
+	 *         is {@code null} or names no super types
+	 * @throws IOException declared by getTraitType()
+	 */
+	private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values)
+			throws IOException {
+
+		Map<String, TraitImpl> superTypes = new HashMap<>();
+
+		if (traitType == null) {
+			return superTypes;
+		}
+
+		List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
+
+		if (superTypeNames == null) {
+			return superTypes;
+		}
+
+		for (String superTypeName : superTypeNames) {
+
+			Map<String, Object> superTraitType = getTraitType(superTypeName);
+
+			if (superTraitType == null) {
+				continue;
+			}
+
+			List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
+
+			Map<String, Object> superTypeValues = new HashMap<>();
+
+			// without attribute definitions or values there is nothing to copy
+			if (attributeDefinitions != null && values != null) {
+				for (Map<String, Object> attributeDefinition : attributeDefinitions) {
+
+					Object nameValue = attributeDefinition.get(NAME_ATTRIBUTE);
+					String attributeName = nameValue == null ? null : nameValue.toString();
+
+					if (attributeName != null && values.containsKey(attributeName)) {
+						superTypeValues.put(attributeName, values.get(attributeName));
+					}
+				}
+			}
+
+			superTypes.put(superTypeName,
+					new TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, superTypeValues)));
+		}
+		return superTypes;
+	}
+
+
+	/*
+		private Map<String, Object> atlasAPI(String endpoint) throws IOException {
+			InputStream in = streamProvider.readFrom(atlasEndpoint + endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
+			return new Gson().fromJson(IOUtils.toString(in, "UTF-8"), mapType);
+		}
+		*/
+
+
+	private Map<String, Object> atlasAPI(String endpoint) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint + ")");
+		}
+		// Create a REST client and perform a get on it
+		Map<String, Object> ret = new HashMap<String, Object>();
+
+		WebResource webResource = restClient.getResource(endpoint);
+
+		ClientResponse response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
+
+		if (response != null && response.getStatus() == 200) {
+			ret = response.getEntity(ret.getClass());
+		} else {
+			LOG.error("Atlas REST call returned with response={" + response + "}");
+
+			RESTResponse resp = RESTResponse.fromClientResponse(response);
+			LOG.error("Error getting Atlas Entity. request=" + webResource.toString()
+					+ ", response=" + resp.toString());
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + ")");
+		}
+		return ret;
+	}
+
+	/**
+	 * Reads the named entry of a map and casts it to the requested type.
+	 *
+	 * @param map  map to read from
+	 * @param name key of the entry
+	 * @param type class the entry is cast to
+	 * @return the entry cast to {@code type}, or {@code null} when the map holds no
+	 *         value for {@code name}
+	 */
+	private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
+		Object attribute = map.get(name);
+
+		return type.cast(attribute);
+	}
+
+	/**
+	 * Writes the id, type, values and trait names of an entity to the debug log.
+	 * Does nothing when debug logging is disabled.
+	 *
+	 * @param entity entity to describe
+	 */
+	public void showEntity(Entity entity) {
+
+		if (!LOG.isDebugEnabled()) {
+			// everything below is debug output only; avoid building the messages
+			return;
+		}
+
+		LOG.debug("Entity-id	:" + entity.getId());
+
+		LOG.debug("Type:		" + entity.getTypeName());
+
+		LOG.debug("----- Values -----");
+
+		for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
+			LOG.debug("		Name:	" + entry.getKey() + "");
+			Object value = entry.getValue();
+			// values that reference cached entities are shown as their ids
+			LOG.debug("		Value:	" + getValue(value, entities.keySet()));
+		}
+
+		LOG.debug("----- Traits -----");
+
+		for (String traitName : entity.getTraits().keySet()) {
+			LOG.debug("		Name:" + entity.getId() + ", trait=" + traitName + ">" + traitName);
+		}
+
+	}
+
+	/**
+	 * Writes the type, values and super-trait names of one trait of an entity to
+	 * the debug log. Does nothing when debug logging is disabled.
+	 *
+	 * @param entity  entity that owns the trait
+	 * @param traitId comma-separated path: the first element names a trait of the
+	 *                entity, each following element names a super type of the
+	 *                trait selected so far
+	 */
+	public void showTrait(Entity entity, String traitId) {
+
+		if (!LOG.isDebugEnabled()) {
+			// everything below is debug output only; avoid building the messages
+			return;
+		}
+
+		String[] traitNames = traitId.split(",");
+
+		Trait trait = entity.getTraits().get(traitNames[0]);
+
+		// stop walking as soon as an element of the path cannot be resolved
+		for (int i = 1; trait != null && i < traitNames.length; ++i) {
+			trait = trait.getSuperTypes().get(traitNames[i]);
+		}
+
+		if (trait == null) {
+			LOG.debug("Trait " + traitId + " not found for Entity id=" + entity.getId());
+			return;
+		}
+
+		String typeName = trait.getTypeName();
+
+		LOG.debug("Trait " + typeName + " for Entity id=" + entity.getId());
+
+		LOG.debug("Type: " + typeName);
+
+		LOG.debug("----- Values ------");
+
+		for (Map.Entry<String, Object> entry : trait.getValues().entrySet()) {
+			LOG.debug("Name:" + entry.getKey());
+			Object value = entry.getValue();
+			LOG.debug("Value:" + getValue(value, entities.keySet()));
+		}
+
+		LOG.debug("Super Traits");
+
+
+		for (String traitName : trait.getSuperTypes().keySet()) {
+			LOG.debug("Name=" + entity.getId() + "&trait=" + traitId + "," + traitName + ">" + traitName);
+		}
+	}
+
+	/**
+	 * Renders a value for display, resolving it to an entity id (or a list of
+	 * entity ids) when it refers to known entities.
+	 *
+	 * @param value value to render; may be {@code null}
+	 * @param ids   known entity ids
+	 * @return an empty string for {@code null}; the resolved id or id list when the
+	 *         value resolves; otherwise {@code value.toString()}
+	 */
+	private String getValue(Object value, Set<String> ids) {
+		if (value == null) {
+			return "";
+		}
+
+		// try a single id first, then a list of ids
+		String resolved = getIdValue(value, ids);
+
+		if (resolved == null) {
+			resolved = getIdListValue(value, ids);
+		}
+
+		return resolved != null ? resolved : value.toString();
+	}
+
+	/**
+	 * Extracts an entity id from a value.
+	 * <p>
+	 * A value is treated as an id when it is a map with exactly three entries, one
+	 * of them a non-null "id", and that id is among the known ids.
+	 *
+	 * @param value value to inspect
+	 * @param ids   known entity ids
+	 * @return the id, or {@code null} when the value is not an id of a known entity
+	 */
+	private String getIdValue(Object value, Set<String> ids) {
+		if (value instanceof Map) {
+			Map<?, ?> map = (Map<?, ?>) value;
+			// null covers both an absent "id" key and an explicit null value
+			Object id = map.get("id");
+
+			if (map.size() == 3 && id != null) {
+				String idString = id.toString();
+
+				if (ids.contains(idString)) {
+					return idString;
+				}
+			}
+		}
+		return null;
+	}
+
+	/**
+	 * Extracts a list of entity ids from a value.
+	 *
+	 * @param value value to inspect
+	 * @param ids   known entity ids
+	 * @return {@code null} when the value is not a list or is an empty list; the
+	 *         ids joined with ", " when every element is an id of a known entity;
+	 *         otherwise {@code value.toString()}
+	 */
+	private String getIdListValue(Object value, Set<String> ids) {
+		if (!(value instanceof List)) {
+			return null;
+		}
+
+		List<?> items = (List<?>) value;
+
+		if (items.isEmpty()) {
+			return null;
+		}
+
+		StringBuilder joined = new StringBuilder();
+
+		for (Object item : items) {
+			String id = getIdValue(item, ids);
+
+			if (id == null) {
+				// one element that is not an id means the list is shown as-is
+				return value.toString();
+			}
+			if (joined.length() > 0) {
+				joined.append(", ");
+			}
+			joined.append(id);
+		}
+		return joined.toString();
+	}
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
index 1c2a25b..e5c91bd 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
@@ -20,16 +20,10 @@
 package org.apache.ranger.tagsync.source.atlas;
 
 import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.typesystem.EntityImpl;
-import org.apache.atlas.typesystem.IdImpl;
-import org.apache.atlas.typesystem.TraitImpl;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -40,25 +34,23 @@ import org.apache.atlas.notification.entity.EntityNotificationConsumer;
 import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider;
 import org.apache.atlas.typesystem.api.Entity;
 import org.apache.atlas.typesystem.api.Trait;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.model.TagSource;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.plugin.util.RangerRESTUtils;
 import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
 
 import java.io.IOException;
-import java.lang.reflect.Type;
+import java.io.InputStream;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 public class TagAtlasSource implements TagSource {
 	private static final Log LOG = LogFactory.getLog(TagAtlasSource.class);
 
+	public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "application.properties";
+
+	public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.notification.kafka.bootstrap.servers";
+	public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.notification.kafka.zookeeper.connect";
+	public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.notification.kafka.group.id";
 
-	private final Map<String, Entity> entities = new LinkedHashMap<>();
 	private TagSink tagSink;
 	private Properties properties;
 	private ConsumerRunnable consumerTask;
@@ -78,19 +70,46 @@ public class TagAtlasSource implements TagSource {
 			this.properties = properties;
 		}
 
+		Properties atlasProperties = new Properties();
 
-		NotificationModule notificationModule = new NotificationModule();
+		InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
 
-		Injector injector = Guice.createInjector(notificationModule);
+		if (inputStream != null) {
+			try {
+				atlasProperties.load(inputStream);
+			} catch (IOException ioException) {
+				ret = false;
+				LOG.error("Cannot load Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
+			}
+		} else {
+			ret = false;
+			LOG.error("Cannot find Atlas application properties file");
+		}
 
-		EntityNotificationConsumerProvider consumerProvider = injector.getInstance(EntityNotificationConsumerProvider.class);
+		if (ret) {
+			if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS))) {
+				ret = false;
+				LOG.error("Value of property '" + TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!");
+			}
+			if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT))) {
+				ret = false;
+				LOG.error("Value of property '" + TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!");
+			}
+			if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP))) {
+				ret = false;
+				LOG.error("Value of property '" + TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!");
+			}
+		}
 
-		consumerTask = new ConsumerRunnable(consumerProvider.get());
+		if (ret) {
+			NotificationModule notificationModule = new NotificationModule();
 
-		//ExecutorService executorService = Executors.newFixedThreadPool(1);
+			Injector injector = Guice.createInjector(notificationModule);
 
-		//executorService.submit(new ConsumerRunnable(consumerProvider.get()));
+			EntityNotificationConsumerProvider consumerProvider = injector.getInstance(EntityNotificationConsumerProvider.class);
 
+			consumerTask = new ConsumerRunnable(consumerProvider.get());
+		}
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("<== TagAtlasSource.initialize(), result=" + ret);
@@ -202,390 +221,7 @@ public class TagAtlasSource implements TagSource {
 						LOG.debug("				Trait-Value:" + valueEntry.getValue());
 					}
 				}
-
-			}
-		}
-
-	}
-
-	public void printAllEntities() {
-		try {
-			new AtlasUtility().getAllEntities();
-		}
-		catch(java.io.IOException ioException) {
-			LOG.error("Caught IOException while retrieving Atlas Entities:", ioException);
-		}
-	}
-
-	// update the set of entities with current from Atlas
-	public void refreshAllEntities() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagAtlasSource.refreshAllEntities()");
-		}
-		AtlasUtility atlasUtility = new AtlasUtility();
-
-		try {
-			entities.putAll(atlasUtility.getAllEntities());
-		} catch (IOException e) {
-			LOG.error("getAllEntities() failed", e);
-		}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagAtlasSource.refreshAllEntities()");
-		}
-	}
-
-	// Inner class AtlasUtil
-
-	/**
-	 * Atlas utility.
-	 */
-	@SuppressWarnings("unchecked")
-	private class AtlasUtility {
-
-		/**
-		 * Atlas APIs
-		 */
-		public static final String API_ATLAS_TYPES    = "api/atlas/types";
-		public static final String API_ATLAS_ENTITIES = "api/atlas/entities?type=";
-		public static final String API_ATLAS_ENTITY   = "api/atlas/entities/";
-		public static final String API_ATLAS_TYPE     = "api/atlas/types/";
-
-		/**
-		 * API Response Attributes
-		 */
-		public static final String RESULTS_ATTRIBUTE               = "results";
-		public static final String DEFINITION_ATTRIBUTE            = "definition";
-		public static final String VALUES_ATTRIBUTE                = "values";
-		public static final String TRAITS_ATTRIBUTE                = "traits";
-		public static final String TYPE_NAME_ATTRIBUTE             = "typeName";
-		public static final String TRAIT_TYPES_ATTRIBUTE           = "traitTypes";
-		public static final String SUPER_TYPES_ATTRIBUTE           = "superTypes";
-		public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions";
-		public static final String NAME_ATTRIBUTE                  = "name";
-
-		private Type mapType = new TypeToken<Map<String, Object>>(){}.getType();
-
-		private RangerRESTClient restClient;
-
-
-		// ----- Constructors ------------------------------------------------------
-
-		/**
-		 * Construct an AtlasUtility
-		 *
-		 */
-		public AtlasUtility() {
-
-			String url               = TagSyncConfig.getAtlasEndpoint(properties);
-			String sslConfigFileName = TagSyncConfig.getAtlasSslConfigFileName(properties);
-
-
-			if(LOG.isDebugEnabled()) {
-				LOG.debug("Initializing RangerRestClient with (url=" + url + ", sslConfigFileName" + sslConfigFileName + ")");
-			}
-
-			restClient = new RangerRESTClient(url, sslConfigFileName);
-
-			if(LOG.isDebugEnabled()) {
-				LOG.debug("Initialized RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
-			}
-		}
-
-
-		// ----- AtlasUtility ------------------------------------------------------
-
-		/**
-		 * Get all of the entities defined in Atlas.
-		 *
-		 * @return  a mapping of GUIDs to Atlas entities
-		 *
-		 * @throws IOException if there is an error communicating with Atlas
-		 */
-		public Map<String, Entity> getAllEntities() throws IOException {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("==> TagAtlasSource.getAllEntities()");
-			}
-			Map<String, Entity> entities = new LinkedHashMap<>();
-
-			Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
-
-			List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class);
-
-			for (String type : types) {
-
-				Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type);
-
-				List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
-
-				for (String guid : guids) {
-
-					if (StringUtils.isNotBlank(guid)) {
-
-						Map<Trait, Map<String, ? extends Trait>> traitSuperTypes = new HashMap<>();
-
-						Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
-
-						if (entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-							String definitionJSON = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
-
-							LOG.info("{");
-							LOG.info("	\"entity-id\":" + guid + ",");
-							LOG.info("	\"entity-definition\":" + definitionJSON);
-							LOG.info("}");
-
-							Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
-
-							Map<String, Object> values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
-							Map<String, Object> traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
-							String typeName = getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
-
-							LOG.info("Received entity(typeName=" + typeName + ", id=" + guid + ")");
-
-
-							Map<String, TraitImpl> traitMap = new HashMap<>();
-
-							if (MapUtils.isNotEmpty(traits)) {
-
-								LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ ");
-
-								for (Map.Entry<String, Object> entry : traits.entrySet()) {
-
-									Map<String, Object> trait = (Map<String, Object>) entry.getValue();
-
-									Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
-									String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
-
-									Map<String, TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
-
-									TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes);
-
-									traitSuperTypes.put(trait1, superTypes);
-
-									traitMap.put(entry.getKey(), trait1);
-
-
-									LOG.info("			Trait(typeName=" + traitTypeName + ")");
-
-								}
-							} else {
-								LOG.info("No traits for entity(typeName=" + typeName + ", id=" + guid + ")");
-							}
-							EntityImpl entity = new EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
-
-							showEntity(entity);
-
-							entities.put(guid, entity);
-
-						}
-					}
-				}
-			}
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("==> TagAtlasSource.getAllEntities()");
-			}
-			return entities;
-		}
-
-
-		// ----- helper methods ----------------------------------------------------
-
-		private Map<String, Object> getTraitType(String traitName)
-				throws IOException {
-
-			Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
-
-			if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-				String definitionJSON = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class);
-
-				Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
-
-				List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
-
-				if (traitTypes.size() > 0) {
-					return (Map<String, Object>) traitTypes.get(0);
-				}
-			}
-			return null;
-		}
-
-		private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values)
-				throws IOException {
-
-			Map<String, TraitImpl> superTypes = new HashMap<>();
-
-			if (traitType != null) {
-
-				List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
-
-				for (String superTypeName : superTypeNames) {
-
-					Map<String, Object> superTraitType = getTraitType(superTypeName);
-
-					if (superTraitType != null) {
-						List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
-
-						Map<String, Object> superTypeValues = new HashMap<>();
-						for (Map<String, Object> attributeDefinition : attributeDefinitions) {
-
-							String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
-							if (values.containsKey(attributeName)) {
-								superTypeValues.put(attributeName, values.get(attributeName));
-							}
-						}
-
-						superTypes.put(superTypeName,
-								//new TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, superTypeName));
-								new TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, superTypeValues)));
-					}
-				}
-			}
-			return superTypes;
-		}
-
-		/*
-		private Map<String, Object> atlasAPI(String endpoint) throws IOException {
-			InputStream in  = streamProvider.readFrom(atlasEndpoint + endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
-			return new Gson().fromJson(IOUtils.toString(in, "UTF-8"), mapType);
-		}
-		*/
-
-		private Map<String, Object> atlasAPI(String endpoint)  {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint +")");
-			}
-			// Create a REST client and perform a get on it
-			Map<String, Object> ret = new HashMap<String, Object>();
-
-			WebResource webResource = restClient.getResource(endpoint);
-
-			ClientResponse response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
-
-			if(response != null && response.getStatus() == 200) {
-				ret = response.getEntity(ret.getClass());
-			} else {
-				LOG.error("Atlas REST call returned with response={" + response +"}");
-
-				RESTResponse resp = RESTResponse.fromClientResponse(response);
-				LOG.error("Error getting Atlas Entity. request=" + webResource.toString()
-						+ ", response=" + resp.toString());
 			}
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + ")");
-			}
-			return ret;
 		}
-
-		private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
-			return type.cast(map.get(name));
-		}
-
-
-
-		public void showEntity(Entity entity) {
-
-			LOG.debug("Entity-id	:" + entity.getId());
-
-			LOG.debug("Type:		" + entity.getTypeName());
-
-			LOG.debug("----- Values -----");
-
-			for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
-				LOG.debug("		Name:	" + entry.getKey() + "");
-				Object value = entry.getValue();
-				LOG.debug("		Value:	" + getValue(value, entities.keySet()));
-			}
-
-			LOG.debug("----- Traits -----");
-
-			for (String traitName : entity.getTraits().keySet()) {
-				LOG.debug("		Name:" + entity.getId() + ", trait=" + traitName + ">" + traitName);
-			}
-
-		}
-
-		public void showTrait(Entity entity, String traitId) {
-
-			String[] traitNames = traitId.split(",");
-
-			Trait trait = entity.getTraits().get(traitNames[0]);
-
-			for (int i = 1; i < traitNames.length; ++i ) {
-				trait = trait.getSuperTypes().get(traitNames[i]);
-			}
-
-			String typeName = trait.getTypeName();
-
-			LOG.debug("Trait " + typeName + " for Entity id=" + entity.getId());
-
-			LOG.debug("Type: " + typeName);
-
-			LOG.debug("----- Values ------");
-
-			for (Map.Entry<String, Object> entry : trait.getValues().entrySet()) {
-				LOG.debug("Name:" + entry.getKey());
-				Object value = entry.getValue();
-				LOG.debug("Value:" + getValue(value, entities.keySet()));
-			}
-
-			LOG.debug("Super Traits");
-
-
-			for (String traitName : trait.getSuperTypes().keySet()) {
-				LOG.debug("Name=" + entity.getId() + "&trait=" + traitId + "," + traitName + ">" + traitName);
-			}
-		}
-
-		// resolve the given value if necessary
-		private String getValue(Object value, Set<String> ids) {
-			if (value == null) {
-				return "";
-			}
-			String idString = getIdValue(value, ids);
-			if (idString != null) {
-				return idString;
-			}
-
-			idString = getIdListValue(value, ids);
-			if (idString != null) {
-				return idString;
-			}
-
-			return value.toString();
-		}
-		// get an id from the given value; return null if the value is not an id type
-		private String getIdValue(Object value, Set<String> ids) {
-			if (value instanceof Map) {
-				Map map = (Map) value;
-				if (map.size() == 3 && map.containsKey("id")){
-					String id = map.get("id").toString();
-					if (ids.contains(id)) {
-						return id;
-					}
-				}
-			}
-			return null;
-		}
-		// get an id list from the given value; return null if the value is not an id list type
-		private String getIdListValue(Object value, Set<String> ids) {
-			if (value instanceof List) {
-				List list = (List) value;
-				if (list.size() > 0) {
-					StringBuilder sb = new StringBuilder();
-					for (Object o : list) {
-						String idString = getIdValue(o, ids);
-						if (idString == null) {
-							return value.toString();
-						}
-						if (sb.length() > 0) {
-							sb.append(", ");
-						}
-						sb.append(idString);
-					}
-					return sb.toString();
-				}
-			}
-			return null;
-		}
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
index 925a712..03a3980 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.file;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.tagsync.model.TagSink;
@@ -59,8 +60,24 @@ public class TagFileSource implements TagSource, Runnable {
 
 		boolean ret = true;
 
+		if (StringUtils.isBlank(TagSyncConfig.getTagSourceFileName(properties))) {
+			ret = false;
+			LOG.error("value of property 'ranger.tagsync.source.impl.class' is file and no value specified for property 'ranger.tagsync.filesource.filename'!");
+		}
+
 		if (ret) {
 
+			long fileModTimeCheckIntervalInMs = TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
+
+			if (fileModTimeCheckIntervalInMs <= 0L) {
+				LOG.info("'ranger.tagsync.filesource.modtime.check.interval' is zero or negative! 'ranger.tagsync.filesource.modtime.check.interval'=" + fileModTimeCheckIntervalInMs + "ms");
+				LOG.info("Setting 'ranger.tagsync.filesource.modtime.check.interval' to 60 seconds");
+				fileModTimeCheckIntervalInMs = 60*1000;
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("'ranger.tagsync.filesource.modtime.check.interval':" + fileModTimeCheckIntervalInMs + "ms");
+				}
+			}
 			sourceFileName = TagSyncConfig.getTagSourceFileName(properties);
 
 			if (LOG.isDebugEnabled()) {
@@ -120,7 +137,7 @@ public class TagFileSource implements TagSource, Runnable {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> TagFileSource.run()");
 		}
-		long sleepTimeBetweenCycleInMillis = TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
+		long sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
 		boolean shutdownFlag = false;
 
 		while (!shutdownFlag) {
@@ -143,7 +160,7 @@ public class TagFileSource implements TagSource, Runnable {
 				Thread.sleep(sleepTimeBetweenCycleInMillis);
 			}
 			catch (InterruptedException e) {
-				LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to synchronize tag information", e);
+				LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to tagFileSource", e);
 				shutdownFlag = true;
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/resources/ranger-tagsync-default.xml
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml b/tagsync/src/main/resources/ranger-tagsync-default.xml
index b9e4512..8649d06 100644
--- a/tagsync/src/main/resources/ranger-tagsync-default.xml
+++ b/tagsync/src/main/resources/ranger-tagsync-default.xml
@@ -27,7 +27,7 @@
 	</property>
 	<property>
 		<name>ranger.tagsync.tagadmin.rest.url</name>
-		<value>http://localhost:6080</value>
+		<value></value>
 		<description></description>
 	</property>
 	<property>
@@ -36,18 +36,18 @@
 		<description></description>
 	</property>
 	<property>
-		<name>ranger.tagsync.sleeptimeinmillisbetweensynccycle</name>
+		<name>ranger.tagsync.filesource.modtime.check.interval</name>
 		<value>60000</value>
 		<description></description>
 	</property>
 	<property>
 		<name>ranger.tagsync.filesource.filename</name>
-		<value>/etc/ranger/data/tags.json</value>
+		<value></value>
 		<description></description>
 	</property>
 	<property>
 		<name>ranger.tagsync.source.impl.class</name>
-		<value>file</value>
+		<value></value>
 		<description></description>
 	</property>
 	<property>
@@ -56,13 +56,7 @@
 		<description></description>
 	</property>
 	<property>
-		<name>ranger.tagsync.atlas.hive.instance.c1.ranger.service</name>
-		<value>cl1_hive</value>
-		<description></description>
-	</property>
-	<property>
-		<name>ranger.tagsync.atlassource.endpoint</name>
-		<value>http://localhost:21000/</value>
-		<description></description>
+		<name>ranger.tagsync.atlas.to.service.mapping</name>
+		<value>c1,hive,cl1_hive</value>
 	</property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/resources/ranger-tagsync-site.xml
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/ranger-tagsync-site.xml b/tagsync/src/main/resources/ranger-tagsync-site.xml
new file mode 100644
index 0000000..19532e9
--- /dev/null
+++ b/tagsync/src/main/resources/ranger-tagsync-site.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+	<property>
+		<name>ranger.tagsync.enabled</name>
+		<value>true</value>
+	</property>
+	<property>
+		<name>ranger.tagsync.logdir</name>
+		<value>log</value>
+	</property>
+	<property>
+		<name>ranger.tagsync.tagadmin.rest.url</name>
+		<value>localhost:6080</value>
+		<description></description>
+	</property>
+	<property>
+		<name>ranger.tagsync.tagadmin.rest.ssl.config.file</name>
+		<value></value>
+		<description></description>
+	</property>
+	<property>
+		<name>ranger.tagsync.filesource.modtime.check.interval</name>
+		<value>60000</value>
+		<description></description>
+	</property>
+	<property>
+		<name>ranger.tagsync.filesource.filename</name>
+		<value>/etc/ranger/data/tags.json</value>
+		<description></description>
+	</property>
+	<property>
+		<name>ranger.tagsync.source.impl.class</name>
+		<value>File</value>
+		<description></description>
+	</property>
+	<property>
+		<name>ranger.tagsync.atlas.to.service.mapping</name>
+		<value>c1,hive,cl1_hive</value>
+	</property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
index 10be4e6..9d603d4 100644
--- a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
+++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
@@ -75,20 +75,27 @@ public class TestTagSynchronizer {
 	@Test
 	public void testTagDownload() {
 
-		boolean initDone = tagSynchronizer.initLoop();
+		boolean initDone = false;
+
+		/* For tagSynchronizer.initialize() to succeed, edit the ranger-tagsync-site.xml file so that it
+		contains correct values for the following properties:
+			ranger.tagsync.tagadmin.rest.url, ranger.tagsync.tagadmin.password
+
+		For example:
+			<property>
+				<name>ranger.tagsync.tagadmin.rest.url</name>
+				<value>http://tagsync-test:6080</value>
+			</property>
+			<property>
+				<name>ranger.tagsync.tagadmin.password</name>
+				<value>rangertagsync</value>
+			</property>
+		*/
 
-		System.out.println("TagSynchronizer initialization result=" + initDone);
 
-		/*
-		TagSource tagSource = tagSynchronizer.getTagSource();
+		//initDone = tagSynchronizer.initialize();
 
-		try {
-			TagAtlasSource tagAtlasSource = (TagAtlasSource) tagSource;
-			//tagAtlasSource.printAllEntities();
-		} catch (ClassCastException exception) {
-			System.err.println("TagSource is not of TagAtlasSource");
-		}
-		*/
+		System.out.println("TagSynchronizer initialization result=" + initDone);
 
 		System.out.println("Exiting testTagDownload()");
 	}


Mime
View raw message