ranger-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [1/4] git commit: - ARGUS-101: update per review comment - added HDFS audit configuration to specify the size of the local file buffer - updated stage-directory scanner to include only files (and not directories) for transfer to HDFS - updated HDFS file
Date Sun, 12 Oct 2014 21:25:11 GMT
Repository: incubator-argus
Updated Branches:
  refs/heads/master 6beed95f5 -> 72d5a91bf


- ARGUS-101: update per review comment
- added HDFS audit configuration to specify the size of the local file
buffer
- updated stage-directory scanner to include only files (and not
directories) for transfer to HDFS
- updated HDFS file rename logic to handle failure to open file for
write/append


Project: http://git-wip-us.apache.org/repos/asf/incubator-argus/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-argus/commit/cd863f20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-argus/tree/cd863f20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-argus/diff/cd863f20

Branch: refs/heads/master
Commit: cd863f20d8f2c7c43f28adc720fbc0498ba4bc28
Parents: 38ed203
Author: mneethiraj <mneethiraj@hortonworks.com>
Authored: Sat Oct 11 01:06:07 2014 -0700
Committer: mneethiraj <mneethiraj@hortonworks.com>
Committed: Sat Oct 11 01:06:07 2014 -0700

----------------------------------------------------------------------
 .../audit/provider/LocalFileLogBuffer.java      | 56 ++++++++++++-------
 .../audit/provider/hdfs/HdfsAuditProvider.java  |  2 +
 .../audit/provider/hdfs/HdfsLogDestination.java | 57 ++++++++------------
 hbase-agent/conf/xasecure-audit.xml             |  5 ++
 hdfs-agent/conf/xasecure-audit.xml              |  5 ++
 hive-agent/conf/xasecure-audit.xml              |  5 ++
 knox-agent/conf/xasecure-audit.xml              |  5 ++
 storm-agent/conf/xasecure-audit.xml             |  5 ++
 8 files changed, 85 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cd863f20/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
index b04b6a4..0d1c42e 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
@@ -19,6 +19,7 @@
 package com.xasecure.audit.provider;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
@@ -43,6 +44,7 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
 	private String  mDirectory               = null;
 	private String  mFile                    = null;
 	private int     mFlushIntervalSeconds    = 1 * 60;
+	private int     mFileBufferSizeBytes     = 8 * 1024;
 	private String  mEncoding                = null;
 	private boolean mIsAppend                = true;
 	private int     mRolloverIntervalSeconds = 10 * 60;
@@ -77,6 +79,14 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T>
{
 		mFile = file;
 	}
 
+	public int getFileBufferSizeBytes() {
+		return mFileBufferSizeBytes;
+	}
+
+	public void setFileBufferSizeBytes(int fileBufferSizeBytes) {
+		mFileBufferSizeBytes = fileBufferSizeBytes;
+	}
+
 	public int getFlushIntervalSeconds() {
 		return mFlushIntervalSeconds;
 	}
@@ -165,29 +175,31 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T>
{
 	}
 
 	@Override
