ranger-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [01/10] git commit: Log4j appender to write to HDFS.
Date Mon, 22 Sep 2014 18:29:15 GMT
Repository: incubator-argus
Updated Branches:
  refs/heads/master 1e1fcff18 -> cfec85a65


Log4j appender to write to HDFS.

Project: http://git-wip-us.apache.org/repos/asf/incubator-argus/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-argus/commit/be47e640
Tree: http://git-wip-us.apache.org/repos/asf/incubator-argus/tree/be47e640
Diff: http://git-wip-us.apache.org/repos/asf/incubator-argus/diff/be47e640

Branch: refs/heads/master
Commit: be47e6402804b013cfcf76aae27f899010b721ed
Parents: daac1c6
Author: mneethiraj <mneethiraj@hortonworks.com>
Authored: Thu Sep 18 23:40:57 2014 -0700
Committer: mneethiraj <mneethiraj@hortonworks.com>
Committed: Thu Sep 18 23:40:57 2014 -0700

----------------------------------------------------------------------
 .../audit/provider/Log4jAuditProvider.java      |   2 +-
 .../audit/provider/MultiDestAuditProvider.java  |  38 +-
 agents-common/conf/log4j.properties             |  18 +
 .../java/org/apache/log4j/BufferedAppender.java | 123 ++++
 .../org/apache/log4j/HdfsLogDestination.java    | 297 ++++++++++
 .../apache/log4j/HdfsRollingFileAppender.java   | 130 ++++
 .../org/apache/log4j/LocalFileLogBuffer.java    | 591 +++++++++++++++++++
 .../main/java/org/apache/log4j/LogBuffer.java   |  30 +
 .../java/org/apache/log4j/LogDestination.java   |  30 +
 9 files changed, 1245 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/be47e640/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java
