ranger-commits mailing list archives

From mad...@apache.org
Subject [1/4] git commit: ARGUS-134: - configured audit-to-HDFS to use async mode to write to local file buffer. - updated async-audit to use java.util.concurrent.BlockingQueue, instead of using synchronized blocks to access Queue - updated default audit configu
Date Sun, 26 Oct 2014 04:47:32 GMT
Repository: incubator-argus
Updated Branches:
  refs/heads/master 069e55678 -> 372b98002


ARGUS-134:
- configured audit-to-HDFS to use async mode to write to local file
buffer.
- updated async-audit to use java.util.concurrent.BlockingQueue, instead
of using synchronized blocks to access Queue
- updated default audit configuration, to enable HBase with
'audit-to-HDFS' and other components with 'audit-to-DB'
- added INFO logs in audit-to-HDFS for open/close of HDFS files
- updated async-audit with better stats message, to include count of
incoming logs and queue-size

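For context on the BlockingQueue change described above, here is a minimal, hypothetical sketch of the queueing pattern this commit moves to (the class and method names below are illustrative only, not part of the commit): producers call BlockingQueue.offer(), which never blocks and simply drops the event when the bounded queue is full, while the consumer calls poll() with a timeout instead of holding a lock and using wait()/notify().

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Simplified illustration of the producer/consumer pattern used after this change;
// not the actual AsyncAuditProvider class.
public class BoundedAsyncBuffer<T> {
	private final BlockingQueue<T> queue;
	private final AtomicLong inCount   = new AtomicLong(0);
	private final AtomicLong outCount  = new AtomicLong(0);
	private final AtomicLong dropCount = new AtomicLong(0);

	public BoundedAsyncBuffer(int maxQueueSize) {
		queue = new ArrayBlockingQueue<T>(maxQueueSize);
	}

	// Producer side: count the event; drop it (without blocking) if the queue is full.
	public void add(T event) {
		inCount.incrementAndGet();
		if (!queue.offer(event)) {
			dropCount.incrementAndGet();
		}
	}

	// Consumer side: wait up to maxWaitMs for the next event; returns null on timeout.
	public T take(long maxWaitMs) throws InterruptedException {
		T event = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
		if (event != null) {
			outCount.incrementAndGet();
		}
		return event;
	}

	// Summary counters, similar in spirit to the interval/lifetime stats logged by the provider.
	public String stats() {
		return "inLogs=" + inCount.get() + ", outLogs=" + outCount.get()
				+ ", dropped=" + dropCount.get() + ", currentQueueSize=" + queue.size();
	}
}

Because ArrayBlockingQueue enforces a fixed capacity (taken from xasecure.audit.*.async.max.queue.size), the separate 'resume queue size' properties and setters removed in this commit are no longer needed.
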
Project: http://git-wip-us.apache.org/repos/asf/incubator-argus/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-argus/commit/2c3926b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-argus/tree/2c3926b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-argus/diff/2c3926b5

Branch: refs/heads/master
Commit: 2c3926b558c4714434b058c83d0a784a749dda1b
Parents: f43c21b
Author: mneethiraj <mneethiraj@hortonworks.com>
Authored: Fri Oct 24 18:26:21 2014 -0700
Committer: mneethiraj <mneethiraj@hortonworks.com>
Committed: Fri Oct 24 18:26:21 2014 -0700

----------------------------------------------------------------------
 .../audit/provider/AsyncAuditProvider.java      | 211 +++++++------------
 .../audit/provider/AuditProviderFactory.java    |  31 +--
 .../xasecure/audit/provider/DebugTracer.java    |   2 +
 .../audit/provider/LocalFileLogBuffer.java      |   2 +-
 .../xasecure/audit/provider/Log4jTracer.java    |   8 +
 .../audit/provider/hdfs/HdfsLogDestination.java |  48 +++--
 hbase-agent/conf/xasecure-audit.xml             |  19 +-
 hdfs-agent/conf/xasecure-audit.xml              |  27 ++-
 hive-agent/conf/xasecure-audit.xml              |  17 +-
 knox-agent/conf/xasecure-audit.xml              |  17 +-
 storm-agent/conf/xasecure-audit.xml             |  17 +-
 11 files changed, 208 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java
index b8de56d..fcb013d 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/AsyncAuditProvider.java
@@ -19,8 +19,10 @@
 
  package com.xasecure.audit.provider;
 
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,29 +36,35 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 
 	private static int sThreadCount = 0;
 
-	private Queue<AuditEventBase> mQueue = new LinkedList<AuditEventBase>();
-	private Thread mThread = null;
-	private boolean mStopThread = false;
-	private int mMaxQueueSize = -1;
-	private int mResumeQueueSize = 0;
-	private int mMaxFlushInterval = -1;
-	private long mFirstDropTime = 0;
-	private int mDropCount = 0;
+	private BlockingQueue<AuditEventBase> mQueue = null;
+	private Thread  mThread           = null;
+	private boolean mStopThread       = false;
+	private String  mName             = null;
+	private int     mMaxQueueSize     = -1;
+	private int     mMaxFlushInterval = -1;
 
 	// Summary of logs handled
-	private long lifeTimeLogCount = 0; // Total count, including drop count
-	private long lifeTimeDropCount = 0;
-	private long intervalLogCount = 0;
-	private long intervalDropCount = 0;
-	private long lastIntervalLogTime = System.currentTimeMillis();
-	private int intervalLogDurationMS = 60000;
-
-	public AsyncAuditProvider() {
-		LOG.info("AsyncAuditProvider: creating..");
+	private AtomicLong lifeTimeInLogCount  = new AtomicLong(0); // Total count, including drop count
+	private AtomicLong lifeTimeOutLogCount = new AtomicLong(0);
+	private AtomicLong lifeTimeDropCount   = new AtomicLong(0);
+	private AtomicLong intervalInLogCount  = new AtomicLong(0);
+	private AtomicLong intervalOutLogCount = new AtomicLong(0);
+	private AtomicLong intervalDropCount   = new AtomicLong(0);
+	private long lastIntervalLogTime   = System.currentTimeMillis();
+	private int  intervalLogDurationMS = 60000;
+
+	public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval) {
+		LOG.info("AsyncAuditProvider(" + name + "): creating..");
+
+		mName             = name;
+		mMaxQueueSize     = maxQueueSize;
+		mMaxFlushInterval = maxFlushInterval;
+
+		mQueue = new ArrayBlockingQueue<AuditEventBase>(mMaxQueueSize);
 	}
 
-	public AsyncAuditProvider(AuditProvider provider) {
-		LOG.info("AsyncAuditProvider: creating..");
+	public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditProvider provider) {
+		this(name, maxQueueSize, maxFlushInterval);
 
 		addAuditProvider(provider);
 	}
@@ -69,18 +77,6 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 		this.intervalLogDurationMS = intervalLogDurationMS;
 	}
 
-	public void setMaxQueueSize(int maxQueueSize) {
-		mMaxQueueSize = maxQueueSize > 0 ? maxQueueSize : Integer.MAX_VALUE;
-	}
-
-	public void setMaxFlushInterval(int maxFlushInterval) {
-		mMaxFlushInterval = maxFlushInterval;
-	}
-
-	public void setResumeQueueSize(int resumeQueueSize) {
-		mResumeQueueSize = resumeQueueSize > 0 ? resumeQueueSize : 0;
-	}
-
 	@Override
 	public void log(AuditEventBase event) {
 		LOG.debug("AsyncAuditProvider.logEvent(AuditEventBase)");
@@ -102,10 +98,6 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 	public void stop() {
 		mStopThread = true;
 
-		synchronized (mQueue) {
-			mQueue.notify();
-		}
-
 		try {
 			mThread.join();
 		} catch (InterruptedException e) {
@@ -151,120 +143,81 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 	}
 
 	private void queueEvent(AuditEventBase event) {
-		synchronized (mQueue) {
-
-			// Running this within synchronized block to avoid multiple
-			// logs from different threads
-			// Running this upfront so the log interval is fixed
-
-			// Log summary if required
-			logSummaryIfRequired();
+		// Increase counts
+		lifeTimeInLogCount.incrementAndGet();
+		intervalInLogCount.incrementAndGet();
 
-			// Increase counts
-			lifeTimeLogCount++;
-			intervalLogCount++;
+		if(! mQueue.offer(event)) {
+			lifeTimeDropCount.incrementAndGet();
+			intervalDropCount.incrementAndGet();
+		}
+	}
 
-			int maxQueueSize = mMaxQueueSize;
+	private AuditEventBase dequeueEvent() {
+		AuditEventBase ret = mQueue.poll();
 
-			// if we are currently dropping, don't resume until the queue size
-			// goes to mResumeQueueSize
-			if (mDropCount > 0) {
-				maxQueueSize = (mResumeQueueSize < maxQueueSize ? mResumeQueueSize
-						: (int) (maxQueueSize / 100.0 * 80)) + 1;
-			}
+		try {
+			while(ret == null && !mStopThread) {
+				logSummaryIfRequired();
 
-			if (mQueue.size() < maxQueueSize) {
-				mQueue.add(event);
-				mQueue.notify();
+				if (mMaxFlushInterval > 0 && isFlushPending()) {
+					long timeTillNextFlush = getTimeTillNextFlush();
 
-				if (mDropCount > 0) {
-					String pauseDuration = getTimeDiffStr(System.currentTimeMillis(),
-							mFirstDropTime);
+					if (timeTillNextFlush <= 0) {
+						break; // force flush
+					}
 
-					LOG.warn("AsyncAuditProvider: resumes after dropping "
-							+ mDropCount + " audit logs in past "
-							+ pauseDuration);
+					ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS);
+				} else {
+					// Let's wake up for summary logging
+					long waitTime = intervalLogDurationMS - (System.currentTimeMillis() - lastIntervalLogTime);
+					waitTime = waitTime <= 0 ? intervalLogDurationMS : waitTime;
 
-					mDropCount = 0;
-					mFirstDropTime = 0;
+					ret = mQueue.poll(waitTime, TimeUnit.MILLISECONDS);
 				}
-			} else {
-				if (mDropCount == 0) {
-					mFirstDropTime = System.currentTimeMillis();
-	
-					LOG.warn("AsyncAuditProvider: queue size is at max ("
-							+ mMaxQueueSize
-							+ "); further audit logs will be dropped until queue clears up");
-				}
-
-				mDropCount++;
-				lifeTimeDropCount++;
-				intervalDropCount++;
 			}
-
+		} catch(InterruptedException excp) {
+			LOG.error("AsyncAuditProvider.dequeueEvent()", excp);
 		}
-	}
 
-	private AuditEventBase dequeueEvent() {
-		synchronized (mQueue) {
-			while (mQueue.isEmpty()) {
-				if (mStopThread) {
-					return null;
-				}
-				try {
-					// Log summary if required
-					logSummaryIfRequired();
-
-					if (mMaxFlushInterval > 0 && isFlushPending()) {
-						long timeTillNextFlush = getTimeTillNextFlush();
-
-						if (timeTillNextFlush <= 0) {
-							return null; // force flush
-						}
-
-						mQueue.wait(timeTillNextFlush);
-					} else {
-						// Let's wake up for summary logging
-						long waitTime = intervalLogDurationMS
-								- (System.currentTimeMillis() - lastIntervalLogTime);
-						waitTime = waitTime <= 0 ? intervalLogDurationMS
-								: waitTime;
-						mQueue.wait(waitTime);
-					}
-				} catch (InterruptedException excp) {
-					LOG.error("AsyncAuditProvider.dequeueEvent()", excp);
-
-					return null;
-				}
-			}
+		if(ret != null) {
+			lifeTimeOutLogCount.incrementAndGet();
+			intervalOutLogCount.incrementAndGet();
+		}
 
-			AuditEventBase ret = mQueue.remove();
+		logSummaryIfRequired();
 
-			return ret;
-		}
+		return ret;
 	}
 
 	private void logSummaryIfRequired() {
-		if (System.currentTimeMillis() - lastIntervalLogTime > intervalLogDurationMS) {
-			// Log interval and life time summary
-
-			if (intervalLogCount > 0) {
-				LOG.info("AsyncAuditProvider: Interval="
-						+ formatTimeForLog(intervalLogDurationMS) + ", logs="
-						+ intervalLogCount + ", dropped=" + intervalDropCount);
-				LOG.info("AsyncAuditProvider: Process Lifetime, logs="
-						+ lifeTimeLogCount + ", dropped=" + lifeTimeDropCount);
+		long intervalSinceLastLog = System.currentTimeMillis() - lastIntervalLogTime;
+
+		if (intervalSinceLastLog > intervalLogDurationMS) {
+			if (intervalInLogCount.get() > 0 || intervalOutLogCount.get() > 0 ) {
+				long queueSize = mQueue.size();
+
+				LOG.info("AsyncAuditProvider-stats:" + mName + ": past " + formatTimeForLog(intervalSinceLastLog)
+						+ ": inLogs=" + intervalInLogCount.get()
+						+ ", outLogs=" + intervalOutLogCount.get()
+						+ ", dropped=" + intervalDropCount.get()
+						+ ", currentQueueSize=" + queueSize);
+
+				LOG.info("AsyncAuditProvider-stats:" + mName + ": process lifetime"
+						+ ": inLogs=" + lifeTimeInLogCount.get()
+						+ ", outLogs=" + lifeTimeOutLogCount.get()
+						+ ", dropped=" + lifeTimeDropCount.get());
 			}
+
 			lastIntervalLogTime = System.currentTimeMillis();
-			intervalLogCount = 0;
-			intervalDropCount = 0;
+			intervalInLogCount.set(0);
+			intervalOutLogCount.set(0);
+			intervalDropCount.set(0);
 		}
 	}
 
 	private boolean isEmpty() {
-		synchronized (mQueue) {
-			return mQueue.isEmpty();
-		}
+		return mQueue.isEmpty();
 	}
 
 	private void waitToComplete(long maxWaitSeconds) {

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
index 87508aa..8a49259 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
@@ -49,7 +49,6 @@ public class AuditProviderFactory {
 	private static final String AUDIT_DB_IS_ENABLED_PROP            = "xasecure.audit.db.is.enabled";
 	private static final String AUDIT_DB_IS_ASYNC_PROP              = "xasecure.audit.db.is.async";
 	private static final String AUDIT_DB_MAX_QUEUE_SIZE_PROP        = "xasecure.audit.db.async.max.queue.size";
-	private static final String AUDIT_DB_RESUME_QUEUE_SIZE__PROP    = "xasecure.audit.db.async.resume.queue.size";
 	private static final String AUDIT_DB_MAX_FLUSH_INTERVAL_PROP    = "xasecure.audit.db.async.max.flush.interval.ms";
 	private static final String AUDIT_DB_BATCH_SIZE_PROP            = "xasecure.audit.db.batch.size";
 	private static final String AUDIT_DB_RETRY_MIN_INTERVAL_PROP    = "xasecure.audit.db.config.retry.min.interval.ms";
@@ -61,14 +60,12 @@ public class AuditProviderFactory {
 	private static final String AUDIT_HDFS_IS_ENABLED_PROP          = "xasecure.audit.hdfs.is.enabled";
 	private static final String AUDIT_HDFS_IS_ASYNC_PROP            = "xasecure.audit.hdfs.is.async";
 	private static final String AUDIT_HDFS_MAX_QUEUE_SIZE_PROP      = "xasecure.audit.hdfs.async.max.queue.size";
-	private static final String AUDIT_HDFS_RESUME_QUEUE_SIZE__PROP  = "xasecure.audit.hdfs.async.resume.queue.size";
 	private static final String AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP  = "xasecure.audit.hdfs.async.max.flush.interval.ms";
 	private static final String AUDIT_HDFS_CONFIG_PREFIX_PROP       = "xasecure.audit.hdfs.config.";
 
 	private static final String AUDIT_LOG4J_IS_ENABLED_PROP         = "xasecure.audit.log4j.is.enabled";
 	private static final String AUDIT_LOG4J_IS_ASYNC_PROP           = "xasecure.audit.log4j.is.async";
 	private static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.log4j.async.max.queue.size";
-	private static final String AUDIT_LOG4J_RESUME_QUEUE_SIZE__PROP = "xasecure.audit.log4j.async.resume.queue.size";
 	private static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms";
 
 	private static AuditProviderFactory sFactory;
@@ -155,18 +152,10 @@ public class AuditProviderFactory {
 			DbAuditProvider dbProvider = new DbAuditProvider(jpaInitProperties, dbBatchSize, dbRetryMinIntervalMs);
 			
 			if(isAuditToDbAsync) {
-				AsyncAuditProvider asyncProvider = new AsyncAuditProvider();
-
 				int maxQueueSize     = getIntProperty(props, AUDIT_DB_MAX_QUEUE_SIZE_PROP, -1);
 				int maxFlushInterval = getIntProperty(props, AUDIT_DB_MAX_FLUSH_INTERVAL_PROP, -1);
-				int resumeQueueSize  = getIntProperty(props, AUDIT_DB_RESUME_QUEUE_SIZE__PROP, 0);
-
 
-				asyncProvider.setMaxQueueSize(maxQueueSize);
-				asyncProvider.setMaxFlushInterval(maxFlushInterval);
-				asyncProvider.setResumeQueueSize(resumeQueueSize);
-				
-				asyncProvider.addAuditProvider(dbProvider);
+				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("DbAuditProvider", maxQueueSize, maxFlushInterval, dbProvider);
 				
 				providers.add(asyncProvider);
 			} else {
@@ -186,17 +175,10 @@ public class AuditProviderFactory {
 			boolean isAuditToHdfsAsync = getBooleanProperty(props, AUDIT_HDFS_IS_ASYNC_PROP, false);
 
 			if(isAuditToHdfsAsync) {
-				AsyncAuditProvider asyncProvider = new AsyncAuditProvider();
-
 				int maxQueueSize     = getIntProperty(props, AUDIT_HDFS_MAX_QUEUE_SIZE_PROP, -1);
 				int maxFlushInterval = getIntProperty(props, AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP, -1);
-				int resumeQueueSize  = getIntProperty(props, AUDIT_HDFS_RESUME_QUEUE_SIZE__PROP, 0);
 
-				asyncProvider.setMaxQueueSize(maxQueueSize);
-				asyncProvider.setMaxFlushInterval(maxFlushInterval);
-				asyncProvider.setResumeQueueSize(resumeQueueSize);
-				
-				asyncProvider.addAuditProvider(hdfsProvider);
+				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("HdfsAuditProvider", maxQueueSize, maxFlushInterval, hdfsProvider);
 				
 				providers.add(asyncProvider);
 			} else {
@@ -210,17 +192,10 @@ public class AuditProviderFactory {
 			boolean isAuditToLog4jAsync = getBooleanProperty(props, AUDIT_LOG4J_IS_ASYNC_PROP, false);
 			
 			if(isAuditToLog4jAsync) {
-				AsyncAuditProvider asyncProvider = new AsyncAuditProvider();
-
 				int maxQueueSize     = getIntProperty(props, AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP, -1);
 				int maxFlushInterval = getIntProperty(props, AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP, -1);
-				int resumeQueueSize  = getIntProperty(props, AUDIT_LOG4J_RESUME_QUEUE_SIZE__PROP, 0);
 
-				asyncProvider.setMaxQueueSize(maxQueueSize);
-				asyncProvider.setMaxFlushInterval(maxFlushInterval);
-				asyncProvider.setResumeQueueSize(resumeQueueSize);
-				
-				asyncProvider.addAuditProvider(log4jProvider);
+				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("Log4jAuditProvider", maxQueueSize, maxFlushInterval, log4jProvider);
 				
 				providers.add(asyncProvider);
 			} else {

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/agents-audit/src/main/java/com/xasecure/audit/provider/DebugTracer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/DebugTracer.java b/agents-audit/src/main/java/com/xasecure/audit/provider/DebugTracer.java
index 74d173d..6453708 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/DebugTracer.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/DebugTracer.java
@@ -19,6 +19,8 @@ package com.xasecure.audit.provider;
 public interface DebugTracer {
 	void debug(String msg);
 	void debug(String msg, Throwable excp);
+	void info(String msg);
+	void info(String msg, Throwable excp);
 	void warn(String msg);
 	void warn(String msg, Throwable excp);
 	void error(String msg);

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
index 9dc7327..0861dd1 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
@@ -193,7 +193,7 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
 				try {
 					writer.write(msg + MiscUtil.LINE_SEPARATOR);
 					
-					if(mFileBufferSizeBytes <= 0) {
+					if(mFileBufferSizeBytes == 0) {
 						writer.flush();
 					}
 	

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jTracer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jTracer.java b/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jTracer.java
index 0134375..734ad8d 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jTracer.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jTracer.java
@@ -33,6 +33,14 @@ public class Log4jTracer implements DebugTracer {
 		mLogger.debug(msg, excp);
 	}
 
+	public void info(String msg) {
+		mLogger.info(msg);
+	}
+
+	public void info(String msg, Throwable excp) {
+		mLogger.info(msg, excp);
+	}
+
 	public void warn(String msg) {
 		mLogger.warn(msg);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
index 5f69567..b3a103e 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
@@ -199,39 +199,37 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
 			try {
 				if(fileSystem.exists(pathLogfile)) { // file already exists. either append to the file or write to a new file
 					if(mIsAppend) {
+						mLogger.info("HdfsLogDestination.openFile(): opening file for append " + mHdfsFilename);
+
 						ostream = fileSystem.append(pathLogfile);
 					} else {
-						mHdfsFilename =  getNewFilename(mHdfsFilename, fileSystem);
+						mHdfsFilename = getNewFilename(mHdfsFilename, fileSystem);
 						pathLogfile   = new Path(mHdfsFilename);
 					}
 				}
 
 				// if file does not exist or if mIsAppend==false, create the file
 				if(ostream == null) {
+					mLogger.info("HdfsLogDestination.openFile(): opening file for write " + mHdfsFilename);
+
+					createParents(pathLogfile, fileSystem);
 					ostream = fileSystem.create(pathLogfile, bOverwrite);
 				}
 			} catch(IOException excp) {
-				// append may not be supported by the filesystem; or the file might already be open by another application. Try a different filename - with current timestamp
-				mHdfsFilename =  getNewFilename(mHdfsFilename, fileSystem);
+				// append may not be supported by the filesystem; or the file might already be open by another application. Try a different filename
+				String failedFilename = mHdfsFilename;
+
+				mHdfsFilename = getNewFilename(mHdfsFilename, fileSystem);
 				pathLogfile   = new Path(mHdfsFilename);
+
+				mLogger.info("HdfsLogDestination.openFile(): failed in opening file " + failedFilename + ". Will try opening " + mHdfsFilename);
 			}
 
 			if(ostream == null){
-				ostream = fileSystem.create(pathLogfile, bOverwrite);
-			}
-		} catch(IOException ex) {
-			Path parentPath = pathLogfile.getParent();
+				mLogger.info("HdfsLogDestination.openFile(): opening file for write " + mHdfsFilename);
 
-			try {
-				if(parentPath != null&& fileSystem != null && !fileSystem.exists(parentPath) && fileSystem.mkdirs(parentPath)) {
-					ostream = fileSystem.create(pathLogfile, bOverwrite);
-				} else {
-					logException("HdfsLogDestination.openFile() failed", ex);
-				}
-			} catch (IOException e) {
-				logException("HdfsLogDestination.openFile() failed", e);
-			} catch (Throwable e) {
-				mLogger.warn("HdfsLogDestination.openFile() failed", e);
+				createParents(pathLogfile, fileSystem);
+				ostream = fileSystem.create(pathLogfile, bOverwrite);
 			}
 		} catch(Throwable ex) {
 			mLogger.warn("HdfsLogDestination.openFile() failed", ex);
@@ -266,6 +264,8 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
 
 		if(writer != null) {
 			try {
+				mLogger.info("HdfsLogDestination.closeFile(): closing file " + mHdfsFilename);
+
 				writer.flush();
 				writer.close();
 			} catch(IOException excp) {
@@ -325,6 +325,20 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
 
 	    return writer;
 	}
+	
+	private void createParents(Path pathLogfile, FileSystem fileSystem) {
+		try {
+			Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null;
+
+			if(parentPath != null && fileSystem != null && !fileSystem.exists(parentPath)) {
+				 fileSystem.mkdirs(parentPath);
+			}
+		} catch (IOException e) {
+			logException("HdfsLogDestination.createParents() failed", e);
+		} catch (Throwable e) {
+			mLogger.warn("HdfsLogDestination.createParents() failed", e);
+		}
+	}
 
     private String getNewFilename(String fileName, FileSystem fileSystem) {
     	if(fileName == null) {

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/hbase-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hbase-agent/conf/xasecure-audit.xml b/hbase-agent/conf/xasecure-audit.xml
index ea9dfa5..5f14cda 100644
--- a/hbase-agent/conf/xasecure-audit.xml
+++ b/hbase-agent/conf/xasecure-audit.xml
@@ -81,7 +81,7 @@
 	
 	<property>
 		<name>xasecure.audit.db.is.enabled</name>
-		<value>true</value>
+		<value>false</value>
 	</property>	
 	
 	<property>
@@ -112,13 +112,28 @@
 	</property>	
 
 	<property>
+		<name>xasecure.audit.hdfs.is.async</name>
+		<value>true</value>
+	</property>	
+	
+	<property>
+		<name>xasecure.audit.hdfs.async.max.queue.size</name>
+		<value>1048576</value>
+	</property>	
+
+	<property>
+		<name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
+		<value>30000</value>
+	</property>	
+
+	<property>
 		<name>xasecure.audit.hdfs.config.encoding</name>
 		<value></value>
 	</property>	
 
 	<property>
 		<name>xasecure.audit.hdfs.config.destination.directory</name>
-		<value>hdfs://%hostname%:8020/xasecure/audit/%app-type%/%time:yyyyMMdd%</value>
+		<value>hdfs://namenodehost:8020/argus/audit/%app-type%/%time:yyyyMMdd%</value>
 	</property>	
 
 	<property>

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/hdfs-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hdfs-agent/conf/xasecure-audit.xml b/hdfs-agent/conf/xasecure-audit.xml
index d940e2d..01346b8 100644
--- a/hdfs-agent/conf/xasecure-audit.xml
+++ b/hdfs-agent/conf/xasecure-audit.xml
@@ -45,11 +45,6 @@
 	</property>	
 
 	<property>
-		<name>xasecure.audit.db.async.resume.queue.size</name>
-		<value>8192</value>
-	</property>	
-
-	<property>
 		<name>xasecure.audit.db.async.max.flush.interval.ms</name>
 		<value>30000</value>
 	</property>	
@@ -89,8 +84,23 @@
 	<!-- HDFS audit provider configuration -->
 	<property>
 		<name>xasecure.audit.hdfs.is.enabled</name>
+		<value>false</value>
+	</property>	
+
+	<property>
+		<name>xasecure.audit.hdfs.is.async</name>
 		<value>true</value>
 	</property>	
+	
+	<property>
+		<name>xasecure.audit.hdfs.async.max.queue.size</name>
+		<value>1048576</value>
+	</property>	
+
+	<property>
+		<name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
+		<value>30000</value>
+	</property>	
 
 	<property>
 		<name>xasecure.audit.hdfs.config.encoding</name>
@@ -99,7 +109,7 @@
 
 	<property>
 		<name>xasecure.audit.hdfs.config.destination.directory</name>
-		<value>hdfs://%hostname%:8020/xasecure/audit/%app-type%/%time:yyyyMMdd%</value>
+		<value>hdfs://namenodehost:8020/argus/audit/%app-type%/%time:yyyyMMdd%</value>
 	</property>	
 
 	<property>
@@ -174,11 +184,6 @@
 	</property>	
 
 	<property>
-		<name>xasecure.audit.log4j.async.resume.queue.size</name>
-		<value>8192</value>
-	</property>	
-
-	<property>
 		<name>xasecure.audit.log4j.async.max.flush.interval.ms</name>
 		<value>30000</value>
 	</property>	

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/hive-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hive-agent/conf/xasecure-audit.xml b/hive-agent/conf/xasecure-audit.xml
index 0b0d87e..4c2cf53 100644
--- a/hive-agent/conf/xasecure-audit.xml
+++ b/hive-agent/conf/xasecure-audit.xml
@@ -108,8 +108,23 @@
 	<!-- HDFS audit provider configuration -->
 	<property>
 		<name>xasecure.audit.hdfs.is.enabled</name>
+		<value>false</value>
+	</property>	
+
+	<property>
+		<name>xasecure.audit.hdfs.is.async</name>
 		<value>true</value>
 	</property>	
+	
+	<property>
+		<name>xasecure.audit.hdfs.async.max.queue.size</name>
+		<value>1048576</value>
+	</property>	
+
+	<property>
+		<name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
+		<value>30000</value>
+	</property>	
 
 	<property>
 		<name>xasecure.audit.hdfs.config.encoding</name>
@@ -118,7 +133,7 @@
 
 	<property>
 		<name>xasecure.audit.hdfs.config.destination.directory</name>
-		<value>hdfs://%hostname%:8020/xasecure/audit/%app-type%/%time:yyyyMMdd%</value>
+		<value>hdfs://namenodehost:8020/argus/audit/%app-type%/%time:yyyyMMdd%</value>
 	</property>	
 
 	<property>

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/knox-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/knox-agent/conf/xasecure-audit.xml b/knox-agent/conf/xasecure-audit.xml
index 21ba2c3..36b6704 100644
--- a/knox-agent/conf/xasecure-audit.xml
+++ b/knox-agent/conf/xasecure-audit.xml
@@ -103,8 +103,23 @@
 	<!-- HDFS audit provider configuration -->
 	<property>
 		<name>xasecure.audit.hdfs.is.enabled</name>
+		<value>false</value>
+	</property>	
+
+	<property>
+		<name>xasecure.audit.hdfs.is.async</name>
 		<value>true</value>
 	</property>	
+	
+	<property>
+		<name>xasecure.audit.hdfs.async.max.queue.size</name>
+		<value>1048576</value>
+	</property>	
+
+	<property>
+		<name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
+		<value>30000</value>
+	</property>	
 
 	<property>
 		<name>xasecure.audit.hdfs.config.encoding</name>
@@ -113,7 +128,7 @@
 
 	<property>
 		<name>xasecure.audit.hdfs.config.destination.directory</name>
-		<value>hdfs://%hostname%:8020/xasecure/audit/%app-type%/%time:yyyyMMdd%</value>
+		<value>hdfs://namenodehost:8020/argus/audit/%app-type%/%time:yyyyMMdd%</value>
 	</property>	
 
 	<property>

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/2c3926b5/storm-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/storm-agent/conf/xasecure-audit.xml b/storm-agent/conf/xasecure-audit.xml
index 10d8237..c9c57c8 100644
--- a/storm-agent/conf/xasecure-audit.xml
+++ b/storm-agent/conf/xasecure-audit.xml
@@ -108,8 +108,23 @@
 	<!-- HDFS audit provider configuration -->
 	<property>
 		<name>xasecure.audit.hdfs.is.enabled</name>
+		<value>false</value>
+	</property>	
+
+	<property>
+		<name>xasecure.audit.hdfs.is.async</name>
 		<value>true</value>
 	</property>	
+	
+	<property>
+		<name>xasecure.audit.hdfs.async.max.queue.size</name>
+		<value>1048576</value>
+	</property>	
+
+	<property>
+		<name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
+		<value>30000</value>
+	</property>	
 
 	<property>
 		<name>xasecure.audit.hdfs.config.encoding</name>
@@ -118,7 +133,7 @@
 
 	<property>
 		<name>xasecure.audit.hdfs.config.destination.directory</name>
-		<value>hdfs://%hostname%:8020/xasecure/audit/%app-type%/%time:yyyyMMdd%</value>
+		<value>hdfs://namenodehost:8020/argus/audit/%app-type%/%time:yyyyMMdd%</value>
 	</property>	
 
 	<property>