-	public synchronized boolean add(T log) {
+	public boolean add(T log) {
 		boolean ret = false;
 
-		checkFileStatus();
-
-		Writer writer = mWriter;
+		String msg = MiscUtil.stringify(log);
 
-		if(writer != null) {
-			try {
-				String msg = MiscUtil.stringify(log);
+		if(msg.contains(MiscUtil.LINE_SEPARATOR)) {
+			msg = msg.replace(MiscUtil.LINE_SEPARATOR, MiscUtil.ESCAPE_STR + MiscUtil.LINE_SEPARATOR);
+		}
 
-				if(msg.contains(MiscUtil.LINE_SEPARATOR)) {
-					msg = msg.replace(MiscUtil.LINE_SEPARATOR, MiscUtil.ESCAPE_STR + MiscUtil.LINE_SEPARATOR);
+		synchronized(this) {
+			checkFileStatus();
+	
+			Writer writer = mWriter;
+	
+			if(writer != null) {
+				try {
+					writer.write(msg + MiscUtil.LINE_SEPARATOR);
+	
+					ret = true;
+				} catch(IOException excp) {
+					mLogger.warn("LocalFileLogBuffer.add(): write failed", excp);
 				}
-
-				writer.write(msg + MiscUtil.LINE_SEPARATOR);
-
-				ret = true;
-			} catch(IOException excp) {
-				mLogger.warn("LocalFileLogBuffer.add(): write failed", excp);
+			} else {
+				mLogger.warn("LocalFileLogBuffer.add(): writer is null");
 			}
-		} else {
-			mLogger.warn("LocalFileLogBuffer.add(): writer is null");
 		}
 
 		return ret;
@@ -288,8 +300,8 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T>
{
 		}
 	}
 
-	private OutputStreamWriter createWriter(OutputStream os ) {
-	    OutputStreamWriter writer = null;
+	private Writer createWriter(OutputStream os ) {
+	    Writer writer = null;
 
 	    if(os != null) {
 			if(mEncoding != null) {
@@ -303,6 +315,10 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T>
{
 			if(writer == null) {
 				writer = new OutputStreamWriter(os);
 			}
+
+			if(mFileBufferSizeBytes > 0 && writer != null) {
+	    		writer = new BufferedWriter(writer, mFileBufferSizeBytes);
+	    	}
 	    }
 
 	    return writer;
@@ -440,7 +456,7 @@ class DestinationDispatcherThread<T> extends Thread {
 		
 				if(files != null) {
 					for(File file : files) {
-						if(file.exists() && file.canRead()) {
+						if(file.exists() && file.isFile() && file.canRead()) {
 							String filename = file.getAbsolutePath();
 							if(! mFileLogBuffer.isCurrentFilename(filename)) {
 								addLogfile(filename);

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cd863f20/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
index db94313..1d3e2c7 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
@@ -30,6 +30,7 @@ public class HdfsAuditProvider extends BufferedAuditProvider {
 		String localFileBufferDirectory               = properties.get("local.buffer.directory");
 		String localFileBufferFile                    = properties.get("local.buffer.file");
 		int    localFileBufferFlushIntervalSeconds    = MiscUtil.parseInteger(properties.get("local.buffer.flush.interval.seconds"),
1 * 60);
+		int    localFileBufferFileBufferSizeBytes     = MiscUtil.parseInteger(properties.get("local.buffer.file.buffer.size.bytes"),
8 * 1024);
 		int    localFileBufferRolloverIntervalSeconds = MiscUtil.parseInteger(properties.get("local.buffer.rollover.interval.seconds"),
10 * 60);
 		String localFileBufferArchiveDirectory        = properties.get("local.archive.directory");
 		int    localFileBufferArchiveFileCount        = MiscUtil.parseInteger(properties.get("local.archive.max.file.count"),
10);
@@ -50,6 +51,7 @@ public class HdfsAuditProvider extends BufferedAuditProvider {
 		mLocalFileBuffer.setDirectory(localFileBufferDirectory);
 		mLocalFileBuffer.setFile(localFileBufferFile);
 		mLocalFileBuffer.setFlushIntervalSeconds(localFileBufferFlushIntervalSeconds);
+		mLocalFileBuffer.setFileBufferSizeBytes(localFileBufferFileBufferSizeBytes);
 		mLocalFileBuffer.setEncoding(encoding);
 		mLocalFileBuffer.setRolloverIntervalSeconds(localFileBufferRolloverIntervalSeconds);
 		mLocalFileBuffer.setArchiveDirectory(localFileBufferArchiveDirectory);

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cd863f20/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
index 2939a5c..f31e4cb 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
@@ -35,11 +35,13 @@ import com.xasecure.audit.provider.LogDestination;
 import com.xasecure.audit.provider.MiscUtil;
 
 public class HdfsLogDestination<T> implements LogDestination<T> {
+	public final static String EXCP_MSG_FILESYSTEM_CLOSED = "Filesystem closed";
+
 	private String  mDirectory                = null;
 	private String  mFile                     = null;
 	private int     mFlushIntervalSeconds     = 1 * 60;
 	private String  mEncoding                 = null;
-	private boolean mIsAppend                 = true;
+	private boolean mIsAppend                 = false;
 	private int     mRolloverIntervalSeconds  = 24 * 60 * 60;
 	private int     mOpenRetryIntervalSeconds = 60;
 	private DebugTracer mLogger               = null;
@@ -193,19 +195,24 @@ public class HdfsLogDestination<T> implements LogDestination<T>
{
 			pathLogfile = new Path(mHdfsFilename);
 			fileSystem  = FileSystem.get(uri, conf);
 
-			if(fileSystem.exists(pathLogfile)) {
-				if(mIsAppend) {
-					try {
+			try {
+				if(fileSystem.exists(pathLogfile)) { // file already exists. either append to the file
or write to a new file
+					if(mIsAppend) {
 						ostream = fileSystem.append(pathLogfile);
-					} catch(IOException excp) {
-						// append may not be supported by the filesystem. rename existing file and create a
new one
-						String fileSuffix    = MiscUtil.replaceTokens("-" + MiscUtil.TOKEN_START + MiscUtil.TOKEN_TIME
+ "yyyyMMdd-HHmm.ss" + MiscUtil.TOKEN_END, startTime);
-						String movedFilename = appendToFilename(mHdfsFilename, fileSuffix);
-						Path   movedFilePath = new Path(movedFilename);
-
-						fileSystem.rename(pathLogfile, movedFilePath);
+					} else {
+						mHdfsFilename =  MiscUtil.replaceTokens(mDirectory + org.apache.hadoop.fs.Path.SEPARATOR
+ mFile, System.currentTimeMillis());
+						pathLogfile   = new Path(mHdfsFilename);
 					}
 				}
+
+				// if file does not exist or if mIsAppend==false, create the file
+				if(ostream == null) {
+					ostream = fileSystem.create(pathLogfile);
+				}
+			} catch(IOException excp) {
+				// append may not be supported by the filesystem; or the file might already be open by
another application. Try a different filename - with current timestamp
+				mHdfsFilename =  MiscUtil.replaceTokens(mDirectory + org.apache.hadoop.fs.Path.SEPARATOR
+ mFile, System.currentTimeMillis());
+				pathLogfile   = new Path(mHdfsFilename);
 			}
 
 			if(ostream == null){
@@ -318,34 +325,14 @@ public class HdfsLogDestination<T> implements LogDestination<T>
{
 	    return writer;
 	}
 	
-	private String appendToFilename(String fileName, String strToAppend) {
-		String ret = fileName;
-		
-		if(strToAppend != null) {
-			if(ret == null) {
-				ret = "";
-			}
-	
-			int extnPos = ret.lastIndexOf(".");
-			
-			if(extnPos < 0) {
-				ret += strToAppend;
-			} else {
-				String extn = ret.substring(extnPos);
-				
-				ret = ret.substring(0, extnPos) + strToAppend + extn;
-			}
-		}
-
-		return ret;
-	}
-	
 	private void logException(String msg, IOException excp) {
-		if(mIsStopInProgress) { // during shutdown, the underlying FileSystem might already be
closed; so don't print error details
+		// during shutdown, the underlying FileSystem might already be closed; so don't print error
details
+
+		if(mIsStopInProgress) {
 			return;
 		}
 
-		String  excpMsgToExclude   = "Filesystem closed";
+		String  excpMsgToExclude   = EXCP_MSG_FILESYSTEM_CLOSED;;
 		String  excpMsg            = excp != null ? excp.getMessage() : null;
 		boolean excpExcludeLogging = (excpMsg != null && excpMsg.contains(excpMsgToExclude));
 		

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cd863f20/hbase-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hbase-agent/conf/xasecure-audit.xml b/hbase-agent/conf/xasecure-audit.xml
index d9bcf14..35a569e 100644
--- a/hbase-agent/conf/xasecure-audit.xml
+++ b/hbase-agent/conf/xasecure-audit.xml
@@ -136,6 +136,11 @@
 	</property>	
 
 	<property>
+		<name>xasecure.audit.hdfs.config.local.buffer.file.buffer.size.bytes</name>
+		<value>8192</value>
+	</property>	
+
+	<property>
 		<name>xasecure.audit.hdfs.config.local.buffer.flush.interval.seconds</name>
 		<value>60</value>
 	</property>	

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cd863f20/hdfs-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hdfs-agent/conf/xasecure-audit.xml b/hdfs-agent/conf/xasecure-audit.xml
index 9182913..fe0aad4 100644
--- a/hdfs-agent/conf/xasecure-audit.xml
+++ b/hdfs-agent/conf/xasecure-audit.xml
@@ -117,6 +117,11 @@
 	</property>	
 
 	<property>
+		<name>xasecure.audit.hdfs.config.local.buffer.file.buffer.size.bytes</name>
+		<value>8192</value>
+	</property>	
+
+	<property>
 		<name>xasecure.audit.hdfs.config.local.buffer.flush.interval.seconds</name>
 		<value>60</value>
 	</property>	

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cd863f20/hive-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hive-agent/conf/xasecure-audit.xml b/hive-agent/conf/xasecure-audit.xml
index b141429..c43aafc 100644
--- a/hive-agent/conf/xasecure-audit.xml
+++ b/hive-agent/conf/xasecure-audit.xml
@@ -136,6 +136,11 @@
 	</property>	
 
 	<property>
+		<name>xasecure.audit.hdfs.config.local.buffer.file.buffer.size.bytes</name>
+		<value>8192</value>
+	</property>	
+
+	<property>
 		<name>xasecure.audit.hdfs.config.local.buffer.flush.interval.seconds</name>
 		<value>60</value>
 	</property>	

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cd863f20/knox-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/knox-agent/conf/xasecure-audit.xml b/knox-agent/conf/xasecure-audit.xml
index b50ffe6..ab75664 100644
--- a/knox-agent/conf/xasecure-audit.xml
+++ b/knox-agent/conf/xasecure-audit.xml
@@ -131,6 +131,11 @@
 	</property>	
 
 	<property>
+		<name>xasecure.audit.hdfs.config.local.buffer.file.buffer.size.bytes</name>
+		<value>8192</value>
+	</property>	
+
+	<property>
 		<name>xasecure.audit.hdfs.config.local.buffer.flush.interval.seconds</name>
 		<value>60</value>
 	</property>	

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/cd863f20/storm-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/storm-agent/conf/xasecure-audit.xml b/storm-agent/conf/xasecure-audit.xml
index d868dda..4ac45c0 100644
--- a/storm-agent/conf/xasecure-audit.xml
+++ b/storm-agent/conf/xasecure-audit.xml
@@ -136,6 +136,11 @@
 	</property>	
 
 	<property>
+		<name>xasecure.audit.hdfs.config.local.buffer.file.buffer.size.bytes</name>
+		<value>8192</value>
+	</property>	
+
+	<property>
 		<name>xasecure.audit.hdfs.config.local.buffer.flush.interval.seconds</name>
 		<value>60</value>
 	</property>	


Mime
View raw message