index e9c7bd4..dc6a27f 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/Log4jAuditProvider.java
@@ -27,7 +27,7 @@ import com.xasecure.audit.model.AuditEventBase;
 public class Log4jAuditProvider implements AuditProvider {
 
 	private static final Log LOG      = LogFactory.getLog(Log4jAuditProvider.class);
-	private static final Log AUDITLOG = LogFactory.getLog("xaaudit." + Log4jAuditProvider.class);
+	private static final Log AUDITLOG = LogFactory.getLog("xaaudit." + Log4jAuditProvider.class.getName());
 
 
 	public Log4jAuditProvider() {

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/be47e640/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java
index d068bdd..cee9f5a 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/MultiDestAuditProvider.java
@@ -67,33 +67,45 @@ public class MultiDestAuditProvider implements AuditProvider {
 
 	@Override
 	public void log(AuditEventBase event) {
-		try {
-            for(AuditProvider provider : mProviders) {
+        // each provider is invoked in its own try/catch, so a failure in one does not stop the loop
+        for(AuditProvider provider : mProviders) {
+    		try {
                 provider.log(event);
-            }
-		} catch(Throwable excp) {
-			LOG.error("failed to log event { " + event.toString() + " }", excp);
-		}
+    		} catch(Throwable excp) {
+    			LOG.error("MultiDestAuditProvider.log(): failed for provider { " + provider.getClass().getName() + " }", excp);
+    		}
+        }
 	}
 
 	@Override
 	public void start() {
 		for(AuditProvider provider : mProviders) {
-			provider.start();
+    		try {
+				provider.start();
+    		} catch(Throwable excp) {
+    			LOG.error("AsyncAuditProvider.start(): failed for provider { " + provider.getClass().getName()
+ " }", excp);
+    		}
 		}
 	}
 
 	@Override
 	public void stop() {
 		for(AuditProvider provider : mProviders) {
-			provider.stop();
+			try {
+				provider.stop();
+			} catch(Throwable excp) {
+    			LOG.error("AsyncAuditProvider.stop(): failed for provider { " + provider.getClass().getName()
+ " }", excp);
+			}
 		}
 	}
 
 	@Override
     public void waitToComplete() {
 		for(AuditProvider provider : mProviders) {
-			provider.waitToComplete();
+			try {
+				provider.waitToComplete();
+			} catch(Throwable excp) {
+    			LOG.error("AsyncAuditProvider.waitToComplete(): failed for provider { " + provider.getClass().getName()
+ " }", excp);
+			}
 		}
 	}
 	
@@ -126,12 +138,12 @@ public class MultiDestAuditProvider implements AuditProvider {
 	
 	@Override
 	public void flush() {
-		try {
-			for(AuditProvider provider : mProviders) {
+		for(AuditProvider provider : mProviders) {
+			try {
 				provider.flush();
+			} catch(Throwable excp) {
+    			LOG.error("AsyncAuditProvider.flush(): failed for provider { " + provider.getClass().getName()
+ " }", excp);
 			}
-		} catch(Throwable excp) {
-			LOG.error("AsyncAuditProvider.flush(): failed to flush events", excp);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/be47e640/agents-common/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/agents-common/conf/log4j.properties b/agents-common/conf/log4j.properties
new file mode 100644
index 0000000..c1ea266
--- /dev/null
+++ b/agents-common/conf/log4j.properties
@@ -0,0 +1,18 @@
+log4j.logger.xaaudit.com.xasecure.audit.provider.Log4jAuditProvider=INFO, hdfsAppender
+
+
+log4j.appender.hdfsAppender=org.apache.log4j.HdfsRollingFileAppender
+log4j.appender.hdfsAppender.hdfsDestinationDirectory=hdfs://%hostname%:8020/logs/application/%file-open-time:yyyyMMdd%
+log4j.appender.hdfsAppender.hdfsDestinationFile=%hostname%-audit.log
+log4j.appender.hdfsAppender.hdfsDestinationRolloverIntervalSeconds=86400
+
+log4j.appender.hdfsAppender.localFileBufferDirectory=/tmp/logs/application/%hostname%
+log4j.appender.hdfsAppender.localFileBufferFile=%file-open-time:yyyyMMdd-HHmm.ss%.log
+log4j.appender.hdfsAppender.localFileBufferRolloverIntervalSeconds=15
+log4j.appender.hdfsAppender.localFileBufferArchiveDirectory=/tmp/logs/archive/application/%hostname%
+log4j.appender.hdfsAppender.localFileBufferArchiveFileCount=12
+
+
+log4j.appender.hdfsAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.hdfsAppender.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
+log4j.appender.hdfsAppender.encoding=UTF-8

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/be47e640/agents-common/src/main/java/org/apache/log4j/BufferedAppender.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/log4j/BufferedAppender.java b/agents-common/src/main/java/org/apache/log4j/BufferedAppender.java
new file mode 100644
index 0000000..916d2eb
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/log4j/BufferedAppender.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.log4j;
+
+
+import org.apache.log4j.helpers.LogLog;
+
+/**
+ * Base class for appenders that hand log entries to a {@link LogBuffer}, which is started
+ * with a {@link LogDestination}.
+ *
+ * A single JVM shutdown hook is registered while the appender is started, so that
+ * {@link #close()} is invoked when the JVM exits; the hook is removed again when the
+ * appender is stopped.
+ *
+ * @param <T> type of a single log entry accepted by the buffer and the destination
+ */
+public abstract class BufferedAppender<T> extends AppenderSkeleton {
+	private LogDestination<T> mLogDestination = null;
+	private LogBuffer<T>      mLogBuffer      = null;
+	private Thread            mShutdownHook   = null;
+
+	/**
+	 * Stops the currently active buffer (if any) and installs the given buffer and destination.
+	 * The new pair is not started here; call {@link #start()} for that.
+	 *
+	 * @param buffer      buffer that receives log entries from this appender
+	 * @param destination destination passed to the buffer when it is started
+	 */
+	protected void setBufferAndDestination(LogBuffer<T> buffer, LogDestination<T> destination) {
+		close();
+
+		mLogBuffer      = buffer;
+		mLogDestination = destination;
+	}
+
+	/**
+	 * @return true when a buffer is installed, i.e. {@link #addToBuffer(Object)} will not be a no-op
+	 */
+	protected boolean isLogable() {
+		return mLogBuffer != null;
+	}
+
+	/**
+	 * Adds the entry to the installed buffer; does nothing when no buffer is installed.
+	 *
+	 * @param log entry to add
+	 */
+	protected void addToBuffer(T log) {
+		if(mLogBuffer != null) {
+			mLogBuffer.add(log);
+		}
+	}
+
+	/**
+	 * Starts the installed buffer with the installed destination. Warns and does nothing
+	 * else when either of them is missing.
+	 */
+	protected void start() {
+		LogLog.debug("==> BufferedAppender.start()");
+
+		if(mLogBuffer == null) {
+			LogLog.warn("BufferedAppender.start(): logBuffer is null");
+		}
+
+		if(mLogDestination == null) {
+			LogLog.warn("BufferedAppender.start(): logDestination is null");
+		}
+
+		if(mLogBuffer != null && mLogDestination != null) {
+			registerShutdownHook();
+
+			mLogBuffer.start(mLogDestination);
+		}
+
+		LogLog.debug("<== BufferedAppender.start()");
+	}
+
+	/**
+	 * Clears the buffer and destination references, removes the shutdown hook and
+	 * stops the buffer that was installed.
+	 */
+	protected void stop() {
+		LogLog.debug("==> BufferedAppender.stop()");
+
+		LogBuffer<T> tmpBuff = mLogBuffer;
+
+		mLogDestination = null;
+		mLogBuffer      = null;
+
+		unregisterShutdownHook();
+
+		if(tmpBuff != null) {
+			tmpBuff.stop();
+		}
+
+		LogLog.debug("<== BufferedAppender.stop()");
+	}
+
+	@Override
+	public void close() {
+		LogLog.debug("==> BufferedAppender.close()");
+
+		stop();
+
+		LogLog.debug("<== BufferedAppender.close()");
+	}
+	
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+
+		sb.append("BufferedAppender {");
+		if(mLogDestination != null) {
+			sb.append(mLogDestination.toString());
+		}
+
+		sb.append(" ");
+
+		if(mLogBuffer != null) {
+			sb.append(mLogBuffer.toString());
+		}
+		sb.append("}");
+
+		return sb.toString();
+	}
+
+	/** Registers the shutdown hook unless one is already registered for this appender. */
+	private void registerShutdownHook() {
+		if(mShutdownHook == null) {
+			Thread hook = new JVMShutdownHook(this);
+
+			Runtime.getRuntime().addShutdownHook(hook);
+
+			// remember the hook only after it was registered successfully
+			mShutdownHook = hook;
+		}
+	}
+
+	/** Removes the previously registered shutdown hook, if any. */
+	private void unregisterShutdownHook() {
+		Thread hook = mShutdownHook;
+
+		mShutdownHook = null;
+
+		// when stop() is reached from the hook itself the JVM is shutting down and the hook cannot be removed
+		if(hook != null && hook != Thread.currentThread()) {
+			try {
+				Runtime.getRuntime().removeShutdownHook(hook);
+			} catch(IllegalStateException excp) {
+				// JVM shutdown is already in progress; nothing to remove
+			}
+		}
+	}
+
+	/** Shutdown hook that closes the appender it was created for. */
+	private static class JVMShutdownHook extends Thread {
+		private final Appender mAppender;
+
+		public JVMShutdownHook(Appender appender) {
+			mAppender = appender;
+		}
+
+		@Override
+		public void run() {
+			if(mAppender != null) {
+				mAppender.close();
+			}
+	    }
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/be47e640/agents-common/src/main/java/org/apache/log4j/HdfsLogDestination.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/log4j/HdfsLogDestination.java b/agents-common/src/main/java/org/apache/log4j/HdfsLogDestination.java
new file mode 100644
index 0000000..ec53977
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/log4j/HdfsLogDestination.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.log4j;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LocalFileLogBuffer.MiscUtil;
+import org.apache.log4j.helpers.LogLog;
+
+public class HdfsLogDestination implements LogDestination<String> {
+	private String  mDirectory               = null;
+	private String  mFile                    = null;
+	private String  mEncoding                = null;
+	private boolean mIsAppend                = true;
+	private int     mRolloverIntervalSeconds = 24 * 60 * 60;
+
+	private OutputStreamWriter mWriter           = null; 
+	private String             mCurrentFilename  = null;
+	private long               mNextRolloverTime = 0;
+	
+	private String LINE_SEPARATOR = System.getProperty("line.separator");
+
+	public HdfsLogDestination() {
+	}
+
+	public String getDirectory() {
+		return mDirectory;
+	}
+
+	public void setDirectory(String directory) {
+		this.mDirectory = directory;
+	}
+
+	public String getFile() {
+		return mFile;
+	}
+
+	public void setFile(String file) {
+		this.mFile = file;
+	}
+
+	public String getEncoding() {
+		return mEncoding;
+	}
+
+	public void setEncoding(String encoding) {
+		mEncoding = encoding;
+	}
+
+	public int getRolloverIntervalSeconds() {
+		return mRolloverIntervalSeconds;
+	}
+
+	public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) {
+		this.mRolloverIntervalSeconds = rolloverIntervalSeconds;
+	}
+
+	@Override
+	public void start() {
+		LogLog.debug("==> HdfsLogDestination.start()");
+
+		openFile();
+
+		LogLog.debug("<== HdfsLogDestination.start()");
+	}
+
+	@Override
+	public void stop() {
+		LogLog.debug("==> HdfsLogDestination.stop()");
+
+		closeFile();
+
+		LogLog.debug("<== HdfsLogDestination.stop()");
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return mWriter != null;
+	}
+
+	/**
+	 * Writes one log entry followed by the line separator and flushes the writer.
+	 * The file is rolled over first when the current time is past the next rollover time.
+	 *
+	 * @param log entry to write
+	 * @return true when the entry was written and flushed; false when no writer is open or the write failed
+	 */
+	@Override
+	public boolean add(String log) {
+		if(System.currentTimeMillis() > mNextRolloverTime) {
+			rollover();
+		}
+
+		// read the field once; the rest of the method uses this local reference
+		OutputStreamWriter out = mWriter;
+
+		if(out == null) {
+			return false;
+		}
+
+		try {
+			out.write(log + LINE_SEPARATOR);
+			out.flush();
+		} catch (IOException excp) {
+			LogLog.warn("HdfsLogDestination.add(): write failed", excp);
+
+			return false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * Closes any open file and opens the file named by the configured directory and file
+	 * (after token replacement) for writing.
+	 *
+	 * If the file exists and append is enabled, the file is opened for append; when append
+	 * fails, the existing file is renamed with a timestamp suffix and a new file is created.
+	 * If opening fails with an IOException and the parent directory does not exist, the
+	 * parent is created and the create is retried once.
+	 *
+	 * On success the writer is set and the next rollover time is updated; on failure the
+	 * writer stays null and the current filename is reset to null.
+	 */
+	private void openFile() {
+		LogLog.debug("==> HdfsLogDestination.openFile()");
+
+		closeFile();
+
+		// the name is parsed as a URI / Hadoop Path below, so use '/' and not the platform-specific File.separator
+		mCurrentFilename = MiscUtil.replaceTokens(mDirectory + Path.SEPARATOR + mFile);
+
+		FSDataOutputStream ostream     = null;
+		FileSystem         fileSystem  = null;
+		Path               pathLogfile = null;
+		Configuration      conf        = null;
+
+		try {
+			LogLog.debug("HdfsLogDestination.openFile(): opening file " + mCurrentFilename);
+
+			URI uri = URI.create(mCurrentFilename);
+
+			// TODO: mechanism to notify co-located XA-HDFS plugin to disable auditing of access checks to the current HDFS file
+			//       this can be driven by adding an option (property) the logger, which can be configured at the deployment time.
+			//       Like: hdfsCurrentFilenameProperty. When this option is set, do the following here:
+			//        System.setProperty(hdfsCurrentFilenameProperty, uri.getPath());
+
+			conf        = new Configuration();
+			pathLogfile = new Path(mCurrentFilename);
+			fileSystem  = FileSystem.get(uri, conf);
+
+			if(fileSystem.exists(pathLogfile) && mIsAppend) {
+				try {
+					ostream = fileSystem.append(pathLogfile);
+				} catch(IOException excp) {
+					// append may not be supported by the filesystem. rename existing file and create a new one
+					String fileSuffix    = MiscUtil.replaceTokens("-" + MiscUtil.TOKEN_FILE_OPEN_TIME_START + "yyyyMMdd-HHmm.ss" + MiscUtil.TOKEN_FILE_OPEN_TIME_END);
+					String movedFilename = appendToFilename(mCurrentFilename, fileSuffix);
+					Path   movedFilePath = new Path(movedFilename);
+
+					if(!fileSystem.rename(pathLogfile, movedFilePath)) {
+						LogLog.warn("HdfsLogDestination.openFile(): failed to rename " + mCurrentFilename + " to " + movedFilename, excp);
+					}
+				}
+			}
+
+			if(ostream == null){
+				ostream = fileSystem.create(pathLogfile);
+			}
+		} catch(IOException ex) {
+			Path parentPath = (pathLogfile != null) ? pathLogfile.getParent() : null;
+
+			// retry once after creating the parent directory, when it is missing
+			try {
+				if(parentPath != null && fileSystem != null && !fileSystem.exists(parentPath) && fileSystem.mkdirs(parentPath)) {
+					ostream = fileSystem.create(pathLogfile);
+				}
+			} catch (Throwable e) {
+				LogLog.warn("HdfsLogDestination.openFile() failed", e);
+			}
+
+			if(ostream == null) {
+				// report the original failure; it would otherwise be lost when the retry is not attempted
+				LogLog.warn("HdfsLogDestination.openFile() failed", ex);
+			}
+		} catch(Throwable ex) {
+			LogLog.warn("HdfsLogDestination.openFile() failed", ex);
+		} finally {
+			// TODO: unset the property set above to exclude auditing of logfile opening
+			//        System.setProperty(hdfsCurrentFilenameProperty, null);
+		}
+
+		mWriter = createWriter(ostream);
+
+		if(mWriter != null) {
+			LogLog.debug("HdfsLogDestination.openFile(): opened file " + mCurrentFilename);
+
+			updateNextRolloverTime();
+		} else {
+			LogLog.warn("HdfsLogDestination.openFile(): failed to open file for write " + mCurrentFilename);
+
+			mCurrentFilename = null;
+		}
+
+		LogLog.debug("<== HdfsLogDestination.openFile(" + mCurrentFilename + ")");
+	}
+
+	private void closeFile() {
+		LogLog.debug("==> HdfsLogDestination.closeFile()");
+
+		OutputStreamWriter writer = mWriter;
+
+		mWriter = null;
+
+		if(writer != null) {
+			try {
+				writer.close();
+			} catch(IOException excp) {
+				LogLog.warn("HdfsLogDestination: failed to close file " + mCurrentFilename, excp);
+			}
+		}
+
+		LogLog.debug("<== HdfsLogDestination.closeFile()");
+	}
+
+	private void rollover() {
+		LogLog.debug("==> HdfsLogDestination.rollover()");
+
+		closeFile();
+
+		openFile();
+
+		LogLog.debug("<== HdfsLogDestination.rollover()");
+	}
+
+	/**
+	 * Wraps the given stream in a writer, using the configured encoding when one is set.
+	 * Falls back to the platform default encoding when no encoding is configured or the
+	 * configured one is not supported.
+	 *
+	 * @param os stream to wrap; may be null
+	 * @return the writer, or null when {@code os} is null
+	 */
+	private OutputStreamWriter createWriter(OutputStream os ) {
+		if(os == null) {
+			return null;
+		}
+
+		if(mEncoding != null) {
+			try {
+				return new OutputStreamWriter(os, mEncoding);
+			} catch(UnsupportedEncodingException excp) {
+				LogLog.warn("HdfsLogDestination: failed to create output writer.", excp);
+			}
+		}
+
+		return new OutputStreamWriter(os);
+	}
+
+	/**
+	 * Recomputes the next rollover time from the current one and the rollover interval in milliseconds.
+	 */
+	private void updateNextRolloverTime() {
+		// NOTE(review): int multiplication; overflows for intervals above ~24.8 days - confirm whether that matters
+		int intervalMillis = mRolloverIntervalSeconds * 1000;
+
+		mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, intervalMillis);
+	}
+	
+	/**
+	 * Inserts the given string before the extension of the file name, or appends it at the
+	 * end when the name has no extension. Only a '.' in the last path component counts as
+	 * the start of an extension, so dots in a URI host name or in directory names are ignored.
+	 *
+	 * @param fileName    file name or path; null is treated as an empty name when there is something to append
+	 * @param strToAppend string to insert; when null, {@code fileName} is returned unchanged
+	 * @return the resulting name
+	 */
+	private String appendToFilename(String fileName, String strToAppend) {
+		if(strToAppend == null) {
+			return fileName;
+		}
+
+		String name = (fileName == null) ? "" : fileName;
+
+		int lastSeparatorPos = name.lastIndexOf('/');
+		int extnPos          = name.lastIndexOf('.');
+
+		// no '.' at all, or the last '.' belongs to a directory / host name: no extension to preserve
+		if(extnPos <= lastSeparatorPos) {
+			return name + strToAppend;
+		}
+
+		return name.substring(0, extnPos) + strToAppend + name.substring(extnPos);
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+
+		sb.append("HdfsLogDestination {");
+		sb.append("Directory=").append(mDirectory).append("; ");
+		sb.append("File=").append(mFile).append("; ");
+		sb.append("RolloverIntervalSeconds=").append(mRolloverIntervalSeconds);
+		sb.append("}");
+		
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/be47e640/agents-common/src/main/java/org/apache/log4j/HdfsRollingFileAppender.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/log4j/HdfsRollingFileAppender.java b/agents-common/src/main/java/org/apache/log4j/HdfsRollingFileAppender.java
new file mode 100644
index 0000000..a855575
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/log4j/HdfsRollingFileAppender.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.log4j;
+
+
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+
+public class HdfsRollingFileAppender extends BufferedAppender<String> {
+	String mEncoding                               = null;
+
+	String mHdfsDestinationDirectory               = null;
+	String mHdfsDestinationFile                    = null;
+	int    mHdfsDestinationRolloverIntervalSeconds = 24 * 60 * 60;
+
+	String mLocalFileBufferDirectory               = null;
+	String mLocalFileBufferFile                    = null;
+	int    mLocalFileBufferRolloverIntervalSeconds = 10 * 60;
+	String mLocalFileBufferArchiveDirectory        = null;
+	int    mLocalFileBufferArchiveFileCount        = 10;
+
+	public void setEncoding(String encoding) {
+		mEncoding = encoding;
+	}
+
+	public void setHdfsDestinationDirectory(String hdfsDestinationDirectory) {
+		mHdfsDestinationDirectory = hdfsDestinationDirectory;
+	}
+
+	public void setHdfsDestinationFile(String hdfsDestinationFile) {
+		mHdfsDestinationFile = hdfsDestinationFile;
+	}
+
+	public void setHdfsDestinationRolloverIntervalSeconds(int hdfsDestinationRolloverIntervalSeconds)
{
+		mHdfsDestinationRolloverIntervalSeconds = hdfsDestinationRolloverIntervalSeconds;
+	}
+
+	public void setLocalFileBufferDirectory(String localFileBufferDirectory) {
+		mLocalFileBufferDirectory = localFileBufferDirectory;
+	}
+
+	public void setLocalFileBufferFile(String localFileBufferFile) {
+		mLocalFileBufferFile = localFileBufferFile;
+	}
+
+	public void setLocalFileBufferRolloverIntervalSeconds(int localFileBufferRolloverIntervalSeconds)
{
+		mLocalFileBufferRolloverIntervalSeconds = localFileBufferRolloverIntervalSeconds;
+	}
+
+	public void setLocalFileBufferArchiveDirectory(String localFileBufferArchiveDirectory) {
+		mLocalFileBufferArchiveDirectory = localFileBufferArchiveDirectory;
+	}
+
+	public void setLocalFileBufferArchiveFileCount(int localFileBufferArchiveFileCount) {
+		mLocalFileBufferArchiveFileCount = localFileBufferArchiveFileCount;
+	}
+
+	@Override
+	public boolean requiresLayout() {
+		return true;
+	}
+
+	@Override
+	public void activateOptions() {
+		LogLog.debug("==> HdfsRollingFileAppender.activateOptions()");
+
+		HdfsLogDestination hdfsDestination = new HdfsLogDestination();
+
+		hdfsDestination.setDirectory(mHdfsDestinationDirectory);
+		hdfsDestination.setFile(mHdfsDestinationFile);
+		hdfsDestination.setEncoding(mEncoding);
+		hdfsDestination.setRolloverIntervalSeconds(mHdfsDestinationRolloverIntervalSeconds);
+
+		LocalFileLogBuffer localFileBuffer = new LocalFileLogBuffer();
+
+		localFileBuffer.setDirectory(mLocalFileBufferDirectory);
+		localFileBuffer.setFile(mLocalFileBufferFile);
+		localFileBuffer.setEncoding(mEncoding);
+		localFileBuffer.setRolloverIntervalSeconds(mLocalFileBufferRolloverIntervalSeconds);
+		localFileBuffer.setArchiveDirectory(mLocalFileBufferArchiveDirectory);
+		localFileBuffer.setArchiveFileCount(mLocalFileBufferArchiveFileCount);
+
+		setBufferAndDestination(localFileBuffer, hdfsDestination);
+
+		start();
+
+		LogLog.debug("<== HdfsRollingFileAppender.activateOptions()");
+	}
+
+	/**
+	 * Formats the event with the configured layout and adds the result to the buffer.
+	 * When the layout ignores throwables, the event's throwable lines are appended to the
+	 * formatted message, each followed by the layout line separator.
+	 *
+	 * Nothing is logged when no buffer is installed or when no layout is configured.
+	 *
+	 * @param event logging event to append
+	 */
+	@Override
+	protected void append(LoggingEvent event) {
+		if(!isLogable()) {
+			return;
+		}
+
+		if(layout == null) {
+			// this appender requires a layout; report the misconfiguration instead of failing with a NullPointerException
+			errorHandler.error("No layout set for the appender named [" + name + "].");
+
+			return;
+		}
+
+		String logMsg = layout.format(event);
+
+		if(layout.ignoresThrowable()) {
+			String[] strThrowable = event.getThrowableStrRep();
+
+			if(strThrowable != null) {
+				StringBuilder sb = new StringBuilder(logMsg);
+
+				for(String s : strThrowable) {
+					sb.append(s).append(Layout.LINE_SEP);
+				}
+
+				logMsg = sb.toString();
+			}
+		}
+
+		addToBuffer(logMsg);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/be47e640/agents-common/src/main/java/org/apache/log4j/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/log4j/LocalFileLogBuffer.java b/agents-common/src/main/java/org/apache/log4j/LocalFileLogBuffer.java
new file mode 100644
index 0000000..f52f042
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/log4j/LocalFileLogBuffer.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.log4j;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.rmi.dgc.VMID;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TreeSet;
+
+import org.apache.log4j.LocalFileLogBuffer.MiscUtil;
+import org.apache.log4j.helpers.LogLog;
+
+
+public class LocalFileLogBuffer implements LogBuffer<String> {
+	private static final int    DEFAULT_ROLLOVER_INTERVAL = 600;
+
+	private String  mDirectory               = null;
+	private String  mFile                    = null;
+	private String  mEncoding                = null;
+	private boolean mIsAppend                = true;
+	private int     mRolloverIntervalSeconds = DEFAULT_ROLLOVER_INTERVAL;
+	private String  mArchiveDirectory        = null;
+	private int     mArchiveFileCount        = 10;
+
+	private Writer mWriter                  = null;
+	private String mCurrentFilename         = null;
+	private long   mNextRolloverTime        = 0;
+
+	private DestinationDispatcherThread mDispatcherThread = null;
+
+	public String getDirectory() {
+		return mDirectory;
+	}
+
+	public void setDirectory(String directory) {
+		mDirectory = directory;
+	}
+
+	public String getFile() {
+		return mFile;
+	}
+
+	public void setFile(String file) {
+		mFile = file;
+	}
+
+	public String getEncoding() {
+		return mEncoding;
+	}
+
+	public void setEncoding(String encoding) {
+		mEncoding = encoding;
+	}
+
+	public boolean getIsAppend() {
+		return mIsAppend;
+	}
+
+	public void setIsAppend(boolean isAppend) {
+		mIsAppend = isAppend;
+	}
+
+	public int getRolloverIntervalSeconds() {
+		return mRolloverIntervalSeconds;
+	}
+
+	public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) {
+		mRolloverIntervalSeconds = rolloverIntervalSeconds;
+	}
+
+	public String getArchiveDirectory() {
+		return mArchiveDirectory;
+	}
+
+	public void setArchiveDirectory(String archiveDirectory) {
+		mArchiveDirectory = archiveDirectory;
+	}
+
+	public int getArchiveFileCount() {
+		return mArchiveFileCount;
+	}
+
+	public void setArchiveFileCount(int archiveFileCount) {
+		mArchiveFileCount = archiveFileCount;
+	}
+
+
+	@Override
+	public void start(LogDestination<String> destination) {
+		LogLog.debug("==> LocalFileLogBuffer.start()");
+
+		openFile();
+
+		mDispatcherThread = new DestinationDispatcherThread(this, destination);
+
+		mDispatcherThread.start();
+
+		LogLog.debug("<== LocalFileLogBuffer.start()");
+	}
+
+	/**
+	 * Stops the dispatcher thread, waits for it to finish and closes the current file.
+	 * If the wait is interrupted, the interrupt status of the calling thread is restored
+	 * and the file is still closed.
+	 */
+	@Override
+	public void stop() {
+		LogLog.debug("==> LocalFileLogBuffer.stop()");
+		
+		DestinationDispatcherThread dispatcherThread = mDispatcherThread;
+		mDispatcherThread = null;
+
+		if(dispatcherThread != null && dispatcherThread.isAlive()) {
+			dispatcherThread.stopThread();
+
+			try {
+				dispatcherThread.join();
+			} catch (InterruptedException e) {
+				LogLog.warn("LocalFileLogBuffer.stop(): failed in waiting for DispatcherThread", e);
+
+				// keep the interrupt visible to the caller
+				Thread.currentThread().interrupt();
+			}
+		}
+
+		closeFile();
+
+		LogLog.debug("<== LocalFileLogBuffer.stop()");
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return mWriter != null;
+	}
+
+	@Override
+	public boolean add(String log) {
+		boolean ret = false;
+
+		long now = System.currentTimeMillis();
+
+		if(now > mNextRolloverTime) {
+			rollover();
+		}
+
+		Writer writer = mWriter;
+
+		if(writer != null) {
+			try {
+				writer.write(log);
+
+				ret = true;
+			} catch(IOException excp) {
+				LogLog.warn("LocalFileLogBuffer.add(): write failed", excp);
+			}
+		} else {
+			LogLog.warn("LocalFileLogBuffer.add(): writer is null");
+		}
+
+		return ret;
+	}
+
+	/**
+	 * Closes any open file and opens the file named by the configured directory and file
+	 * (after token replacement). When the first attempt fails, the parent directories are
+	 * created and the open is retried once.
+	 *
+	 * On success the writer is set and the next rollover time is updated; on failure the
+	 * writer stays null and the current filename is reset to null.
+	 */
+	private void openFile() {
+		LogLog.debug("==> LocalFileLogBuffer.openFile()");
+
+		closeFile();
+
+		mCurrentFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile);
+
+		FileOutputStream ostream     = null;
+		Exception        openFailure = null;
+
+		try {
+			ostream = new FileOutputStream(mCurrentFilename, mIsAppend);
+		} catch(Exception excp) {
+			// presumably the parent directory is missing; create it and retry once
+			MiscUtil.createParents(new File(mCurrentFilename));
+
+			try {
+				ostream = new FileOutputStream(mCurrentFilename, mIsAppend);
+			} catch(Exception ex) {
+				// kept so that the warning below can include the cause
+				openFailure = ex;
+			}
+		}
+
+		mWriter = createWriter(ostream);
+
+		if(mWriter != null) {
+			LogLog.debug("LocalFileLogBuffer.openFile(): opened file " + mCurrentFilename);
+
+			updateNextRolloverTime();
+		} else {
+			String msg = "LocalFileLogBuffer.openFile(): failed to open file for write " + mCurrentFilename;
+
+			if(openFailure != null) {
+				LogLog.warn(msg, openFailure);
+			} else {
+				LogLog.warn(msg);
+			}
+
+			mCurrentFilename = null;
+		}
+
+		LogLog.debug("<== LocalFileLogBuffer.openFile()");
+	}
+
+	private void closeFile() {
+		LogLog.debug("==> LocalFileLogBuffer.closeFile()");
+
+		Writer writer = mWriter;
+
+		mWriter = null;
+
+		if(writer != null) {
+			try {
+				writer.close();
+			} catch(IOException excp) {
+				LogLog.warn("LocalFileLogBuffer: failed to close file " + mCurrentFilename, excp);
+			}
+
+			if(mDispatcherThread != null) {
+				mDispatcherThread.addLogfile(mCurrentFilename);
+			}
+		}
+
+		LogLog.debug("<== LocalFileLogBuffer.closeFile()");
+	}
+
+	private void rollover() {
+		LogLog.debug("==> LocalFileLogBuffer.rollover()");
+
+		closeFile();
+
+		openFile();
+
+		LogLog.debug("<== LocalFileLogBuffer.rollover()");
+	}
+
+	public OutputStreamWriter createWriter(OutputStream os ) {
+	    OutputStreamWriter writer = null;
+
+	    if(os != null) {
+			if(mEncoding != null) {
+				try {
+					writer = new OutputStreamWriter(os, mEncoding);
+				} catch(UnsupportedEncodingException excp) {
+					LogLog.warn("LocalFileLogBuffer: failed to create output writer.", excp);
+				}
+			}
+	
+			if(writer == null) {
+				writer = new OutputStreamWriter(os);
+			}
+	    }
+
+	    return writer;
+	}
+
+	/**
+	 * Recomputes the next rollover time from the current one and the rollover interval in milliseconds.
+	 */
+	private void updateNextRolloverTime() {
+		// NOTE(review): int multiplication; overflows for intervals above ~24.8 days - confirm whether that matters
+		int intervalMillis = mRolloverIntervalSeconds * 1000;
+
+		mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, intervalMillis);
+	}
+
+	boolean isCurrentFilename(String filename) {
+		return mCurrentFilename != null && filename != null && filename.equals(mCurrentFilename);
+	}
+
+	/**
+	 * @return a description of this buffer listing its directory, file, rollover interval,
+	 *         archive directory and archive file count
+	 */
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+
+		sb.append("LocalFileLogBuffer {");
+		sb.append("Directory=").append(mDirectory).append("; ");
+		sb.append("File=").append(mFile).append("; ");
+		sb.append("RolloverIntervalSeconds=").append(mRolloverIntervalSeconds).append("; ");
+		sb.append("ArchiveDirectory=").append(mArchiveDirectory).append("; ");
+		sb.append("ArchiveFileCount=").append(mArchiveFileCount);
+		sb.append("}");
+
+		return sb.toString();
+	}
+	
+	/**
+	 * Static helpers for expanding tokens in configured file/directory names,
+	 * creating parent directories and computing rollover times.
+	 */
+	static class MiscUtil {
+		static final String TOKEN_HOSTNAME             = "%hostname%";
+		static final String TOKEN_APP_INSTANCE         = "%app-instance%";
+		static final String TOKEN_FILE_OPEN_TIME_START = "%file-open-time:";
+		static final String TOKEN_FILE_OPEN_TIME_END   = "%";
+
+		// created once when this class is initialized; its hash code is the app-instance value
+		static final VMID sJvmID = new VMID();
+
+		/**
+		 * Expands the hostname, app-instance and file-open-time tokens in the given string.
+		 *
+		 * @param str string that may contain tokens; may be null
+		 * @return the string with all tokens expanded, or null when {@code str} is null
+		 */
+		public static String replaceTokens(String str) {
+			if(str == null) {
+				return str;
+			}
+
+			str = replaceHostname(str);
+			str = replaceAppInstance(str);
+			str = replaceFileOpenTime(str);
+
+			return str;
+		}
+
+		/**
+		 * Replaces every {@code %hostname%} token with the local host name, or with
+		 * "Unknown" when the host name cannot be determined.
+		 *
+		 * @param str string to process; must not be null
+		 * @return the string with the token expanded
+		 */
+		public static String replaceHostname(String str) {
+			if(!str.contains(TOKEN_HOSTNAME)) {
+				return str;
+			}
+
+			String hostName = null;
+
+			try {
+				hostName = InetAddress.getLocalHost().getHostName();
+			} catch (UnknownHostException excp) {
+				LogLog.warn("LocalFileLogBuffer", excp);
+			}
+
+			if(hostName == null) {
+				hostName = "Unknown";
+			}
+
+			// literal replace: replaceAll() would interpret '$' and '\' in the host name
+			return str.replace(TOKEN_HOSTNAME, hostName);
+		}
+
+		/**
+		 * Replaces every {@code %app-instance%} token with a non-negative number derived
+		 * from the hash code of this JVM's {@link VMID}.
+		 *
+		 * @param str string to process; must not be null
+		 * @return the string with the token expanded
+		 */
+		public static String replaceAppInstance(String str) {
+			if(!str.contains(TOKEN_APP_INSTANCE)) {
+				return str;
+			}
+
+			// widen before abs(): Math.abs(Integer.MIN_VALUE) is still negative in int arithmetic
+			String appInstance = Long.toString(Math.abs((long) sJvmID.hashCode()));
+
+			return str.replace(TOKEN_APP_INSTANCE, appInstance);
+		}
+
+		/**
+		 * Replaces every {@code %file-open-time:<pattern>%} token with the current time
+		 * formatted by a {@link SimpleDateFormat} built from {@code <pattern>}.
+		 * Processing stops at the first token that has no closing {@code %}.
+		 *
+		 * @param str string to process; must not be null
+		 * @return the string with the tokens expanded
+		 * @throws IllegalArgumentException if a token holds an invalid date pattern
+		 */
+		public static String replaceFileOpenTime(String str) {
+			Date now = new Date();
+
+			int tagStartPos = str.indexOf(TOKEN_FILE_OPEN_TIME_START);
+
+			while(tagStartPos >= 0) {
+				int formatStartPos = tagStartPos + TOKEN_FILE_OPEN_TIME_START.length();
+				int tagEndPos      = str.indexOf(TOKEN_FILE_OPEN_TIME_END, formatStartPos);
+
+				if(tagEndPos < 0) {
+					break; // unterminated token: leave the remainder untouched
+				}
+
+				String tag        = str.substring(tagStartPos, tagEndPos + 1);
+				String dtFormat   = str.substring(formatStartPos, tagEndPos);
+				String replaceStr = new SimpleDateFormat(dtFormat).format(now);
+
+				// literal replace: with replaceAll() a date pattern containing regex
+				// metacharacters could fail to match itself, and this loop would never end
+				str = str.replace(tag, replaceStr);
+
+				// resume after the inserted text, so replaced content is never rescanned
+				tagStartPos = str.indexOf(TOKEN_FILE_OPEN_TIME_START, tagStartPos + replaceStr.length());
+			}
+
+			return str;
+		}
+
+		/**
+		 * Creates the parent directory of the given file, including missing ancestors,
+		 * when it does not exist yet. A failure is logged, not thrown.
+		 *
+		 * @param file file whose parent directory is needed; may be null
+		 */
+		public static void createParents(File file) {
+			if(file != null) {
+				String parentName = file.getParent();
+
+				if (parentName != null) {
+					File parentDir = new File(parentName);
+
+					// mkdirs() also returns false when another thread created the directory
+					// in the meantime, hence the isDirectory() re-check before warning
+					if(!parentDir.exists() && !parentDir.mkdirs() && !parentDir.isDirectory()) {
+						LogLog.warn("LocalFileLogBuffer: failed to create directory " + parentName);
+					}
+				}
+			}
+		}
+
+		/**
+		 * Computes the time of the next rollover.
+		 *
+		 * @param lastRolloverTime previously computed rollover time in milliseconds; a value
+		 *                         of 0 or less means none has been computed yet
+		 * @param interval         rollover interval in milliseconds
+		 * @return {@code lastRolloverTime} when it is still in the future; otherwise a time
+		 *         after the current second, kept on an {@code interval} boundary relative
+		 *         to {@code lastRolloverTime}
+		 */
+		public static long getNextRolloverTime(long lastRolloverTime, long interval) {
+			long now = System.currentTimeMillis() / 1000 * 1000; // round to second
+
+			if(lastRolloverTime <= 0) {
+				// should this be set to the next multiple-of-the-interval from start of the day?
+				return now + interval;
+			} else if(lastRolloverTime <= now) {
+				long nextRolloverTime = now + interval;
+
+				// keep it at 'interval' boundary; a zero interval has no boundary to align to
+				// (and would otherwise divide by zero)
+				long trimInterval = (interval != 0) ? (nextRolloverTime - lastRolloverTime) % interval : 0;
+
+				return nextRolloverTime - trimInterval;
+			} else {
+				return lastRolloverTime;
+			}
+		}
+	}
+}
+
+class DestinationDispatcherThread extends Thread {
+	private TreeSet<String>        mCompletedLogfiles = new TreeSet<String>();
+	private boolean                mStopThread        = false;
+	private LocalFileLogBuffer     mFileLogBuffer     = null;
+	private LogDestination<String> mDestination       = null;
+
+	private String         mCurrentLogfile = null;
+	private BufferedReader mReader         = null;
+
+	public DestinationDispatcherThread(LocalFileLogBuffer fileLogBuffer, LogDestination<String>
destination) {
+		super(DestinationDispatcherThread.class.getSimpleName());
+
+		mFileLogBuffer = fileLogBuffer;
+		mDestination   = destination;
+
+		setDaemon(true);
+	}
+
+	public void addLogfile(String filename) {
+		LogLog.debug("==> DestinationDispatcherThread.addLogfile(" + filename + ")");
+
+		synchronized(mCompletedLogfiles) {
+			mCompletedLogfiles.add(filename);
+			mCompletedLogfiles.notify();
+		}
+
+		LogLog.debug("<== DestinationDispatcherThread.addLogfile(" + filename + ")");
+	}
+
+	public String getNextLogfile() {
+		synchronized(mCompletedLogfiles) {
+			return mCompletedLogfiles.pollFirst();
+		}
+	}
+
+	public void stopThread() {
+		mStopThread = true;
+	}
+
+	@Override
+	public void run() {
+		init();
+		
+		// destination start() should be from the dispatcher thread
+		mDestination.start();
+
+		int pollIntervalInMs = 1000;
+
+		while(! mStopThread) {
+			String logMsg = getNextLog();
+
+			if(logMsg == null) { // move to the next file
+				synchronized(mCompletedLogfiles) {
+					while(mCompletedLogfiles.isEmpty() && !mStopThread) {
+						try {
+							mCompletedLogfiles.wait(pollIntervalInMs);
+						} catch(InterruptedException excp) {
+							LogLog.warn("LocalFileLogBuffer.run(): failed to wait for log file", excp);
+						}
+					}
+					
+					if(!mCompletedLogfiles.isEmpty()) {
+						openNextFile();
+					}
+				}
+			} else { // deliver to the msg to destination
+				while(! mDestination.add(logMsg) && !mStopThread) {
+					try {
+						Thread.sleep(pollIntervalInMs);
+					} catch(InterruptedException excp) {
+						LogLog.warn("LocalFileLogBuffer.run(): failed to wait for destination to be available",
excp);
+					}
+				}
+			} 
+		}
+
+		mDestination.stop();
+	}
+
+	private void init() {
+		LogLog.debug("==> DestinationDispatcherThread.init()");
+
+		String dirName   = MiscUtil.replaceTokens(mFileLogBuffer.getDirectory());
+		File   directory = new File(dirName);
+
+		if(directory.exists() && directory.isDirectory()) {
+			File[] files = directory.listFiles();
+
+			if(files != null) {
+				for(File file : files) {
+					if(file.exists() && file.canRead()) {
+						String filename = file.getAbsolutePath();
+						if(! mFileLogBuffer.isCurrentFilename(filename)) {
+							addLogfile(filename);
+						}
+					}
+				}
+			}
+		}
+
+		openNextFile();
+
+		LogLog.debug("<== DestinationDispatcherThread.init()");
+	}
+	
+	private String getNextLog() {
+		String log = null;
+
+		if(mReader != null) {
+			try {
+				log = mReader.readLine();
+			} catch (IOException excp) {
+				LogLog.warn("LocalFileLogBuffer.getNextLog(): failed to read from file " + mCurrentLogfile,
excp);
+			}
+
+			if(log == null) {
+				closeCurrentFile();
+			}
+		}
+
+		return log;
+	}
+
+	private void openNextFile() {
+		LogLog.debug("==> openNextFile()");
+
+		closeCurrentFile();
+
+		while(mReader == null) {
+			mCurrentLogfile = getNextLogfile();
+			
+			if(mCurrentLogfile != null) {
+				try {
+					FileReader fr = new FileReader(mCurrentLogfile);
+	
+					mReader = new BufferedReader(fr);
+				} catch(FileNotFoundException excp) {
+					LogLog.warn("openNextFile(): error while opening file " + mCurrentLogfile, excp);
+				}
+			}
+		}
+
+		LogLog.debug("<== openNextFile(" + mCurrentLogfile + ")");
+	}
+	
+	private void closeCurrentFile() {
+		LogLog.debug("==> closeCurrentFile(" + mCurrentLogfile + ")");
+
+		if(mReader != null) {
+			try {
+				mReader.close();
+			} catch(IOException excp) {
+				// ignore
+			}
+		}
+		mReader = null;
+		
+		archiveCurrentFile();
+
+		LogLog.debug("<== closeCurrentFile(" + mCurrentLogfile + ")");
+	}
+
+	private void archiveCurrentFile() {
+		if(mCurrentLogfile != null) {
+			File   logFile         = new File(mCurrentLogfile);
+			String archiveFilename = MiscUtil.replaceTokens(mFileLogBuffer.getArchiveDirectory() +
File.separator + logFile.getName());
+
+			try {
+				if(logFile.exists()) {
+					File archiveFile = new File(archiveFilename);
+
+					MiscUtil.createParents(archiveFile);
+
+					if(! logFile.renameTo(archiveFile)) {
+						// TODO: renameTo() does not work in all cases. in case of failure, copy the file contents
to the destination and delete the file
+					}
+
+					// TODO: ensure no more than mFileLogBuffer.getArchiveFileCount() archive files are
kept
+				}
+			} catch(Exception excp) {
+				LogLog.warn("archiveCurrentFile(): faile to move " + mCurrentLogfile + " to archive location
" + archiveFilename, excp);
+			}
+		}
+		mCurrentLogfile = null;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/be47e640/agents-common/src/main/java/org/apache/log4j/LogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/log4j/LogBuffer.java b/agents-common/src/main/java/org/apache/log4j/LogBuffer.java
new file mode 100644
index 0000000..799aacb
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/log4j/LogBuffer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.log4j;
+
+
+/**
+ * A buffer that accepts log entries of type {@code T} and is started with the
+ * {@link LogDestination} it is associated with.
+ *
+ * @param <T> type of the log entries
+ */
+public interface LogBuffer<T> {
+	/**
+	 * Starts the buffer.
+	 *
+	 * @param destination destination associated with this buffer
+	 */
+	void start(LogDestination<T> destination);
+
+	/** Stops the buffer. */
+	void stop();
+
+	/** @return presumably whether the buffer can currently accept entries - confirm against implementations */
+	boolean isAvailable();
+
+	/**
+	 * Adds a log entry to the buffer.
+	 *
+	 * @param log entry to add
+	 * @return presumably true when the entry was accepted - confirm against implementations
+	 */
+	boolean add(T log);
+}

http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/be47e640/agents-common/src/main/java/org/apache/log4j/LogDestination.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/log4j/LogDestination.java b/agents-common/src/main/java/org/apache/log4j/LogDestination.java
new file mode 100644
index 0000000..fd4e8f6
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/log4j/LogDestination.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.log4j;
+
+
+/**
+ * A destination that receives log entries of type {@code T}.
+ *
+ * @param <T> type of the log entries
+ */
+public interface LogDestination<T> {
+	/** Starts the destination. */
+	void start();
+
+	/** Stops the destination. */
+	void stop();
+
+	/** @return presumably whether the destination can currently accept entries - confirm against implementations */
+	boolean isAvailable();
+
+	/**
+	 * Adds a log entry to the destination.
+	 *
+	 * @param log entry to add
+	 * @return presumably true when the entry was accepted and false when the caller should retry - confirm against implementations
+	 */
+	boolean add(T log);
+}


Mime
View raw message