ranger-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject git commit: ARGUS-154: updated audit framework to reduce writting to application log file
Date Thu, 06 Nov 2014 01:34:58 GMT
Repository: incubator-argus
Updated Branches:
  refs/heads/ranger-0.4 548fac2a5 -> cecfd51c3


ARGUS-154: updated audit framework to reduce writting to application log
file


Project: http://git-wip-us.apache.org/repos/asf/incubator-argus/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-argus/commit/cecfd51c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-argus/tree/cecfd51c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-argus/diff/cecfd51c

Branch: refs/heads/ranger-0.4
Commit: cecfd51c357b211103924b6c4a51fbb8eabe7c63
Parents: 548fac2
Author: Madhan Neethiraj <madhan@apache.org>
Authored: Sun Nov 2 22:06:46 2014 -0800
Committer: Madhan Neethiraj <madhan@apache.org>
Committed: Wed Nov 5 17:32:58 2014 -0800

----------------------------------------------------------------------
 .../audit/provider/AsyncAuditProvider.java      |  38 ++--
 .../xasecure/audit/provider/AuditProvider.java  |   3 +
 .../audit/provider/AuditProviderFactory.java    | 172 +++----------------
 .../audit/provider/BaseAuditProvider.java       | 156 +++++++++++++++++
 .../audit/provider/BufferedAuditProvider.java   |  12 +-
 .../audit/provider/DbAuditProvider.java         |  60 ++++++-
 .../audit/provider/DummyAuditProvider.java      |   6 +
 .../audit/provider/LocalFileLogBuffer.java      |  78 +++++----
 .../audit/provider/Log4jAuditProvider.java      |  31 +++-
 .../audit/provider/MultiDestAuditProvider.java  |  24 ++-
 .../audit/provider/hdfs/HdfsAuditProvider.java  |  46 +++--
 11 files changed, 378 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java
index 3ab7d5f..4070f47 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java
@@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -75,6 +76,13 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 		addAuditProvider(provider);
 	}
 
+	@Override
+	public void init(Properties props) {
+		LOG.info("AsyncAuditProvider(" + mName + ").init()");
+
+		super.init(props);
+	}
+
 	public int getIntervalLogDurationMS() {
 		return intervalLogDurationMS;
 	}
@@ -126,8 +134,9 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 		LOG.info("==> AsyncAuditProvider.run()");
 
 		while (!mStopThread) {
+			AuditEventBase event = null;
 			try {
-				AuditEventBase event = dequeueEvent();
+				event = dequeueEvent();
 
 				if (event != null) {
 					super.log(event);
@@ -135,7 +144,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 					flush();
 				}
 			} catch (Exception excp) {
-				LOG.error("AsyncAuditProvider.run()", excp);
+				logFailedEvent(event, excp);
 			}
 		}
 
