ranger-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/2] incubator-ranger git commit: RANGER-397 Support RDBMS as audit destination using V3 configuration
Date Sat, 30 May 2015 19:19:41 GMT
Repository: incubator-ranger
Updated Branches:
  refs/heads/master 9e5bd8540 -> 94ba6beb3


RANGER-397 Support RDBMS as audit destination using V3 configuration

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/a2de2450
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/a2de2450
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/a2de2450

Branch: refs/heads/master
Commit: a2de2450a572468af1928d5d021567c39544e193
Parents: 9e5bd85
Author: Don Bosco Durai <bosco@apache.org>
Authored: Fri May 29 14:54:22 2015 -0700
Committer: Don Bosco Durai <bosco@apache.org>
Committed: Fri May 29 14:54:22 2015 -0700

----------------------------------------------------------------------
 .../org/apache/ranger/audit/dao/DaoManager.java |   2 +
 .../audit/destination/DBAuditDestination.java   | 306 +++++++++++++++++++
 .../audit/destination/HDFSAuditDestination.java |   3 +
 .../audit/provider/AuditProviderFactory.java    |   3 +-
 .../ranger/audit/provider/BaseAuditHandler.java |   5 +-
 .../apache/ranger/audit/provider/MiscUtil.java  |  15 +
 .../ranger/audit/queue/AuditAsyncQueue.java     |  25 +-
 .../ranger/audit/queue/AuditBatchQueue.java     |  24 +-
 .../apache/ranger/audit/queue/AuditQueue.java   |   6 +
 .../ranger/audit/queue/AuditSummaryQueue.java   |  25 +-
 10 files changed, 409 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java b/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
index 6d81744..fd4d096 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
@@ -49,6 +49,8 @@ public class DaoManager extends DaoManagerBase {
 
 				sEntityManager.set(em);
 			}
+		} else {
+			logger.error("EntityManagerFactory was not set in this thread.", new Throwable());
 		}
 
 		return em;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
