Return-Path: X-Original-To: apmail-argus-commits-archive@minotaur.apache.org Delivered-To: apmail-argus-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F12011144E for ; Fri, 19 Sep 2014 21:33:15 +0000 (UTC) Received: (qmail 36969 invoked by uid 500); 19 Sep 2014 21:33:15 -0000 Delivered-To: apmail-argus-commits-archive@argus.apache.org Received: (qmail 36951 invoked by uid 500); 19 Sep 2014 21:33:15 -0000 Mailing-List: contact commits-help@argus.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@argus.incubator.apache.org Delivered-To: mailing list commits@argus.incubator.apache.org Received: (qmail 36942 invoked by uid 99); 19 Sep 2014 21:33:15 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Sep 2014 21:33:15 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 19 Sep 2014 21:33:12 +0000 Received: (qmail 36590 invoked by uid 99); 19 Sep 2014 21:32:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Sep 2014 21:32:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 55F00A2069C; Fri, 19 Sep 2014 21:32:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmani@apache.org To: commits@argus.incubator.apache.org Date: Fri, 19 Sep 2014 21:32:54 -0000 Message-Id: <01f0d4ad8ebd466db9b90a6967250a48@git.apache.org> In-Reply-To: <39f319fa52924a8687e1c9d38ad57f71@git.apache.org> References: <39f319fa52924a8687e1c9d38ad57f71@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/9] git commit: ARGUS-5 X-Virus-Checked: Checked by ClamAV on apache.org ARGUS-5 Project: http://git-wip-us.apache.org/repos/asf/incubator-argus/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-argus/commit/84c22443 Tree: http://git-wip-us.apache.org/repos/asf/incubator-argus/tree/84c22443 Diff: http://git-wip-us.apache.org/repos/asf/incubator-argus/diff/84c22443 Branch: refs/heads/master Commit: 84c22443d2a5aa8207062132f17652fd3d028509 Parents: 3272c00 Author: rmani Authored: Fri Sep 12 11:22:10 2014 -0700 Committer: rmani Committed: Fri Sep 12 11:22:10 2014 -0700 ---------------------------------------------------------------------- .../hadoop/log/HdfsFileAppender.java | 345 +++++++------------ 1 file changed, 134 insertions(+), 211 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/84c22443/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java index 3398d8a..86a9bb1 100644 --- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java +++ b/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java @@ -37,12 +37,7 @@ import org.apache.log4j.helpers.LogLog; **********************************/ public class HdfsFileAppender extends FileAppender { - - /** - * maximal number of retries to connect to the hdfs. - */ - - + // Constants for checking the DatePattern public static final String MINUTES ="Min"; public static final String HOURS ="Hr"; @@ -64,15 +59,9 @@ public class HdfsFileAppender extends FileAppender { */ private String hdfsFileRollingInterval = "1Day"; - private String FileRollingInterval = "1Day"; + private String fileRollingInterval = "1Day"; - /** - * The log file will be renamed to the value of the scheduledFilename variable when the next interval is entered. For example, if the rollover period is one - * hour, the log file will be renamed to the value of "scheduledFilename" at the beginning of the next hour. The precise time when a rollover occurs depends - * on logging activity. - */ - private String scheduledFilename; - + private String scheduledFileCache; /** @@ -106,15 +95,15 @@ public class HdfsFileAppender extends FileAppender { private String hdfsfileName = null; - private String actualhdfsfileName = null; + private String actualHdfsfileName = null; - private String scheduledhdfsFileName = null; + private String scheduledHdfsFileName = null; private String fileCache = null; private HdfsSink hs = null; - private Writer cachewriter = null; + private Writer cacheWriter = null; private FileOutputStream cacheOstream = null; @@ -126,15 +115,14 @@ public class HdfsFileAppender extends FileAppender { private int hdfsFileRollOffset = 0; - private int FileRollOffset = 0; + private int fileRollOffset = 0; private boolean firstTime = true; private boolean firstTimeLocal = true; - private String HdfsLiveUpdate = "true"; - - + private String hdfsLiveUpdate = "true"; + boolean hdfsUpdateAllowed = true; private String hdfsCheckInterval=null; @@ -157,28 +145,16 @@ public class HdfsFileAppender extends FileAppender { } - /** - * Instantiate a HdfsFileAppender and open the file designated by filename. The opened filename will become the output destination for this appender. - */ - public HdfsFileAppender(Layout layout, String filename, String hdfsFileRollingInterval,String FileRollingInterval, String HdfsLiveUpdate, String hdfsCheckInterval) throws IOException{ - super(layout, filename, true); - this.hdfsFileRollingInterval = hdfsFileRollingInterval; - this.FileRollingInterval = FileRollingInterval; - this.HdfsLiveUpdate = HdfsLiveUpdate; - this.hdfsCheckInterval = hdfsCheckInterval; - - activateOptions(); - } /** * The hdfsFileRollingInterval takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for hdfs File rollover schedule. */ - public void sethdfsFileRollingInterval(String pattern) { + public void setHdfsFileRollingInterval(String pattern) { hdfsFileRollingInterval = pattern; } /** Returns the value of the hdfsFileRollingInterval option. */ - public String gethdfsFileRollingInterval() { + public String getHdfsFileRollingInterval() { return hdfsFileRollingInterval; } @@ -186,31 +162,31 @@ public class HdfsFileAppender extends FileAppender { * The LocalDatePattern takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for local cache File rollover schedule. */ public void setFileRollingInterval(String pattern) { - FileRollingInterval = pattern; + fileRollingInterval = pattern; } /** Returns the value of the FileRollingInterval option. */ public String getFileRollingInterval() { - return FileRollingInterval; + return fileRollingInterval; } /** * This will set liveHdfsUpdate flag , where true will update hdfs live else false will create local cache files and copy the files to hdfs */ public void setHdfsLiveUpdate(String val) { - HdfsLiveUpdate=val; + hdfsLiveUpdate=val; } /** Returns the value of the FileRollingInterval option. */ public String getHdfsLiveUpdate() { - return HdfsLiveUpdate; + return hdfsLiveUpdate; } - public String gethdfsCheckInterval() { + public String getHdfsCheckInterval() { return hdfsCheckInterval; } - public void sethdfsCheckInterval(String val){ + public void setHdfsCheckInterval(String val){ hdfsCheckInterval = ( hdfsCheckInterval != null) ? val : DEFAULT_HDFSCHECKINTERVAL; } @@ -236,12 +212,9 @@ public class HdfsFileAppender extends FileAppender { rc.setType(type); LogLog.debug("File name: " + fileName); File file = new File(fileName); - //scheduledFilename = fileName +"." +file.lastModified(); - //scheduledhdfsFileName = hdfsfileName + "." + file.lastModified(); - scheduledFilename = fileName+sdf.format(new Date(file.lastModified())); - scheduledhdfsFileName = hdfsfileName+sdf.format(new Date(file.lastModified())); + scheduledHdfsFileName = hdfsfileName+sdf.format(new Date(file.lastModified())); firstTime = true; - LogLog.debug("Local and hdfs Files" + scheduledFilename + " " +scheduledhdfsFileName) ; + LogLog.debug("Local and hdfs Files" + scheduledHdfsFileName + " " +scheduledHdfsFileName) ; } else { LogLog.error("Either File or hdfsFileRollingInterval options are not set for appender [" + name + "]."); @@ -249,49 +222,49 @@ public class HdfsFileAppender extends FileAppender { // Local Cache File - if (FileRollingInterval != null && fileCache != null){ - nowLocal.setTime(System.currentTimeMillis()); - int localtype = computeCheckPeriod(FileRollingInterval); - FileRollOffset = getTimeOffset(FileRollingInterval); - printLocalPeriodicity(localtype,FileRollOffset); - rcLocal.setType(localtype); - LogLog.debug("LocalCacheFile name: " + fileCache); - File fileCachehandle = new File(fileCache); - //scheduledFileCache = fileCache+"."+fileCachehandle.lastModified(); - scheduledFileCache = fileCache+sdf.format(new Date(fileCachehandle.lastModified())); - firstTimeLocal = true; - + if (fileRollingInterval != null && fileCache != null){ + nowLocal.setTime(System.currentTimeMillis()); + int localtype = computeCheckPeriod(fileRollingInterval); + fileRollOffset = getTimeOffset(fileRollingInterval); + printLocalPeriodicity(localtype,fileRollOffset); + rcLocal.setType(localtype); + LogLog.debug("LocalCacheFile name: " + fileCache); + File fileCachehandle = new File(fileCache); + scheduledFileCache = fileCache+sdf.format(new Date(fileCachehandle.lastModified())); + firstTimeLocal = true; + } else { LogLog.error("Either File or LocalDatePattern options are not set for appender [" + name + "]."); } - if (HdfsLiveUpdate.equalsIgnoreCase("true")) { - hdfsUpdateAllowed = true; - } else if (HdfsLiveUpdate.equalsIgnoreCase("false")) { - hdfsUpdateAllowed = false; - } - - actualhdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis()); - + hdfsUpdateAllowed = Boolean.parseBoolean(hdfsLiveUpdate); + actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis()); } - + public static int containsIgnoreCase(String str1, String str2) { + return str1.toLowerCase().indexOf(str2.toLowerCase()); + } + public int computeCheckPeriod(String timePattern){ - - if ( timePattern.indexOf(MINUTES) > 0 || timePattern.indexOf(MINUTES.toLowerCase()) > 0|| timePattern.indexOf(MINUTES.toUpperCase()) > 0 ) { + + if(containsIgnoreCase(timePattern, MINUTES) > 0) { return TOP_OF_MINUTE; } - if ( timePattern.indexOf(HOURS) > 0 || timePattern.indexOf(HOURS.toLowerCase()) > 0|| timePattern.indexOf(HOURS.toUpperCase()) > 0 ) { + + if(containsIgnoreCase(timePattern, HOURS) > 0) { return TOP_OF_HOUR; } - if ( timePattern.indexOf(DAYS) > 0 || timePattern.indexOf(DAYS.toLowerCase()) > 0|| timePattern.indexOf(DAYS.toUpperCase()) > 0 ) { + + if(containsIgnoreCase(timePattern, DAYS) > 0) { return TOP_OF_DAY; } - if ( timePattern.indexOf(WEEKS) > 0 || timePattern.indexOf(WEEKS.toLowerCase()) > 0|| timePattern.indexOf(WEEKS.toUpperCase()) > 0 ) { + + if(containsIgnoreCase(timePattern, WEEKS) > 0) { return TOP_OF_WEEK; } - if ( timePattern.indexOf(MONTHS) > 0 || timePattern.indexOf(MONTHS.toLowerCase()) > 0|| timePattern.indexOf(MONTHS.toUpperCase()) > 0 ) { + + if(containsIgnoreCase(timePattern, MONTHS) > 0) { return TOP_OF_MONTH; } @@ -329,26 +302,26 @@ public class HdfsFileAppender extends FileAppender { int index; int offset=-1; - if ( (index = timePattern.indexOf(MINUTES)) > 0 || (index = timePattern.indexOf(MINUTES.toLowerCase())) > 0|| (index = timePattern.indexOf(MINUTES.toUpperCase())) > 0 ) { - + if ((index = containsIgnoreCase(timePattern, MINUTES)) > 0) { offset = Integer.parseInt(timePattern.substring(0,index)); - } - if ( (index = timePattern.indexOf(HOURS)) > 0 || (index =timePattern.indexOf(HOURS.toLowerCase())) > 0|| (index=timePattern.indexOf(HOURS.toUpperCase())) > 0 ) { + + if ((index = containsIgnoreCase(timePattern, HOURS)) > 0) { offset = Integer.parseInt(timePattern.substring(0,index)); } - if ( (index = timePattern.indexOf(DAYS)) > 0 || (index =timePattern.indexOf(DAYS.toLowerCase())) > 0|| (index=timePattern.indexOf(DAYS.toUpperCase())) > 0 ) { + + if ((index = containsIgnoreCase(timePattern, DAYS)) > 0) { offset = Integer.parseInt(timePattern.substring(0,index)); } - if ( (index = timePattern.indexOf(WEEKS)) > 0 || (index =timePattern.indexOf(WEEKS.toLowerCase())) > 0|| (index=timePattern.indexOf(WEEKS.toUpperCase())) > 0 ) { + if ((index = containsIgnoreCase(timePattern, WEEKS)) > 0) { offset = Integer.parseInt(timePattern.substring(0,index)); } - if ( (index = timePattern.indexOf(MONTHS)) > 0 || (index =timePattern.indexOf(MONTHS.toLowerCase())) > 0|| (index=timePattern.indexOf(MONTHS.toUpperCase())) > 0 ) { + if ((index = containsIgnoreCase(timePattern, MONTHS)) > 0) { offset = Integer.parseInt(timePattern.substring(0,index)); } @@ -395,15 +368,12 @@ public class HdfsFileAppender extends FileAppender { } long epochNow = System.currentTimeMillis(); - //String datedFilename = fileName + "." + epochNow; - //String datedhdfsFileName = hdfsfileName + "." + epochNow; - - String datedFilename =fileName+sdf.format(epochNow); + String datedhdfsFileName = hdfsfileName+sdf.format(epochNow); LogLog.debug("In rollOver epochNow" + epochNow + " " + "nextCheck: " + prevnextCheck ); - + // It is too early to roll over because we are still within the bounds of the current interval. Rollover will occur once the next interval is reached. if (epochNow < prevnextCheck) { @@ -413,12 +383,12 @@ public class HdfsFileAppender extends FileAppender { // close current file, and rename it to datedFilename this.closeFile(); - LogLog.debug("Rolling Over hdfs file to " + scheduledhdfsFileName); + LogLog.debug("Rolling Over hdfs file to " + scheduledHdfsFileName); if ( hdfsAvailable ) { - rollOverHdfsFile(scheduledhdfsFileName); - actualhdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis()); + // for hdfs file we don't rollover the fike, we rename the file. + actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis()); } try { @@ -427,9 +397,7 @@ public class HdfsFileAppender extends FileAppender { } catch(IOException e) { errorHandler.error("setFile(" + fileName + ", false) call failed."); } - scheduledFilename = datedFilename; - scheduledhdfsFileName = datedhdfsFileName; - + scheduledHdfsFileName = datedhdfsFileName; } @@ -438,14 +406,13 @@ public class HdfsFileAppender extends FileAppender { */ private void rollOverLocal() throws IOException { /* Compute filename, but only if datePattern is specified */ - if(FileRollingInterval == null) { + if(fileRollingInterval == null) { errorHandler.error("Missing LocalDatePattern option in rollOverLocal()."); return; } long epochNow = System.currentTimeMillis(); - //String datedCacheFileName = fileCache + "." + epochNow; String datedCacheFileName = fileCache+sdf.format(epochNow); LogLog.debug("In rollOverLocal() epochNow" + epochNow + " " + "nextCheckLocal: " + prevnextCheckLocal ); @@ -473,7 +440,7 @@ public class HdfsFileAppender extends FileAppender { } else { LogLog.error("Failed to rename cache file ["+fileCache+"] to ["+scheduledFileCache+"]."); } - setfileCacheWriter(); + setFileCacheWriter(); scheduledFileCache = datedCacheFileName; } } @@ -496,7 +463,7 @@ public class HdfsFileAppender extends FileAppender { // trailing spaces in file names. String val = file.trim(); - fileName = val; + fileName=val; fileCache=val+".cache"; } @@ -549,7 +516,7 @@ public class HdfsFileAppender extends FileAppender { this.bufferSize = bufferSize; //set cache file - setfileCacheWriter(); + setFileCacheWriter(); writeHeader(); @@ -594,15 +561,6 @@ public class HdfsFileAppender extends FileAppender { protected void subAppend(LoggingEvent event) { LogLog.debug("Called subAppend for logging into hdfs..."); - //String logPath = (String)event.getMDC(Log4jAuditProvider.MDC_VAR_LOG_RESOURCE_PATH) ; - - //LogLog.debug("Log Path " + logPath); - - /* - if (logPath.startsWith(fileName)) { - return ; - } - */ long n = System.currentTimeMillis(); if(n >= nextCheck) { now.setTime(n); @@ -625,7 +583,7 @@ public class HdfsFileAppender extends FileAppender { if ( nLocal > nextCheckLocal ) { nowLocal.setTime(nLocal); prevnextCheckLocal = nextCheckLocal; - nextCheckLocal = rcLocal.getNextCheckMillis(nowLocal, FileRollOffset); + nextCheckLocal = rcLocal.getNextCheckMillis(nowLocal, fileRollOffset); if ( firstTimeLocal) { prevnextCheckLocal = nextCheckLocal; firstTimeLocal = false; @@ -653,7 +611,7 @@ public class HdfsFileAppender extends FileAppender { closeWriter(); this.qw = null; //this. - this.closehdfsWriter(); + this.closeHdfsWriter(); this.closeCacheWriter(); } @@ -661,8 +619,8 @@ public class HdfsFileAppender extends FileAppender { public synchronized void close() { LogLog.debug("Closing all resource.."); this.closeFile(); - this.closehdfsWriter(); - this.closehdfsostream(); + this.closeHdfsWriter(); + this.closeHdfsOstream(); this.closeFileSystem(); } @@ -676,8 +634,8 @@ public class HdfsFileAppender extends FileAppender { } catch(IOException ie) { LogLog.error("unable to close output stream", ie); } - this.closehdfsWriter(); - this.closehdfsostream(); + this.closeHdfsWriter(); + this.closeHdfsOstream(); } @Override @@ -701,9 +659,7 @@ public class HdfsFileAppender extends FileAppender { /******* HDFS Appender methods ***********/ - //private void appendHDFSFileSystem(String fileNameSystem, String filename, String fileCache,boolean append, boolean bufferedIO, int bufferSize, Layout layout, String encoding, LoggingEvent event, String scheduledFileCache, Writer cacheWriter,boolean hdfsUpdateAllowed ) { - - private void appendHDFSFileSystem(LoggingEvent event) { + private void appendHDFSFileSystem(LoggingEvent event) { long currentTime = System.currentTimeMillis(); @@ -711,8 +667,8 @@ public class HdfsFileAppender extends FileAppender { if ( currentTime >= hdfsNextCheck ) { - LogLog.debug("About to Open fileSystem" + fileSystemName+" "+actualhdfsfileName) ; - hs = openHdfsSink(fileSystemName,actualhdfsfileName,fileCache,fileAppend,bufferedIO,bufferSize,layout,encoding,scheduledFileCache,cachewriter,hdfsUpdateAllowed,processUser); + LogLog.debug("About to Open fileSystem" + fileSystemName+" "+actualHdfsfileName) ; + hs = openHdfsSink(fileSystemName,actualHdfsfileName,fileCache,fileAppend,bufferedIO,bufferSize,layout,encoding,scheduledFileCache,cacheWriter,hdfsUpdateAllowed,processUser); if (hdfsUpdateAllowed) { // stream into hdfs only when liveHdfsUpdate flag is true else write to cache file. hs.setOsteam(); @@ -742,9 +698,7 @@ public class HdfsFileAppender extends FileAppender { } writeToCache(event); } - - //hs.closehdfsWriter(); - //hs.closehdfsostream(); + } @@ -765,18 +719,18 @@ public class HdfsFileAppender extends FileAppender { } - private void closehdfsostream() { + private void closeHdfsOstream() { if (hs != null ){ LogLog.debug("Closing hdfs outstream") ; - hs.closehdfsostream(); + hs.closeHdfsOstream(); } } - private void closehdfsWriter() { + private void closeHdfsWriter() { if (hs != null) { LogLog.debug("Closing hdfs Writer") ; - hs.closehdfsWriter(); + hs.closeHdfsWriter(); } } @@ -784,36 +738,27 @@ public class HdfsFileAppender extends FileAppender { hs.closeHdfsSink(); } - private void rollOverHdfsFile(String scheduleHdfsFileName){ - try { - // Rollover hdfs file. - hs.rollOverHdfsFile(scheduleHdfsFileName); - } catch (IOException ie) { - // TODO..what needs to happend if rollover of hdfs file fails. - LogLog.debug("Rollover of hdfs failed..") ; - } - } - + /****** Cache File Methods **/ - public void setfileCacheWriter() { + public void setFileCacheWriter() { try { - setfileCacheOstream(fileCache); + setFileCacheOstream(fileCache); } catch(IOException ie) { LogLog.error("Logging failed while tring to write into Cache File..", ie); } LogLog.debug("Setting Cache Writer.."); - cachewriter = createCacheFileWriter(cacheOstream); + cacheWriter = createCacheFileWriter(cacheOstream); if(bufferedIO) { - cachewriter = new BufferedWriter(cachewriter, bufferSize); + cacheWriter = new BufferedWriter(cacheWriter, bufferSize); } } - private void setfileCacheOstream(String fileCache) throws IOException { + private void setFileCacheOstream(String fileCache) throws IOException { try { cacheOstream = new FileOutputStream(fileCache, true); @@ -854,10 +799,10 @@ public class HdfsFileAppender extends FileAppender { public void writeToCache(LoggingEvent event) { try { - LogLog.debug("Writing log to Cache.." + "layout: "+ this.layout.format(event) + "ignoresThowable: "+layout.ignoresThrowable() + "Writer:" + cachewriter.toString()); + LogLog.debug("Writing log to Cache.." + "layout: "+ this.layout.format(event) + "ignoresThowable: "+layout.ignoresThrowable() + "Writer:" + cacheWriter.toString()); - cachewriter.write(this.layout.format(event)); - cachewriter.flush(); + cacheWriter.write(this.layout.format(event)); + cacheWriter.flush(); if(layout.ignoresThrowable()) { String[] s = event.getThrowableStrRep(); @@ -865,9 +810,9 @@ public class HdfsFileAppender extends FileAppender { int len = s.length; for(int i = 0; i < len; i++) { LogLog.debug("Log:" + s[i]); - cachewriter.write(s[i]); - cachewriter.write(Layout.LINE_SEP); - cachewriter.flush(); + cacheWriter.write(s[i]); + cacheWriter.write(Layout.LINE_SEP); + cacheWriter.flush(); } } } @@ -882,7 +827,6 @@ public class HdfsFileAppender extends FileAppender { long epochNow = System.currentTimeMillis(); - //String datedCacheFileName = fileCache + "." + epochNow; String datedCacheFileName = fileCache + "." + epochNow; LogLog.debug("Rolling over remaining cache File to new file"+ datedCacheFileName); closeCacheWriter(); @@ -906,9 +850,9 @@ public class HdfsFileAppender extends FileAppender { public void closeCacheWriter() { try { - if(cachewriter != null) { - cachewriter.close(); - cachewriter = null; + if(cacheWriter != null) { + cacheWriter.close(); + cacheWriter = null; } } catch(IOException ie) { LogLog.error("unable to close cache writer", ie); @@ -1006,6 +950,10 @@ class RollingCalendar extends GregorianCalendar { class HdfsSink { + private static final String DS_REPLICATION_VAL = "1"; + private static final String DS_REPLICATION_KEY = "dfs.replication"; + private static final String FS_DEFAULT_NAME_KEY = "ds.default.name"; + private Configuration conf = null; private FileSystem fs= null; private Path pt = null; @@ -1021,7 +969,6 @@ class HdfsSink { private static int fstime=0; private CacheFileWatcher cfw = null; private boolean hdfsUpdateAllowed=true; - private String scheduledCacheFile=null; private String processUser=null; @@ -1048,14 +995,15 @@ class HdfsSink { this.bufferedIO=bufferedIO; this.fileCache=fileCache; this.hdfsUpdateAllowed=hdfsUpdateAllowed; - this.scheduledCacheFile=scheduledCacheFile; this.processUser=processUser; try { - conf.set("dfs.replication", "1"); + + conf.set(DS_REPLICATION_KEY,DS_REPLICATION_VAL); + conf.set(FS_DEFAULT_NAME_KEY, this.fsName); if ( fs == null) { - LogLog.debug("Opening Connection to hdfs Sytem"); + LogLog.debug("Opening Connection to hdfs Sytem" + this.fsName); UserGroupInformation ugi = UserGroupInformation.createProxyUser(this.processUser, UserGroupInformation.getLoginUser()); ugi.doAs( new PrivilegedExceptionAction() { @@ -1070,7 +1018,7 @@ class HdfsSink { LogLog.debug("About to run CacheFilWatcher..."); Path hdfsfilePath = getParent(); - cfw = new CacheFileWatcher(this.fs,this.fileCache,hdfsfilePath,this.scheduledCacheFile,cacheWriter,this.hdfsUpdateAllowed,conf); + cfw = new CacheFileWatcher(this.fs,this.fileCache,hdfsfilePath,cacheWriter,this.hdfsUpdateAllowed,conf); cfw.start(); } @@ -1103,7 +1051,7 @@ class HdfsSink { public void setOsteam() throws IOException { try { - pt = new Path(fileName); + pt = new Path(this.fileName); // if file Exist append it if(fs.exists(pt)) { LogLog.debug("Appending File: "+ this.fsName+":"+this.fileName); @@ -1113,9 +1061,8 @@ class HdfsSink { hdfsostream=fs.append(pt); } else { - - LogLog.debug("Creating File directories in hdfs if not present..."); - String parentName = new Path(fileName).getParent().toString(); + LogLog.debug("Creating File directories in hdfs if not present.."+ this.fsName+":"+this.fileName); + String parentName = new Path(this.fileName).getParent().toString(); if(parentName != null) { Path parentDir = new Path(parentName); if (!fs.exists(parentDir) ) { @@ -1132,12 +1079,11 @@ class HdfsSink { } public void setWriter() { - LogLog.debug("Setting Writer.."); hdfswriter = createhdfsWriter(hdfsostream); if(bufferedIO) { hdfswriter = new BufferedWriter(hdfswriter, bufferSize); - } + } } public Writer getWriter() { @@ -1181,7 +1127,7 @@ class HdfsSink { } catch (IOException ie) { LogLog.error("Unable to log header message to hdfs:", ie); throw ie; - } + } } public @@ -1203,7 +1149,7 @@ class HdfsSink { } - public void closehdfsostream() { + public void closeHdfsOstream() { try { if(this.hdfsostream != null) { this.hdfsostream.close(); @@ -1215,7 +1161,7 @@ class HdfsSink { } - public void closehdfsWriter() { + public void closeHdfsWriter() { try { if(hdfswriter != null) { hdfswriter.close(); @@ -1254,53 +1200,33 @@ class HdfsSink { return retval; } - public void rollOverHdfsFile(String scheduledHdfsFilename) throws IOException { - - /* try { - - Path target = new Path(scheduledHdfsFilename); - if(this.fs.exists(target)) { - this.fs.delete(target, true); - } - Path file = new Path(this.fileName); - boolean result = this.fs.rename(file, target); - if(result) { - LogLog.debug(fileName + " -> " + scheduledHdfsFilename); - } else { - LogLog.error("Failed to rename [" + fileName + "] to [" + scheduledHdfsFilename + "]."); - } - - }catch(IOException ie) { - - LogLog.error("Unable to rollover hdfs file ",ie); - throw ie; - } */ - } } // CacheFileWatcher Thread class CacheFileWatcher extends Thread { + + long CACHEFILE_WATCHER_SLEEP_TIME = 1000*60*2; + Configuration conf = null; - FileSystem fs = null; - String cacheFile = null; - File parentDir = null; - File[] files = null; - Path fsPath = null; - Path hdfsfilePath = null; - Writer cacheWriter = null; - String scheduledFileCache = null; - boolean hdfsUpdateAllowed=true; - boolean cacheFilesCopied = false; + private FileSystem fs = null; + private String cacheFile = null; + private File parentDir = null; + private File[] files = null; + private Path fsPath = null; + private Path hdfsfilePath = null; + private Writer cacheWriter = null; + + private boolean hdfsUpdateAllowed=true; + private boolean cacheFilesCopied = false; - CacheFileWatcher(FileSystem fs, String cacheFile, Path hdfsfilePath, String scheduledFileCache, Writer cacheWriter, boolean hdfsUpdateAllowed, Configuration conf) { + CacheFileWatcher(FileSystem fs, String cacheFile, Path hdfsfilePath, Writer cacheWriter, boolean hdfsUpdateAllowed, Configuration conf) { this.fs = fs; this.cacheFile = cacheFile; this.conf = conf; this.hdfsfilePath = hdfsfilePath; - this.scheduledFileCache = scheduledFileCache; this.cacheWriter = cacheWriter; this.hdfsUpdateAllowed = hdfsUpdateAllowed; } @@ -1310,7 +1236,6 @@ class CacheFileWatcher extends Thread { LogLog.debug("CacheFileWatcher Started"); - while (!cacheFilesCopied ){ if (hdfsUpdateAllowed) { @@ -1318,10 +1243,9 @@ class CacheFileWatcher extends Thread { } if ( !cacheFilePresent(cacheFile) ) { - - + try { - Thread.sleep(1000*60*2); + Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME); } catch (InterruptedException ie) { LogLog.error("Unable to complete the CatchFileWatcher Sleep", ie); } @@ -1339,7 +1263,7 @@ class CacheFileWatcher extends Thread { LogLog. error("Error while copying Cache Files to hdfs..Sleeping for next try",t); try { - Thread.sleep(1000*60*2); + Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME); } catch (InterruptedException ie) { LogLog.error("Unable to complete the CatchFileWatcher Sleep", ie); } @@ -1398,8 +1322,7 @@ class CacheFileWatcher extends Thread { if (new File(cacheFile).length() != 0 ) { long epochNow = System.currentTimeMillis(); - //String datedCacheFileName = cacheFile + "." + epochNow; - String datedCacheFileName = cacheFile + sdf.format(epochNow); + String datedCacheFileName = cacheFile + sdf.format(epochNow); LogLog.debug("Rolling over remaining cache File "+ datedCacheFileName); closeCacheFile(); @@ -1423,14 +1346,14 @@ class CacheFileWatcher extends Thread { } public void closeCacheFile() { - try { - if(cacheWriter != null) { - cacheWriter.close(); - cacheWriter = null; - } - } catch(IOException ie) { - LogLog.error("unable to close cache writer", ie); - } + try { + if(cacheWriter != null) { + cacheWriter.close(); + cacheWriter = null; + } + } catch(IOException ie) { + LogLog.error("unable to close cache writer", ie); + } } }