@@ -203,7 +212,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 			if (intervalInLogCount.get() > 0 || intervalOutLogCount.get() > 0 ) {
 				long queueSize = mQueue.size();
 
-				LOG.info("AsyncAuditProvider-stats:" + mName + ": past " + formatTimeForLog(intervalSinceLastLog)
+				LOG.info("AsyncAuditProvider-stats:" + mName + ": past " + formatIntervalForLog(intervalSinceLastLog)
 						+ ": inLogs=" + intervalInLogCount.get()
 						+ ", outLogs=" + intervalOutLogCount.get()
 						+ ", dropped=" + intervalDropCount.get()
@@ -241,29 +250,6 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 		LOG.debug("<== AsyncAuditProvider.waitToComplete()");
 	}
 
-	private String getTimeDiffStr(long time1, long time2) {
-		long timeInMs = Math.abs(time1 - time2);
-		return formatTimeForLog(timeInMs);
-	}
-
-	private String formatTimeForLog(long timeInMs) {
-		long hours = timeInMs / (60 * 60 * 1000);
-		long minutes = (timeInMs / (60 * 1000)) % 60;
-		long seconds = (timeInMs % (60 * 1000)) / 1000;
-		long mSeconds = (timeInMs % (1000));
-
-		if (hours > 0)
-			return String.format("%02d:%02d:%02d.%03d hours", hours, minutes,
-					seconds, mSeconds);
-		else if (minutes > 0)
-			return String.format("%02d:%02d.%03d minutes", minutes, seconds,
-					mSeconds);
-		else if (seconds > 0)
-			return String.format("%02d.%03d seconds", seconds, mSeconds);
-		else
-			return String.format("%03d milli-seconds", mSeconds);
-	}
-
 	private long getTimeTillNextFlush() {
 		long timeTillNextFlush = mMaxFlushInterval;
 

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProvider.java b/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProvider.java
index ee9bebe..e026cec 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProvider.java
@@ -18,11 +18,14 @@
 
 package com.xasecure.audit.provider;
 
+import java.util.Properties;
+
 import com.xasecure.audit.model.AuditEventBase;
 
 public interface AuditProvider {
 	public void log(AuditEventBase event);
 
+    public void init(Properties prop);
     public void start();
     public void stop();
     public void waitToComplete();

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
index 5a0c4da..4913b4b 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.xasecure.audit.provider.hdfs.HdfsAuditProvider;
-import com.xasecure.authorization.hadoop.utils.XaSecureCredentialProvider;
 
 
 /*
@@ -44,33 +43,14 @@ public class AuditProviderFactory {
 
 	private static final Log LOG = LogFactory.getLog(AuditProviderFactory.class);
 
-	private static final String AUDIT_IS_ENABLED_PROP               = "xasecure.audit.is.enabled"
;
-
-	private static final String AUDIT_DB_IS_ENABLED_PROP            = "xasecure.audit.db.is.enabled"
;
-	private static final String AUDIT_DB_IS_ASYNC_PROP              = "xasecure.audit.db.is.async";
-	private static final String AUDIT_DB_MAX_QUEUE_SIZE_PROP        = "xasecure.audit.db.async.max.queue.size"
;
-	private static final String AUDIT_DB_MAX_FLUSH_INTERVAL_PROP    = "xasecure.audit.db.async.max.flush.interval.ms";
-	private static final String AUDIT_DB_BATCH_SIZE_PROP            = "xasecure.audit.db.batch.size"
;
-	private static final String AUDIT_DB_RETRY_MIN_INTERVAL_PROP    = "xasecure.audit.db.config.retry.min.interval.ms";
-	private static final String AUDIT_JPA_CONFIG_PROP_PREFIX        = "xasecure.audit.jpa.";
-	private static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE   = "xasecure.audit.credential.provider.file";
-	private static final String AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS	= "auditDBCred";
-	private static final String AUDIT_JPA_JDBC_PASSWORD  			= "javax.persistence.jdbc.password";
-
-	private static final String AUDIT_HDFS_IS_ENABLED_PROP          = "xasecure.audit.hdfs.is.enabled";
-	private static final String AUDIT_HDFS_IS_ASYNC_PROP            = "xasecure.audit.hdfs.is.async";
-	private static final String AUDIT_HDFS_MAX_QUEUE_SIZE_PROP      = "xasecure.audit.hdfs.async.max.queue.size"
;
-	private static final String AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP  = "xasecure.audit.hdfs.async.max.flush.interval.ms";
-	private static final String AUDIT_HDFS_CONFIG_PREFIX_PROP       = "xasecure.audit.hdfs.config.";
-
-	private static final String AUDIT_LOG4J_IS_ENABLED_PROP         = "xasecure.audit.log4j.is.enabled"
;
-	private static final String AUDIT_LOG4J_IS_ASYNC_PROP           = "xasecure.audit.log4j.is.async";
-	private static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.log4j.async.max.queue.size"
;
-	private static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms";
+	private static final String AUDIT_IS_ENABLED_PROP       = "xasecure.audit.is.enabled" ;
+	private static final String AUDIT_DB_IS_ENABLED_PROP    = "xasecure.audit.db.is.enabled"
;
+	private static final String AUDIT_HDFS_IS_ENABLED_PROP  = "xasecure.audit.hdfs.is.enabled";
+	private static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled"
;
 	
 	private static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT     = 10 * 1024;
 	private static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT =  5 * 1000;
-
+	
 	private static AuditProviderFactory sFactory;
 
 	private AuditProvider mProvider = null;
@@ -118,10 +98,10 @@ public class AuditProviderFactory {
 		
 		setApplicationType(appType);
 
-		boolean isEnabled             = getBooleanProperty(props, AUDIT_IS_ENABLED_PROP, false);
-		boolean isAuditToDbEnabled    = getBooleanProperty(props, AUDIT_DB_IS_ENABLED_PROP, false);
-		boolean isAuditToHdfsEnabled  = getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP, false);
-		boolean isAuditToLog4jEnabled = getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP,
false);
+		boolean isEnabled             = BaseAuditProvider.getBooleanProperty(props, AUDIT_IS_ENABLED_PROP,
false);
+		boolean isAuditToDbEnabled    = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ENABLED_PROP,
false);
+		boolean isAuditToHdfsEnabled  = BaseAuditProvider.getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP,
false);
+		boolean isAuditToLog4jEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP,
false);
 
 		if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled || isAuditToLog4jEnabled))
{
 			LOG.info("AuditProviderFactory: Audit not enabled..");
@@ -134,32 +114,16 @@ public class AuditProviderFactory {
 		List<AuditProvider> providers = new ArrayList<AuditProvider>();
 
 		if(isAuditToDbEnabled) {
-			Map<String, String> jpaInitProperties = getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
-
-			String jdbcPassword = getCredentialString(getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE),
AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
-
-			if(jdbcPassword != null && !jdbcPassword.isEmpty()) {
-				jpaInitProperties.put(AUDIT_JPA_JDBC_PASSWORD, jdbcPassword);
-			}
+			DbAuditProvider dbProvider = new DbAuditProvider();
 
-			LOG.info("AuditProviderFactory: found " + jpaInitProperties.size() + " Audit JPA properties");
-	
-			int dbBatchSize          = getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
-			int dbRetryMinIntervalMs = getIntProperty(props, AUDIT_DB_RETRY_MIN_INTERVAL_PROP, 15
* 1000);
-			boolean isAuditToDbAsync = getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
-			
-			if(! isAuditToDbAsync) {
-				dbBatchSize = 1; // Batching not supported in sync mode; need to address multiple threads
making audit calls
-			}
+			boolean isAuditToDbAsync = BaseAuditProvider.getBooleanProperty(props, DbAuditProvider.AUDIT_DB_IS_ASYNC_PROP,
false);
 
-			DbAuditProvider dbProvider = new DbAuditProvider(jpaInitProperties, dbBatchSize, dbRetryMinIntervalMs);
-			
 			if(isAuditToDbAsync) {
-				int maxQueueSize     = getIntProperty(props, AUDIT_DB_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
-				int maxFlushInterval = getIntProperty(props, AUDIT_DB_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+				int maxQueueSize     = BaseAuditProvider.getIntProperty(props, DbAuditProvider.AUDIT_DB_MAX_QUEUE_SIZE_PROP,
AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+				int maxFlushInterval = BaseAuditProvider.getIntProperty(props, DbAuditProvider.AUDIT_DB_MAX_FLUSH_INTERVAL_PROP,
AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
 
 				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("DbAuditProvider", maxQueueSize,
maxFlushInterval, dbProvider);
-				
+
 				providers.add(asyncProvider);
 			} else {
 				providers.add(dbProvider);
@@ -167,22 +131,16 @@ public class AuditProviderFactory {
 		}
 
 		if(isAuditToHdfsEnabled) {
-			Map<String, String> hdfsInitProperties = getPropertiesWithPrefix(props, AUDIT_HDFS_CONFIG_PREFIX_PROP);
-
-			LOG.info("AuditProviderFactory: found " + hdfsInitProperties.size() + " Audit HDFS properties");
-			
 			HdfsAuditProvider hdfsProvider = new HdfsAuditProvider();
-			
-			hdfsProvider.init(hdfsInitProperties);
 
-			boolean isAuditToHdfsAsync = getBooleanProperty(props, AUDIT_HDFS_IS_ASYNC_PROP, false);
+			boolean isAuditToHdfsAsync = BaseAuditProvider.getBooleanProperty(props, HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP,
false);
 
 			if(isAuditToHdfsAsync) {
-				int maxQueueSize     = getIntProperty(props, AUDIT_HDFS_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
-				int maxFlushInterval = getIntProperty(props, AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+				int maxQueueSize     = BaseAuditProvider.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP,
AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+				int maxFlushInterval = BaseAuditProvider.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP,
AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
 
 				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("HdfsAuditProvider", maxQueueSize,
maxFlushInterval, hdfsProvider);
-				
+
 				providers.add(asyncProvider);
 			} else {
 				providers.add(hdfsProvider);
@@ -192,14 +150,14 @@ public class AuditProviderFactory {
 		if(isAuditToLog4jEnabled) {
 			Log4jAuditProvider log4jProvider = new Log4jAuditProvider();
 
-			boolean isAuditToLog4jAsync = getBooleanProperty(props, AUDIT_LOG4J_IS_ASYNC_PROP, false);
-			
+			boolean isAuditToLog4jAsync = BaseAuditProvider.getBooleanProperty(props, Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP,
false);
+
 			if(isAuditToLog4jAsync) {
-				int maxQueueSize     = getIntProperty(props, AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
-				int maxFlushInterval = getIntProperty(props, AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+				int maxQueueSize     = BaseAuditProvider.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP,
AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+				int maxFlushInterval = BaseAuditProvider.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP,
AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
 
 				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("Log4jAuditProvider", maxQueueSize,
maxFlushInterval, log4jProvider);
-				
+
 				providers.add(asyncProvider);
 			} else {
 				providers.add(log4jProvider);
@@ -218,6 +176,7 @@ public class AuditProviderFactory {
 			mProvider = multiDestProvider;
 		}
 		
+		mProvider.init(props);
 		mProvider.start();
 
 		JVMShutdownHook jvmShutdownHook = new JVMShutdownHook(mProvider);
@@ -265,74 +224,6 @@ public class AuditProviderFactory {
 		MiscUtil.setApplicationType(strAppType);
 	}
 	
-	private Map<String, String> getPropertiesWithPrefix(Properties props, String prefix)
{
-		Map<String, String> prefixedProperties = new HashMap<String, String>();
-		
-		for(String key : props.stringPropertyNames()) {
-			if(key == null) {
-				continue;
-			}
-			
-			String val = props.getProperty(key);
-			
-			if(key.startsWith(prefix)) {
-				key = key.substring(prefix.length());
-
-				if(key == null) {
-					continue;
-				}
-				
-				prefixedProperties.put(key, val);
-			}
-		}
-
-            
-		return prefixedProperties;
-	}
-	
-	private boolean getBooleanProperty(Properties props, String propName, boolean defValue)
{
-		boolean ret = defValue;
-
-		if(props != null && propName != null) {
-			String val = props.getProperty(propName);
-			
-			if(val != null) {
-				ret = Boolean.valueOf(val);
-			}
-		}
-		
-		return ret;
-	}
-	
-	private int getIntProperty(Properties props, String propName, int defValue) {
-		int ret = defValue;
-
-		if(props != null && propName != null) {
-			String val = props.getProperty(propName);
-			
-			if(val != null) {
-				ret = Integer.parseInt(val);
-			}
-		}
-		
-		return ret;
-	}
-	
-	
-	private String getStringProperty(Properties props, String propName) {
-	
-		String ret = null;
-		if(props != null && propName != null) {
-			String val = props.getProperty(propName);
-			if ( val != null){
-				ret = val;
-			}
-			
-		}
-		
-		return ret;
-	}
-	
 	private AuditProvider getDefaultProvider() {
 		return new DummyAuditProvider();
 	}
@@ -348,20 +239,5 @@ public class AuditProviderFactory {
 			mProvider.waitToComplete();
 			mProvider.stop();
 	    }
-	  }
-
-	
-	private String getCredentialString(String url,String alias) {
-		String ret = null;
-
-		if(url != null && alias != null) {
-			char[] cred = XaSecureCredentialProvider.getInstance().getCredentialString(url,alias);
-
-			if ( cred != null ) {
-				ret = new String(cred);	
-			}
-		}
-		
-		return ret;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/BaseAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/BaseAuditProvider.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/BaseAuditProvider.java
new file mode 100644
index 0000000..d2c9150
--- /dev/null
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/BaseAuditProvider.java
@@ -0,0 +1,156 @@
+package com.xasecure.audit.provider;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.xasecure.audit.model.AuditEventBase;
+
+public abstract class BaseAuditProvider implements AuditProvider {
+	private static final Log LOG = LogFactory.getLog(BaseAuditProvider.class);
+
+	private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
+
+	private int   mLogFailureReportMinIntervalInMs = 60 * 1000;
+
+	private AtomicLong mFailedLogLastReportTime       = new AtomicLong(0);
+	private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
+	private AtomicLong mFailedLogCountLifeTime        = new AtomicLong(0);
+
+
+	public BaseAuditProvider() {
+	}
+	
+	@Override
+	public void init(Properties props) {
+		LOG.info("BaseAuditProvider.init()");
+
+		mLogFailureReportMinIntervalInMs = getIntProperty(props, AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP,
60 * 1000);
+	}
+
+	public void logFailedEvent(AuditEventBase event) {
+		logFailedEvent(event, null);
+ 	}
+
+	public void logFailedEvent(AuditEventBase event, Throwable excp) {
+		long now = System.currentTimeMillis();
+		
+		long timeSinceLastReport  = now - mFailedLogLastReportTime.get();
+		long countSinceLastReport = mFailedLogCountSinceLastReport.incrementAndGet();
+		long countLifeTime        = mFailedLogCountLifeTime.incrementAndGet();
+
+		if(timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
+			mFailedLogLastReportTime.set(now);
+			mFailedLogCountSinceLastReport.set(0);
+
+			if(excp != null) {
+				LOG.warn("failed to log audit event: " + MiscUtil.stringify(event), excp);
+			} else {
+				LOG.warn("failed to log audit event: " + MiscUtil.stringify(event));
+			}
+
+			if(countLifeTime > 1) { // no stats to print for the 1st failure
+				LOG.warn("Log failure count: " + countSinceLastReport + " in past " + formatIntervalForLog(timeSinceLastReport)
+ "; " + countLifeTime + " during process lifetime");
+			}
+		}
+ 	}
+
+	public static Map<String, String> getPropertiesWithPrefix(Properties props, String
prefix) {
+		Map<String, String> prefixedProperties = new HashMap<String, String>();
+
+		if(props != null && prefix != null) {
+			for(String key : props.stringPropertyNames()) {
+				if(key == null) {
+					continue;
+				}
+
+				String val = props.getProperty(key);
+
+				if(key.startsWith(prefix)) {
+					key = key.substring(prefix.length());
+
+					if(key == null) {
+						continue;
+					}
+
+					prefixedProperties.put(key, val);
+				}
+			}
+		}
+
+		return prefixedProperties;
+	}
+	
+	public static boolean getBooleanProperty(Properties props, String propName, boolean defValue)
{
+		boolean ret = defValue;
+
+		if(props != null && propName != null) {
+			String val = props.getProperty(propName);
+
+			if(val != null) {
+				ret = Boolean.valueOf(val);
+			}
+		}
+
+		return ret;
+	}
+	
+	public static int getIntProperty(Properties props, String propName, int defValue) {
+		int ret = defValue;
+
+		if(props != null && propName != null) {
+			String val = props.getProperty(propName);
+
+			if(val != null) {
+				try {
+					ret = Integer.parseInt(val);
+				} catch(NumberFormatException excp) {
+					ret = defValue;
+				}
+			}
+		}
+
+		return ret;
+	}
+	
+	
+	public static String getStringProperty(Properties props, String propName) {
+		String ret = null;
+
+		if(props != null && propName != null) {
+			String val = props.getProperty(propName);
+			if ( val != null){
+				ret = val;
+			}
+		}
+
+		return ret;
+	}
+
+	public String getTimeDiffStr(long time1, long time2) {
+		long timeInMs = Math.abs(time1 - time2);
+		return formatIntervalForLog(timeInMs);
+	}
+
+	public String formatIntervalForLog(long timeInMs) {
+		long hours = timeInMs / (60 * 60 * 1000);
+		long minutes = (timeInMs / (60 * 1000)) % 60;
+		long seconds = (timeInMs % (60 * 1000)) / 1000;
+		long mSeconds = (timeInMs % (1000));
+
+		if (hours > 0)
+			return String.format("%02d:%02d:%02d.%03d hours", hours, minutes,
+					seconds, mSeconds);
+		else if (minutes > 0)
+			return String.format("%02d:%02d.%03d minutes", minutes, seconds,
+					mSeconds);
+		else if (seconds > 0)
+			return String.format("%02d.%03d seconds", seconds, mSeconds);
+		else
+			return String.format("%03d milli-seconds", mSeconds);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/BufferedAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/BufferedAuditProvider.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/BufferedAuditProvider.java
index 92a6980..990a446 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/BufferedAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/BufferedAuditProvider.java
@@ -17,12 +17,18 @@
  */
 package com.xasecure.audit.provider;
 
+import java.util.Properties;
+
 import com.xasecure.audit.model.AuditEventBase;
 
-public abstract class BufferedAuditProvider implements AuditProvider {
+public abstract class BufferedAuditProvider extends BaseAuditProvider {
 	private LogBuffer<AuditEventBase>      mBuffer      = null;
 	private LogDestination<AuditEventBase> mDestination = null;
 
+	@Override
+	public void init(Properties props) {
+		super.init(props);
+	}
 
 	@Override
 	public void log(AuditEventBase event) {
@@ -38,7 +44,9 @@ public abstract class BufferedAuditProvider implements AuditProvider {
 			event.setEventId(MiscUtil.generateUniqueId());
 		}
 
-		mBuffer.add(event);
+		if(! mBuffer.add(event)) {
+			logFailedEvent(event);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/com/xasecure/audit/provider/DbAuditProvider.java
index f8dc441..8657961 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/DbAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/DbAuditProvider.java
@@ -20,6 +20,7 @@ package com.xasecure.audit.provider;
 
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.Properties;
 
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
@@ -31,16 +32,28 @@ import org.apache.commons.logging.LogFactory;
 
 import com.xasecure.audit.dao.DaoManager;
 import com.xasecure.audit.model.AuditEventBase;
+import com.xasecure.authorization.hadoop.utils.XaSecureCredentialProvider;
 
 
 /*
  * NOTE:
  * - Instances of this class are not thread-safe.
  */
-public class DbAuditProvider implements AuditProvider {
+public class DbAuditProvider extends BaseAuditProvider {
 
 	private static final Log LOG = LogFactory.getLog(DbAuditProvider.class);
 
+	public static final String AUDIT_DB_IS_ASYNC_PROP           = "xasecure.audit.db.is.async";
+	public static final String AUDIT_DB_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.db.async.max.queue.size"
;
+	public static final String AUDIT_DB_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.db.async.max.flush.interval.ms";
+
+	private static final String AUDIT_DB_BATCH_SIZE_PROP            = "xasecure.audit.db.batch.size"
;
+	private static final String AUDIT_DB_RETRY_MIN_INTERVAL_PROP    = "xasecure.audit.db.config.retry.min.interval.ms";
+	private static final String AUDIT_JPA_CONFIG_PROP_PREFIX        = "xasecure.audit.jpa.";
+	private static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE   = "xasecure.audit.credential.provider.file";
+	private static final String AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS	= "auditDBCred";
+	private static final String AUDIT_JPA_JDBC_PASSWORD  			= "javax.persistence.jdbc.password";
+
 	private EntityManagerFactory entityManagerFactory;
 	private DaoManager          daoManager;
 	
@@ -51,12 +64,31 @@ public class DbAuditProvider implements AuditProvider {
 	private Map<String, String> mDbProperties         = null;
 	private long                mLastDbFailedTime     = 0;
 
-	public DbAuditProvider(Map<String, String> properties, int dbBatchSize, int dbRetryMinIntervalMs)
{
+	public DbAuditProvider() {
 		LOG.info("DbAuditProvider: creating..");
-		
-		mDbProperties         = properties;
-		mCommitBatchSize      = dbBatchSize < 1 ? 1 : dbBatchSize;
-		mDbRetryMinIntervalMs = dbRetryMinIntervalMs;
+	}
+
+	@Override
+	public void init(Properties props) {
+		LOG.info("DbAuditProvider.init()");
+
+		super.init(props);
+
+		mDbProperties         = BaseAuditProvider.getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
+		mCommitBatchSize      = BaseAuditProvider.getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP,
1000);
+		mDbRetryMinIntervalMs = BaseAuditProvider.getIntProperty(props, AUDIT_DB_RETRY_MIN_INTERVAL_PROP,
15 * 1000);
+
+		boolean isAsync = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
+
+		if(! isAsync) {
+			mCommitBatchSize = 1; // Batching not supported in sync mode
+		}
+
+		String jdbcPassword = getCredentialString(BaseAuditProvider.getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE),
AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
+
+		if(jdbcPassword != null && !jdbcPassword.isEmpty()) {
+			mDbProperties.put(AUDIT_JPA_JDBC_PASSWORD, jdbcPassword);
+		}
 	}
 
 	@Override
@@ -308,7 +340,17 @@ public class DbAuditProvider implements AuditProvider {
 		LOG.warn(msg, excp);
 	}
 
-	private void logFailedEvent(AuditEventBase event) {
-		LOG.warn("failed to log audit event: " + MiscUtil.stringify(event) + " }");
- 	}
+	private String getCredentialString(String url,String alias) {
+		String ret = null;
+
+		if(url != null && alias != null) {
+			char[] cred = XaSecureCredentialProvider.getInstance().getCredentialString(url,alias);
+
+			if ( cred != null ) {
+				ret = new String(cred);	
+			}
+		}
+		
+		return ret;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java
index 777f740..73f4b27 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java
@@ -17,10 +17,16 @@
  */
 package com.xasecure.audit.provider;
 
+import java.util.Properties;
+
 import com.xasecure.audit.model.AuditEventBase;
 
 
 public class DummyAuditProvider implements AuditProvider {
+	@Override
+	public void init(Properties prop) {
+		// intentionally left empty
+	}
 
 	@Override
 	public void log(AuditEventBase event) {

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
index 0861dd1..69df673 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
@@ -56,6 +56,8 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
 	private String mBufferFilename   = null;
 	private long   mNextRolloverTime = 0;
 	private long   mNextFlushTime    = 0;
+	private int    mFileOpenRetryIntervalInMs = 60 * 1000;
+	private long   mNextFileOpenRetryTime     = 0;
 
 	private DestinationDispatcherThread<T> mDispatcherThread = null;
 	
@@ -200,9 +202,9 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T>
{
 					ret = true;
 				} catch(IOException excp) {
 					mLogger.warn("LocalFileLogBuffer.add(): write failed", excp);
+
+					closeFile();
 				}
-			} else {
-				mLogger.warn("LocalFileLogBuffer.add(): writer is null");
 			}
 		}
 
@@ -216,38 +218,44 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T>
{
 
 	private synchronized void openFile() {
 		mLogger.debug("==> LocalFileLogBuffer.openFile()");
+		
+		long now = System.currentTimeMillis();
 
 		closeFile();
 
-		mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds
* 1000L));
-
-		long startTime = MiscUtil.getRolloverStartTime(mNextRolloverTime, (mRolloverIntervalSeconds
* 1000L));
-
-		mBufferFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile, startTime);
-
-		FileOutputStream ostream = null;
-		try {
-			ostream = new FileOutputStream(mBufferFilename, mIsAppend);
-		} catch(Exception excp) {
+		if(mNextFileOpenRetryTime <= now) {
+			mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds
* 1000L));
+	
+			long startTime = MiscUtil.getRolloverStartTime(mNextRolloverTime, (mRolloverIntervalSeconds
* 1000L));
+	
+			mBufferFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile, startTime);
+	
 			MiscUtil.createParents(new File(mBufferFilename));
-
+	
+			FileOutputStream ostream = null;
 			try {
 				ostream = new FileOutputStream(mBufferFilename, mIsAppend);
-			} catch(Exception ex) {
-				// ignore; error printed down
+			} catch(Exception excp) {
+				mLogger.warn("LocalFileLogBuffer.openFile(): failed to open file " + mBufferFilename,
excp);
+			}
+	
+			if(ostream != null) {
+				mWriter = createWriter(ostream);
+	
+				if(mWriter != null) {
+					mLogger.debug("LocalFileLogBuffer.openFile(): opened file " + mBufferFilename);
+		
+					mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L);
+				} else {
+					mLogger.warn("LocalFileLogBuffer.openFile(): failed to open file for write " + mBufferFilename);
+		
+					mBufferFilename = null;
+				}
+			}
+			
+			if(mWriter == null) {
+				mNextFileOpenRetryTime = now + mFileOpenRetryIntervalInMs;
 			}
-		}
-
-		mWriter = createWriter(ostream);
-
-		if(mWriter != null) {
-			mLogger.debug("LocalFileLogBuffer.openFile(): opened file " + mBufferFilename);
-
-			mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L);
-		} else {
-			mLogger.warn("LocalFileLogBuffer.openFile(): failed to open file for write " + mBufferFilename);
-
-			mBufferFilename = null;
 		}
 
 		mLogger.debug("<== LocalFileLogBuffer.openFile()");
@@ -432,7 +440,7 @@ class DestinationDispatcherThread<T> extends Thread {
 					try {
 						mCompletedLogfiles.wait(pollIntervalInMs);
 					} catch(InterruptedException excp) {
-						mLogger.warn("DestinationDispatcherThread.run(): failed to wait for log file", excp);
+						throw new RuntimeException("DestinationDispatcherThread.run(): failed to wait for log
file", excp);
 					}
 				}
 				
@@ -494,7 +502,11 @@ class DestinationDispatcherThread<T> extends Thread {
 
 			// loop until log is sent successfully
 			while(!mStopThread && !mDestination.sendStringified(log)) {
-				sleep(destinationPollIntervalInMs, "LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile
+ "): failed to wait for destination to be available");
+				try {
+					Thread.sleep(destinationPollIntervalInMs);
+				} catch(InterruptedException excp) {
+					throw new RuntimeException("LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile +
"): failed while waiting for destination to be available", excp);
+				}
 			}
 		}
 
@@ -655,14 +667,6 @@ class DestinationDispatcherThread<T> extends Thread {
 	    return reader;
 	}
 
-	private void sleep(long sleepTimeInMs, String onFailMsg) {
-		try {
-			Thread.sleep(sleepTimeInMs);
-		} catch(InterruptedException excp) {
-			mLogger.warn(onFailMsg, excp);
-		}
-	}
-
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java
index cfc3c51..65d5a6f 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java
@@ -18,6 +18,8 @@
 
 package com.xasecure.audit.provider;
 
+import java.util.Properties;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -26,23 +28,44 @@ import com.google.gson.GsonBuilder;
 import com.xasecure.audit.model.AuditEventBase;
 
 
-public class Log4jAuditProvider implements AuditProvider {
+public class Log4jAuditProvider extends BaseAuditProvider {
 
 	private static final Log LOG      = LogFactory.getLog(Log4jAuditProvider.class);
 	private static final Log AUDITLOG = LogFactory.getLog("xaaudit." + Log4jAuditProvider.class.getName());
 
+	public static final String AUDIT_LOG4J_IS_ASYNC_PROP           = "xasecure.audit.log4j.is.async";
+	public static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.log4j.async.max.queue.size"
;
+	public static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms";
+
+	private Gson mGsonBuilder = null;
 
 	public Log4jAuditProvider() {
 		LOG.info("Log4jAuditProvider: creating..");
 	}
 
 	@Override
+	public void init(Properties props) {
+		LOG.info("Log4jAuditProvider.init()");
+
+		super.init(props);
+
+		try {
+			mGsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
+		} catch(Throwable excp) {
+			LOG.warn("Log4jAuditProvider.init(): failed to create GsonBuilder object. events will
be formated using toString(), instead of Json", excp);
+		}
+	}
+
+	@Override
 	public void log(AuditEventBase event) {
 		if(! AUDITLOG.isInfoEnabled())
 			return;
-		Gson gson= new GsonBuilder().create();
-		String eventAsJson = gson.toJson(event.toString()) ;
-		AUDITLOG.info(eventAsJson);
+		
+		if(event != null) {
+			String eventStr = mGsonBuilder != null ? mGsonBuilder.toJson(event) : event.toString();
+
+			AUDITLOG.info(eventStr);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java
index cee9f5a..d0ce95c 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java
@@ -3,6 +3,7 @@ package com.xasecure.audit.provider;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,7 +33,7 @@ import com.xasecure.audit.model.StormAuditEvent;
  * limitations under the License.
  */
 
-public class MultiDestAuditProvider implements AuditProvider {
+public class MultiDestAuditProvider extends BaseAuditProvider {
 
 	private static final Log LOG = LogFactory.getLog(MultiDestAuditProvider.class);
 
@@ -44,11 +45,24 @@ public class MultiDestAuditProvider implements AuditProvider {
 	}
 
 	public MultiDestAuditProvider(AuditProvider provider) {
-		LOG.info("MultiDestAuditProvider: creating..");
-
 		addAuditProvider(provider);
 	}
-	
+
+	@Override
+	public void init(Properties props) {
+		LOG.info("MultiDestAuditProvider.init()");
+
+		super.init(props);
+
+		for(AuditProvider provider : mProviders) {
+    		try {
+                provider.init(props);
+    		} catch(Throwable excp) {
+    			LOG.info("MultiDestAuditProvider.init(): failed" + provider.getClass().getCanonicalName()
+ ")");
+    		}
+        }
+	}
+
 	public void addAuditProvider(AuditProvider provider) {
 		if(provider != null) {
 			LOG.info("MultiDestAuditProvider.addAuditProvider(providerType=" + provider.getClass().getCanonicalName()
+ ")");
@@ -71,7 +85,7 @@ public class MultiDestAuditProvider implements AuditProvider {
     		try {
                 provider.log(event);
     		} catch(Throwable excp) {
-    			LOG.error("AsyncAuditProvider.log(): failed for provider { " + provider.getClass().getName()
+ " }", excp);
+    			logFailedEvent(event, excp);
     		}
         }
 	}

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cecfd51c/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
index cc076f5..b6a50ab 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
@@ -17,11 +17,13 @@
 package com.xasecure.audit.provider.hdfs;
 
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.xasecure.audit.model.AuditEventBase;
+import com.xasecure.audit.provider.BaseAuditProvider;
 import com.xasecure.audit.provider.BufferedAuditProvider;
 import com.xasecure.audit.provider.DebugTracer;
 import com.xasecure.audit.provider.LocalFileLogBuffer;
@@ -30,26 +32,36 @@ import com.xasecure.audit.provider.MiscUtil;
 
 public class HdfsAuditProvider extends BufferedAuditProvider {
 	private static final Log LOG = LogFactory.getLog(HdfsAuditProvider.class);
-	
+
+	public static final String AUDIT_HDFS_IS_ASYNC_PROP           = "xasecure.audit.hdfs.is.async";
+	public static final String AUDIT_HDFS_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.hdfs.async.max.queue.size"
;
+	public static final String AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.hdfs.async.max.flush.interval.ms";
+
 	public HdfsAuditProvider() {
 	}
 
-	public void init(Map<String, String> properties) {
-		String encoding                                = properties.get("encoding");
-
-		String hdfsDestinationDirectory                = properties.get("destination.directory");
-		String hdfsDestinationFile                     = properties.get("destination.file");
-		int    hdfsDestinationFlushIntervalSeconds     = MiscUtil.parseInteger(properties.get("destination.flush.interval.seconds"),
15 * 60);
-		int    hdfsDestinationRolloverIntervalSeconds  = MiscUtil.parseInteger(properties.get("destination.rollover.interval.seconds"),
24 * 60 * 60);
-		int    hdfsDestinationOpenRetryIntervalSeconds = MiscUtil.parseInteger(properties.get("destination.open.retry.interval.seconds"),
60);
-
-		String localFileBufferDirectory               = properties.get("local.buffer.directory");
-		String localFileBufferFile                    = properties.get("local.buffer.file");
-		int    localFileBufferFlushIntervalSeconds    = MiscUtil.parseInteger(properties.get("local.buffer.flush.interval.seconds"),
1 * 60);
-		int    localFileBufferFileBufferSizeBytes     = MiscUtil.parseInteger(properties.get("local.buffer.file.buffer.size.bytes"),
8 * 1024);
-		int    localFileBufferRolloverIntervalSeconds = MiscUtil.parseInteger(properties.get("local.buffer.rollover.interval.seconds"),
10 * 60);
-		String localFileBufferArchiveDirectory        = properties.get("local.archive.directory");
-		int    localFileBufferArchiveFileCount        = MiscUtil.parseInteger(properties.get("local.archive.max.file.count"),
10);
+	public void init(Properties props) {
+		LOG.info("HdfsAuditProvider.init()");
+
+		super.init(props);
+
+		Map<String, String> hdfsProps = BaseAuditProvider.getPropertiesWithPrefix(props,
"xasecure.audit.hdfs.config.");
+
+		String encoding                                = hdfsProps.get("encoding");
+
+		String hdfsDestinationDirectory                = hdfsProps.get("destination.directory");
+		String hdfsDestinationFile                     = hdfsProps.get("destination.file");
+		int    hdfsDestinationFlushIntervalSeconds     = MiscUtil.parseInteger(hdfsProps.get("destination.flush.interval.seconds"),
15 * 60);
+		int    hdfsDestinationRolloverIntervalSeconds  = MiscUtil.parseInteger(hdfsProps.get("destination.rollover.interval.seconds"),
24 * 60 * 60);
+		int    hdfsDestinationOpenRetryIntervalSeconds = MiscUtil.parseInteger(hdfsProps.get("destination.open.retry.interval.seconds"),
60);
+
+		String localFileBufferDirectory               = hdfsProps.get("local.buffer.directory");
+		String localFileBufferFile                    = hdfsProps.get("local.buffer.file");
+		int    localFileBufferFlushIntervalSeconds    = MiscUtil.parseInteger(hdfsProps.get("local.buffer.flush.interval.seconds"),
1 * 60);
+		int    localFileBufferFileBufferSizeBytes     = MiscUtil.parseInteger(hdfsProps.get("local.buffer.file.buffer.size.bytes"),
8 * 1024);
+		int    localFileBufferRolloverIntervalSeconds = MiscUtil.parseInteger(hdfsProps.get("local.buffer.rollover.interval.seconds"),
10 * 60);
+		String localFileBufferArchiveDirectory        = hdfsProps.get("local.archive.directory");
+		int    localFileBufferArchiveFileCount        = MiscUtil.parseInteger(hdfsProps.get("local.archive.max.file.count"),
10);
 
 		DebugTracer tracer = new Log4jTracer(LOG);
 


Mime
View raw message