new file mode 100644
index 0000000..c58748e
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.destination;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.EntityTransaction;
+import javax.persistence.Persistence;
+
+import org.apache.ranger.audit.dao.DaoManager;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+/**
+ * Audit destination that persists audit events into an RDBMS through JPA.
+ *
+ * The JDBC settings are read from the properties handed to
+ * {@link #init(Properties, String)}, using the keys declared below appended
+ * to the destination's property prefix. The connection is attempted once in
+ * init() and re-attempted whenever a transaction is requested while no open
+ * EntityManager is available.
+ */
+public class DBAuditDestination extends AuditDestination {
+
+	private static final Log logger = LogFactory
+			.getLog(DBAuditDestination.class);
+
+	// Property name suffixes; each is looked up as propPrefix + "." + suffix
+	public static final String PROP_DB_JDBC_DRIVER = "jdbc.driver";
+	public static final String PROP_DB_JDBC_URL = "jdbc.url";
+	public static final String PROP_DB_USER = "user";
+	public static final String PROP_DB_PASSWORD = "password";
+	public static final String PROP_DB_PASSWORD_ALIAS = "password.alias";
+
+	// Both are reset to null by cleanUp() so that the next use reconnects
+	private EntityManagerFactory entityManagerFactory;
+	private DaoManager daoManager;
+
+	private String jdbcDriver = null;
+	private String jdbcURL = null;
+	private String dbUser = null;
+	// Alias used for the credential lookup unless overridden by configuration
+	private String dbPasswordAlias = "auditDBCred";
+
+	public DBAuditDestination() {
+		logger.info("DBAuditDestination() called");
+	}
+
+	/**
+	 * Initializes the base destination and makes the first attempt to connect
+	 * to the database. A failed connect is only logged by connect(); it does
+	 * not make init() fail.
+	 *
+	 * @param props
+	 *            configuration properties
+	 * @param propPrefix
+	 *            prefix under which this destination's properties are stored
+	 */
+	@Override
+	public void init(Properties props, String propPrefix) {
+		logger.info("init() called");
+		super.init(props, propPrefix);
+
+		// Initial connect
+		connect();
+	}
+
+	/**
+	 * Persists all given events within a single transaction. The first event
+	 * that fails to persist aborts the loop and the transaction is rolled
+	 * back; otherwise the transaction is committed.
+	 *
+	 * @param events
+	 *            events to persist
+	 * @return true only if every event was persisted and the commit
+	 *         succeeded; false if no transaction could be obtained, an event
+	 *         failed, or the commit failed
+	 */
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		boolean retValue = false;
+
+		if (!beginTransaction()) {
+			return false;
+		}
+		boolean isFailed = false;
+		for (AuditEventBase event : events) {
+			try {
+				event.persist(daoManager);
+			} catch (Throwable t) {
+				logger.error("Error persisting data. event=" + event, t);
+				isFailed = true;
+				break;
+			}
+		}
+		if (isFailed) {
+			retValue = false;
+			rollbackTransaction();
+		} else {
+			retValue = commitTransaction();
+		}
+		return retValue;
+	}
+
+	/**
+	 * Releases the JPA resources and then stops the base destination.
+	 */
+	@Override
+	public void stop() {
+		cleanUp();
+		super.stop();
+	}
+
+	// Local methods
+
+	/**
+	 * Reads the JDBC configuration and creates the EntityManagerFactory and
+	 * DaoManager. Does nothing when an open EntityManager already exists.
+	 * Missing driver, URL or user is logged as fatal and aborts the attempt;
+	 * any other failure is caught and logged, never thrown.
+	 */
+	protected void connect() {
+		if (isDbConnected()) {
+			return;
+		}
+		try {
+			jdbcDriver = MiscUtil.getStringProperty(props, propPrefix + "."
+					+ PROP_DB_JDBC_DRIVER);
+			jdbcURL = MiscUtil.getStringProperty(props, propPrefix + "."
+					+ PROP_DB_JDBC_URL);
+			dbUser = MiscUtil.getStringProperty(props, propPrefix + "."
+					+ PROP_DB_USER);
+			String dbPassword = MiscUtil.getStringProperty(props, propPrefix
+					+ "." + PROP_DB_PASSWORD);
+			String tmpAlias = MiscUtil.getStringProperty(props, propPrefix
+					+ "." + PROP_DB_PASSWORD_ALIAS);
+			dbPasswordAlias = tmpAlias != null ? tmpAlias : dbPasswordAlias;
+			String credFile = MiscUtil.getStringProperty(props,
+					AUDIT_DB_CREDENTIAL_PROVIDER_FILE);
+
+			if (jdbcDriver == null || jdbcDriver.isEmpty()) {
+				logger.fatal("JDBC driver not provided. Set property name "
+						+ propPrefix + "." + PROP_DB_JDBC_DRIVER);
+				return;
+			}
+			if (jdbcURL == null || jdbcURL.isEmpty()) {
+				logger.fatal("JDBC URL not provided. Set property name "
+						+ propPrefix + "." + PROP_DB_JDBC_URL);
+				return;
+			}
+			if (dbUser == null || dbUser.isEmpty()) {
+				logger.fatal("DB user not provided. Set property name "
+						+ propPrefix + "." + PROP_DB_USER);
+				return;
+			}
+			// NOTE(review): when a password property is present it is
+			// replaced by the credential lookup, which yields null when
+			// credFile is null - confirm this is the intended precedence.
+			if (dbPassword == null || dbPassword.isEmpty()) {
+				logger.warn("DB password not provided. Will assume empty for now. Set property name "
+						+ propPrefix + "." + PROP_DB_PASSWORD);
+			} else {
+				dbPassword = MiscUtil.getCredentialString(credFile,
+						dbPasswordAlias);
+			}
+			// The password itself is deliberately not logged
+			logger.info("JDBC Driver=" + jdbcDriver + ", JDBC URL=" + jdbcURL
+					+ ", dbUser=" + dbUser + ", passwordAlias="
+					+ dbPasswordAlias + ", credFile=" + credFile);
+
+			Map<String, String> dbProperties = new HashMap<String, String>();
+			dbProperties.put("javax.persistence.jdbc.driver", jdbcDriver);
+			dbProperties.put("javax.persistence.jdbc.url", jdbcURL);
+			dbProperties.put("javax.persistence.jdbc.user", dbUser);
+			if (dbPassword != null) {
+				dbProperties.put("javax.persistence.jdbc.password", dbPassword);
+			}
+
+			entityManagerFactory = Persistence.createEntityManagerFactory(
+					"xa_server", dbProperties);
+
+			logger.info("entityManagerFactory=" + entityManagerFactory);
+
+			daoManager = new DaoManager();
+			daoManager.setEntityManagerFactory(entityManagerFactory);
+
+			// this forces the connection to be made to DB; report the
+			// failure case, i.e. when no EntityManager could be obtained
+			if (daoManager.getEntityManager() == null) {
+				logger.error("Error connecting audit database. EntityManager is null. dbURL="
+						+ jdbcURL + ", dbUser=" + dbUser);
+			}
+
+		} catch (Throwable t) {
+			logger.error("Error connecting audit database. dbURL=" + jdbcURL
+					+ ", dbUser=" + dbUser, t);
+		}
+	}
+
+	/**
+	 * Closes the EntityManagerFactory if it is open and drops the references
+	 * to it and to the DaoManager, even when closing fails.
+	 */
+	private synchronized void cleanUp() {
+		logger.info("DBAuditDestination: cleanUp()");
+
+		try {
+			if (entityManagerFactory != null && entityManagerFactory.isOpen()) {
+				entityManagerFactory.close();
+			}
+		} catch (Exception excp) {
+			logger.error("DBAuditDestination.cleanUp(): failed", excp);
+		} finally {
+			entityManagerFactory = null;
+			daoManager = null;
+		}
+	}
+
+	/**
+	 * Returns the EntityManager from the current DaoManager.
+	 *
+	 * @return the EntityManager, or null when there is no DaoManager or the
+	 *         lookup threw (in which case the resources are cleaned up)
+	 */
+	private EntityManager getEntityManager() {
+		// Work on a local copy since cleanUp() may null the field
+		DaoManager daoMgr = daoManager;
+
+		if (daoMgr != null) {
+			try {
+				return daoMgr.getEntityManager();
+			} catch (Exception excp) {
+				logger.error("DBAuditDestination.getEntityManager(): failed",
+						excp);
+
+				cleanUp();
+			}
+		}
+
+		return null;
+	}
+
+	/**
+	 * @return true when an EntityManager is available and open
+	 */
+	private boolean isDbConnected() {
+		EntityManager em = getEntityManager();
+		return em != null && em.isOpen();
+	}
+
+	/**
+	 * Clears the EntityManager, if one is available. Failures are logged as
+	 * warnings and otherwise ignored.
+	 */
+	private void clearEntityManager() {
+		try {
+			EntityManager em = getEntityManager();
+
+			if (em != null) {
+				em.clear();
+			}
+		} catch (Exception excp) {
+			logger.warn("DBAuditDestination.clearEntityManager(): failed", excp);
+		}
+	}
+
+	/**
+	 * Returns the transaction of the current EntityManager, reconnecting
+	 * first when the database is not connected.
+	 *
+	 * @return the transaction, or null when no EntityManager is available
+	 */
+	private EntityTransaction getTransaction() {
+		if (!isDbConnected()) {
+			connect();
+		}
+
+		EntityManager em = getEntityManager();
+
+		return em != null ? em.getTransaction() : null;
+	}
+
+	/**
+	 * Begins the transaction unless it is already active.
+	 *
+	 * @return true when a transaction object was obtained, false otherwise
+	 */
+	private boolean beginTransaction() {
+		EntityTransaction trx = getTransaction();
+
+		if (trx != null && !trx.isActive()) {
+			trx.begin();
+		}
+
+		if (trx == null) {
+			logger.warn("DBAuditDestination.beginTransaction(): trx is null");
+		}
+
+		return trx != null;
+	}
+
+	/**
+	 * Commits the active transaction. A missing or inactive transaction is
+	 * treated like a commit failure: it is logged and the JPA resources are
+	 * cleaned up. The EntityManager is cleared in every case.
+	 *
+	 * @return true when the commit succeeded
+	 */
+	private boolean commitTransaction() {
+		boolean ret = false;
+		EntityTransaction trx = null;
+
+		try {
+			trx = getTransaction();
+
+			if (trx != null && trx.isActive()) {
+				trx.commit();
+				ret = true;
+			} else {
+				// Routed through the catch block below for logging + cleanUp
+				throw new Exception("trx is null or not active");
+			}
+		} catch (Throwable excp) {
+			logger.error("DBAuditDestination.commitTransaction(): failed", excp);
+
+			cleanUp(); // so that next insert will try to init()
+		} finally {
+			clearEntityManager();
+		}
+
+		return ret;
+	}
+
+	/**
+	 * Rolls back the active transaction. A missing or inactive transaction
+	 * is treated like a rollback failure: it is logged and the JPA resources
+	 * are cleaned up. The EntityManager is cleared in every case.
+	 *
+	 * @return true when the rollback succeeded
+	 */
+	private boolean rollbackTransaction() {
+		boolean ret = false;
+		EntityTransaction trx = null;
+
+		try {
+			trx = getTransaction();
+
+			if (trx != null && trx.isActive()) {
+				trx.rollback();
+				ret = true;
+			} else {
+				// Routed through the catch block below for logging + cleanUp
+				throw new Exception("trx is null or not active");
+			}
+		} catch (Throwable excp) {
+			logger.error("DBAuditDestination.rollbackTransaction(): failed",
+					excp);
+
+			cleanUp(); // so that next insert will try to init()
+		} finally {
+			clearEntityManager();
+		}
+
+		return ret;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index 6ca4fce..67382a9 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -105,6 +105,9 @@ public class HDFSAuditDestination extends AuditDestination {
 
 	@Override
 	synchronized public boolean logJSON(Collection<String> events) {
+		if (!initDone) {
+			return false;
+		}
 		if (isStopped) {
 			logError("log() called after stop was requested. name=" + getName());
 			return false;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index d6ef318..c3a05ce 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.DBAuditDestination;
 import org.apache.ranger.audit.destination.FileAuditDestination;
 import org.apache.ranger.audit.destination.HDFSAuditDestination;
 import org.apache.ranger.audit.destination.SolrAuditDestination;
@@ -415,7 +416,7 @@ public class AuditProviderFactory {
 			} else if (providerName.equals("kafka")) {
 				provider = new KafkaAuditProvider();
 			} else if (providerName.equals("db")) {
-				provider = new DbAuditProvider();
+				provider = new DBAuditDestination();
 			} else if (providerName.equals("log4j")) {
 				provider = new Log4jAuditProvider();
 			} else if (providerName.equals("batch")) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
index dd44def..09335c7 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
+
 import com.google.gson.GsonBuilder;
 
 import java.util.concurrent.atomic.AtomicLong;
@@ -33,7 +34,9 @@ import java.util.Properties;
 public abstract class BaseAuditHandler implements AuditHandler {
 	private static final Log LOG = LogFactory.getLog(BaseAuditHandler.class);
 
-	private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
+	static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
+	protected static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE   = "xasecure.audit.credential.provider.file";
+
 
 	private int mLogFailureReportMinIntervalInMs = 60 * 1000;
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
index fe6b0e9..abb0a90 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
@@ -29,6 +29,7 @@ import java.util.StringTokenizer;
 import java.util.UUID;
 
 import org.apache.log4j.helpers.LogLog;
+import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -377,5 +378,19 @@ public class MiscUtil {
 		}
 		return list;
 	}
+	
+	/**
+	 * Fetches a credential via RangerCredentialProvider and converts it to a
+	 * String.
+	 *
+	 * @param url
+	 *            value passed to the provider as its first argument
+	 * @param alias
+	 *            alias of the credential to fetch
+	 * @return the credential as a String, or null when either argument is
+	 *         null or the provider returned null
+	 */
+	public static String getCredentialString(String url,String alias) {
+		// Without both arguments the provider is not consulted at all
+		if (url == null || alias == null) {
+			return null;
+		}
+
+		char[] secret = RangerCredentialProvider.getInstance().getCredentialString(url, alias);
+
+		return secret == null ? null : new String(secret);
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
index d16fff9..de5941a 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -102,9 +102,16 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable {
 	 */
 	@Override
 	public void stop() {
+		logger.info("Stop called. name=" + getName());
+		if (stopTime != 0) {
+			stopTime = System.currentTimeMillis();
+		}
 		setDrain(true);
 		try {
 			if (consumerThread != null) {
+				logger.info("Interrupting consumerThread. name=" + getName()
+						+ ", consumer="
+						+ (consumer == null ? null : consumer.getName()));
 				consumerThread.interrupt();
 			}
 		} catch (Throwable t) {
@@ -138,7 +145,7 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable {
 				}
 			} catch (InterruptedException e) {
 				logger.info(
-						"Caught exception in consumer thread. Mostly to about loop",
+						"Caught exception in consumer thread. Mostly server is shutting down.",
 						e);
 			} catch (Throwable t) {
 				logger.error("Caught error during processing request.", t);
@@ -146,13 +153,29 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable
{
 			if (isDrain() && queue.isEmpty()) {
 				break;
 			}
+			if (isDrain()
+					&& (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS)
{
+				logger.warn("Exiting polling loop to max time allowed. name="
+						+ getName() + ", waited for "
+						+ (stopTime - System.currentTimeMillis()) + " ms");
+
+				break;
+			}
 		}
+		logger.info("Exiting polling loop. name=" + getName());
+
 		try {
 			// Call stop on the consumer
+			logger.info("Calling to stop consumer. name=" + getName()
+					+ ", consumer.name=" + consumer.getName());
+
+			// Call stop on the consumer
 			consumer.stop();
 		} catch (Throwable t) {
 			logger.error("Error while calling stop on consumer.", t);
 		}
+		logger.info("Exiting consumerThread.run() method. name=" + getName());
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index 8316c2b..645483b 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -119,10 +119,19 @@ public class AuditBatchQueue extends AuditQueue implements Runnable
{
 	 */
 	@Override
 	public void stop() {
+		logger.info("Stop called. name=" + getName());
+		if (stopTime != 0) {
+			stopTime = System.currentTimeMillis();
+		}
+
 		setDrain(true);
 		flush();
 		try {
 			if (consumerThread != null) {
+				logger.info("Interrupting consumerThread. name=" + getName()
+						+ ", consumer="
+						+ (consumer == null ? null : consumer.getName()));
+
 				consumerThread.interrupt();
 			}
 		} catch (Throwable t) {
@@ -257,7 +266,7 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
 				}
 			} catch (InterruptedException e) {
 				logger.info(
-						"Caught exception in consumer thread. Mostly to abort loop",
+						"Caught exception in consumer thread. Mostly server is shutting down.",
 						e);
 				setDrain(true);
 			} catch (Throwable t) {
@@ -311,12 +320,24 @@ public class AuditBatchQueue extends AuditQueue implements Runnable
{
 					break;
 				}
 			}
+			if (isDrain()
+					&& (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS)
{
+				logger.warn("Exiting polling loop to max time allowed. name="
+						+ getName() + ", waited for "
+						+ (stopTime - System.currentTimeMillis()) + " ms");
+
+				break;
+			}
+
 		}
 
 		logger.info("Exiting consumerThread. Queue=" + getName() + ", dest="
 				+ consumer.getName());
 		try {
 			// Call stop on the consumer
+			logger.info("Calling to stop consumer. name=" + getName()
+					+ ", consumer.name=" + consumer.getName());
+
 			consumer.stop();
 			if (fileSpoolerEnabled) {
 				fileSpooler.stop();
@@ -324,5 +345,6 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
 		} catch (Throwable t) {
 			logger.error("Error while calling stop on consumer.", t);
 		}
+		logger.info("Exiting consumerThread.run() method. name=" + getName());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
index 4c3ac5f..039dc6d 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
@@ -33,6 +33,9 @@ public abstract class AuditQueue extends BaseAuditHandler {
 	public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
 	public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
 	public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
+	
+	//This is the max time the consumer thread will wait before exiting the loop 
+	public static final int AUDIT_CONSUMER_THREAD_WAIT_MS = 5000;
 
 	private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
 	private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
@@ -57,6 +60,9 @@ public abstract class AuditQueue extends BaseAuditHandler {
 	protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
 	protected int fileSpoolDrainThresholdPercent = 80;
 
+	//This is set when the first time stop is called.
+	protected long stopTime = 0;
+	
 	/**
 	 * @param consumer
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
index 7922312..1e5b500 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -122,9 +122,18 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable
{
 	 */
 	@Override
 	public void stop() {
+		logger.info("Stop called. name=" + getName());
+		if (stopTime != 0) {
+			stopTime = System.currentTimeMillis();
+		}
+
 		setDrain(true);
 		try {
 			if (consumerThread != null) {
+				logger.info("Interrupting consumerThread. name=" + getName()
+						+ ", consumer="
+						+ (consumer == null ? null : consumer.getName()));
+
 				consumerThread.interrupt();
 			}
 		} catch (Throwable t) {
@@ -170,7 +179,7 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable
{
 				}
 			} catch (InterruptedException e) {
 				logger.info(
-						"Caught exception in consumer thread. Mostly to about loop",
+						"Caught exception in consumer thread. Mostly server is shutting down.",
 						e);
 			} catch (Throwable t) {
 				logger.error("Caught error during processing request.", t);
@@ -217,14 +226,28 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable
{
 			if (isDrain() && summaryMap.isEmpty() && queue.isEmpty()) {
 				break;
 			}
+			if (isDrain()
+					&& (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS)
{
+				logger.warn("Exiting polling loop to max time allowed. name="
+						+ getName() + ", waited for "
+						+ (stopTime - System.currentTimeMillis()) + " ms");
+
+				break;
+			}
+
 		}
 
+		logger.info("Exiting polling loop. name=" + getName());
 		try {
 			// Call stop on the consumer
+			logger.info("Calling to stop consumer. name=" + getName()
+					+ ", consumer.name=" + consumer.getName());
 			consumer.stop();
 		} catch (Throwable t) {
 			logger.error("Error while calling stop on consumer.", t);
 		}
+		logger.info("Exiting consumerThread.run() method. name=" + getName());
+
 	}
 
 	class AuditSummary {


Mime
View raw message