ranger-commits mailing list archives

From rm...@apache.org
Subject [3/9] git commit: ARGUS-5
Date Fri, 19 Sep 2014 21:32:54 GMT
ARGUS-5

Project: http://git-wip-us.apache.org/repos/asf/incubator-argus/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-argus/commit/84c22443
Tree: http://git-wip-us.apache.org/repos/asf/incubator-argus/tree/84c22443
Diff: http://git-wip-us.apache.org/repos/asf/incubator-argus/diff/84c22443

Branch: refs/heads/master
Commit: 84c22443d2a5aa8207062132f17652fd3d028509
Parents: 3272c00
Author: rmani <rmani@hortonworks.com>
Authored: Fri Sep 12 11:22:10 2014 -0700
Committer: rmani <rmani@hortonworks.com>
Committed: Fri Sep 12 11:22:10 2014 -0700

----------------------------------------------------------------------
 .../hadoop/log/HdfsFileAppender.java            | 345 +++++++------------
 1 file changed, 134 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
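
Summary (inferred from the diff below): cleanup of HdfsFileAppender. Fields and accessors are renamed to Java camelCase conventions (FileRollingInterval -> fileRollingInterval, cachewriter -> cacheWriter, closehdfsWriter -> closeHdfsWriter, etc.), the repeated triple indexOf() case checks are consolidated into a containsIgnoreCase() helper, the unused scheduledFilename/scheduledCacheFile state, the unused constructor, and the commented-out rollOverHdfsFile() path are removed, magic values are extracted into named constants, and the HdfsLiveUpdate flag parsing is simplified to Boolean.parseBoolean().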


http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/84c22443/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
b/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
index 3398d8a..86a9bb1 100644
--- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
+++ b/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
@@ -37,12 +37,7 @@ import org.apache.log4j.helpers.LogLog;
 **********************************/
 public class HdfsFileAppender extends FileAppender {
    
-	
-	/**
-     * maximal number of retries to connect to the hdfs.
-     */
-	
-    
+	   
     // Constants for checking the DatePattern 
     public static final String MINUTES ="Min";
 	public static final String HOURS ="Hr";
@@ -64,15 +59,9 @@ public class HdfsFileAppender extends FileAppender {
      */
     private String hdfsFileRollingInterval = "1Day";
     
-    private String FileRollingInterval = "1Day";
+    private String fileRollingInterval = "1Day";
 
-    /**
-     * The log file will be renamed to the value of the scheduledFilename variable when the next interval is entered. For example, if the rollover period is one
-     * hour, the log file will be renamed to the value of "scheduledFilename" at the beginning of the next hour. The precise time when a rollover occurs depends
-     * on logging activity.
-     */
-    private String scheduledFilename;
-    
+     
     private String scheduledFileCache;
 
     /**
@@ -106,15 +95,15 @@ public class HdfsFileAppender extends FileAppender {
     
     private String hdfsfileName = null;
     
-    private String actualhdfsfileName = null;
+    private String actualHdfsfileName = null;
     
-    private String scheduledhdfsFileName = null;
+    private String scheduledHdfsFileName = null;
     
     private String fileCache = null;
     
     private HdfsSink hs = null;  
     
-    private Writer cachewriter = null;
+    private Writer cacheWriter = null;
     
 	private FileOutputStream cacheOstream = null;
 	
@@ -126,15 +115,14 @@ public class HdfsFileAppender extends FileAppender {
 
     private int hdfsFileRollOffset = 0;    
     
-    private int FileRollOffset = 0;
+    private int fileRollOffset = 0;
     
     private boolean firstTime = true;
     
     private boolean firstTimeLocal = true;
     
-    private String HdfsLiveUpdate = "true";
-    
-    
+    private String hdfsLiveUpdate = "true";
+        
     boolean hdfsUpdateAllowed = true;
     
     private String hdfsCheckInterval=null;
@@ -157,28 +145,16 @@ public class HdfsFileAppender extends FileAppender {
     	
     }
 
-    /**
-     * Instantiate a <code>HdfsFileAppender</code> and open the file designated by <code>filename</code>. The opened filename will become the output destination for this appender.
-     */
-    public HdfsFileAppender(Layout layout, String filename, String hdfsFileRollingInterval,String FileRollingInterval, String HdfsLiveUpdate, String hdfsCheckInterval) throws IOException{
-        super(layout, filename, true);
-        this.hdfsFileRollingInterval = hdfsFileRollingInterval;
-        this.FileRollingInterval = FileRollingInterval;
-        this.HdfsLiveUpdate = HdfsLiveUpdate;
-        this.hdfsCheckInterval = hdfsCheckInterval;
-  
-        activateOptions();
-     }
 
     /**
      * The <b>hdfsFileRollingInterval</b> takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for hdfs File rollover schedule.
      */
-    public void sethdfsFileRollingInterval(String pattern) {
+    public void setHdfsFileRollingInterval(String pattern) {
         hdfsFileRollingInterval = pattern;
     }
 
     /** Returns the value of the <b>hdfsFileRollingInterval</b> option. */
-    public String gethdfsFileRollingInterval() {
+    public String getHdfsFileRollingInterval() {
         return hdfsFileRollingInterval;
     }
     
@@ -186,31 +162,31 @@ public class HdfsFileAppender extends FileAppender {
      * The <b>LocalDatePattern</b> takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for local cache File rollover schedule.
      */
     public void setFileRollingInterval(String pattern) {
-    	FileRollingInterval = pattern;
+    	fileRollingInterval = pattern;
     }
 
     /** Returns the value of the <b>FileRollingInterval</b> option. */
     public String getFileRollingInterval() {
-        return FileRollingInterval;
+        return fileRollingInterval;
     }
 
     /**
      * This will set the liveHdfsUpdate flag, where true will update hdfs live, and false will create local cache files and copy the files to hdfs
      */
     public void setHdfsLiveUpdate(String val) {
-    	HdfsLiveUpdate=val;
+    	hdfsLiveUpdate=val;
      }
 
     /** Returns the value of the <b>HdfsLiveUpdate</b> option. */
     public String getHdfsLiveUpdate() {
-        return HdfsLiveUpdate;
+        return hdfsLiveUpdate;
     }
     
-    public String gethdfsCheckInterval() {
+    public String getHdfsCheckInterval() {
     	return hdfsCheckInterval;
     }
     
-    public void sethdfsCheckInterval(String val){
+    public void setHdfsCheckInterval(String val){
     	hdfsCheckInterval = (val != null) ? val : DEFAULT_HDFSCHECKINTERVAL;
     }
     
@@ -236,12 +212,9 @@ public class HdfsFileAppender extends FileAppender {
             rc.setType(type);
             LogLog.debug("File name: " + fileName);
             File file = new File(fileName);
-            //scheduledFilename = fileName +"." +file.lastModified();
-       	    //scheduledhdfsFileName = hdfsfileName + "." + file.lastModified();
-            scheduledFilename = fileName+sdf.format(new Date(file.lastModified()));
-       	    scheduledhdfsFileName = hdfsfileName+sdf.format(new Date(file.lastModified()));
+            scheduledHdfsFileName = hdfsfileName+sdf.format(new Date(file.lastModified()));
        	    firstTime = true;
-            LogLog.debug("Local and hdfs Files" + scheduledFilename + " " +scheduledhdfsFileName)
;
+            LogLog.debug("Local and hdfs Files" + scheduledHdfsFileName + " " +scheduledHdfsFileName)
;
            
         } else {
         	LogLog.error("Either File or hdfsFileRollingInterval options are not set for appender
[" + name + "].");
@@ -249,49 +222,49 @@ public class HdfsFileAppender extends FileAppender {
         
         // Local Cache File 
 
-        if (FileRollingInterval != null && fileCache != null){
-        	   nowLocal.setTime(System.currentTimeMillis());
-               int localtype = computeCheckPeriod(FileRollingInterval);
-               FileRollOffset = getTimeOffset(FileRollingInterval);
-               printLocalPeriodicity(localtype,FileRollOffset);
-               rcLocal.setType(localtype);
-               LogLog.debug("LocalCacheFile name: " + fileCache);
-               File fileCachehandle = new File(fileCache);
-          	   //scheduledFileCache = fileCache+"."+fileCachehandle.lastModified();
-               scheduledFileCache = fileCache+sdf.format(new Date(fileCachehandle.lastModified()));
-          	   firstTimeLocal = true;
-          	   
+        if (fileRollingInterval != null && fileCache != null){
+    	   nowLocal.setTime(System.currentTimeMillis());
+           int localtype = computeCheckPeriod(fileRollingInterval);
+           fileRollOffset = getTimeOffset(fileRollingInterval);
+           printLocalPeriodicity(localtype,fileRollOffset);
+           rcLocal.setType(localtype);
+           LogLog.debug("LocalCacheFile name: " + fileCache);
+           File fileCachehandle = new File(fileCache);
+           scheduledFileCache = fileCache+sdf.format(new Date(fileCachehandle.lastModified()));
+      	   firstTimeLocal = true;
+      	   
         } else {
         	LogLog.error("Either File or LocalDatePattern options are not set for appender ["
+ name + "].");
         }
         
-        if (HdfsLiveUpdate.equalsIgnoreCase("true")) {
-        	hdfsUpdateAllowed  = true;
-        } else if (HdfsLiveUpdate.equalsIgnoreCase("false")) {
-        	hdfsUpdateAllowed = false;
-        } 
-        
-        actualhdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis());
-        
+        hdfsUpdateAllowed = Boolean.parseBoolean(hdfsLiveUpdate);
+        actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis());
     }
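
A note on the Boolean.parseBoolean() simplification above: parseBoolean() returns true only for the literal string "true" (case-insensitive), so any other value now disables live updates, whereas the replaced if/else chain left the default (true) in place for unrecognized strings. A brief illustration of the difference:

    Boolean.parseBoolean("true");  // true
    Boolean.parseBoolean("TRUE");  // true (case-insensitive)
    Boolean.parseBoolean("yes");   // false -- the old code would have kept the default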
     
-    
+    // Index of str2 within str1 ignoring case, or -1 when absent; callers
+    // test "> 0", so a numeric prefix must precede the unit string.
+    public static int containsIgnoreCase(String str1, String str2) {
+    	return str1.toLowerCase().indexOf(str2.toLowerCase());
+    }
+        
     
     public int computeCheckPeriod(String timePattern){
-    	
-    	if ( timePattern.indexOf(MINUTES) > 0 || timePattern.indexOf(MINUTES.toLowerCase()) > 0|| timePattern.indexOf(MINUTES.toUpperCase()) > 0 ) {
+
+    	if(containsIgnoreCase(timePattern, MINUTES) > 0) {
     		return TOP_OF_MINUTE;
     	}
-    	if ( timePattern.indexOf(HOURS) > 0 || timePattern.indexOf(HOURS.toLowerCase()) > 0|| timePattern.indexOf(HOURS.toUpperCase()) > 0 ) {
+
+    	if(containsIgnoreCase(timePattern, HOURS) > 0) {
     		return TOP_OF_HOUR;
     	}
-    	if ( timePattern.indexOf(DAYS) > 0 || timePattern.indexOf(DAYS.toLowerCase()) > 0|| timePattern.indexOf(DAYS.toUpperCase()) > 0 ) {
+    	
+    	if(containsIgnoreCase(timePattern, DAYS) > 0) {
     		return TOP_OF_DAY;
     	}
-    	if ( timePattern.indexOf(WEEKS) > 0 || timePattern.indexOf(WEEKS.toLowerCase()) > 0|| timePattern.indexOf(WEEKS.toUpperCase()) > 0 ) {
+    	
+    	if(containsIgnoreCase(timePattern, WEEKS) > 0) {
     		return TOP_OF_WEEK;
     	}
-    	if ( timePattern.indexOf(MONTHS) > 0 || timePattern.indexOf(MONTHS.toLowerCase()) > 0|| timePattern.indexOf(MONTHS.toUpperCase()) > 0 ) {
+    	
+    	if(containsIgnoreCase(timePattern, MONTHS) > 0) {
     		return TOP_OF_MONTH;
     	}
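
For context, a minimal standalone sketch of the consolidated lookup introduced above (hypothetical class and main(), not part of the patch):

    public class PatternCheckSketch {
        public static final String MINUTES = "Min";

        // Same helper as in the patch: index of str2 within str1 ignoring case, -1 if absent.
        public static int containsIgnoreCase(String str1, String str2) {
            return str1.toLowerCase().indexOf(str2.toLowerCase());
        }

        public static void main(String[] args) {
            System.out.println(containsIgnoreCase("5min", MINUTES)); // 1 -> TOP_OF_MINUTE
            System.out.println(containsIgnoreCase("1Day", MINUTES)); // -1 -> no match
        }
    }

Note the "> 0" comparison (rather than ">= 0") in the callers: the unit string must be preceded by at least one character, which in practice is the numeric prefix that getTimeOffset() parses.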
     	
@@ -329,26 +302,26 @@ public class HdfsFileAppender extends FileAppender {
 	    int index;	
 	    int offset=-1;
 	    
-		if ( (index = timePattern.indexOf(MINUTES)) > 0 || (index = timePattern.indexOf(MINUTES.toLowerCase())) > 0|| (index = timePattern.indexOf(MINUTES.toUpperCase())) > 0 ) {
-			
+		if ((index = containsIgnoreCase(timePattern, MINUTES)) > 0) {
 			offset = Integer.parseInt(timePattern.substring(0,index));
-			
 		}
-		if ( (index = timePattern.indexOf(HOURS)) > 0 || (index =timePattern.indexOf(HOURS.toLowerCase())) > 0|| (index=timePattern.indexOf(HOURS.toUpperCase())) > 0 ) {
+		
+		if ((index = containsIgnoreCase(timePattern, HOURS)) > 0) {
 			offset = Integer.parseInt(timePattern.substring(0,index));
 			
 		}
-		if ( (index = timePattern.indexOf(DAYS)) > 0 || (index =timePattern.indexOf(DAYS.toLowerCase())) > 0|| (index=timePattern.indexOf(DAYS.toUpperCase())) > 0 ) {
+		
+		if ((index = containsIgnoreCase(timePattern, DAYS)) > 0) {
 			offset = Integer.parseInt(timePattern.substring(0,index));
 			
 		}
 		
-		if ( (index = timePattern.indexOf(WEEKS)) > 0 || (index =timePattern.indexOf(WEEKS.toLowerCase())) > 0|| (index=timePattern.indexOf(WEEKS.toUpperCase())) > 0 ) {
+		if ((index = containsIgnoreCase(timePattern, WEEKS)) > 0) {
 			offset = Integer.parseInt(timePattern.substring(0,index));
 			
 		}
 		
-		if ( (index = timePattern.indexOf(MONTHS)) > 0 || (index =timePattern.indexOf(MONTHS.toLowerCase())) > 0|| (index=timePattern.indexOf(MONTHS.toUpperCase())) > 0 ) {
+		if ((index = containsIgnoreCase(timePattern, MONTHS)) > 0) {
 			offset = Integer.parseInt(timePattern.substring(0,index));
 			
 		}
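
The parsing above makes the helper's return value do double duty: it both confirms the unit and marks where the numeric prefix ends. A hypothetical walk-through for "5Min":

    int index = containsIgnoreCase("5Min", MINUTES);         // 1
    int offset = Integer.parseInt("5Min".substring(0, 1));   // 5 -> roll every five minutes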
@@ -395,15 +368,12 @@ public class HdfsFileAppender extends FileAppender {
         }
         
         long epochNow = System.currentTimeMillis();
-        //String datedFilename = fileName + "." + epochNow;
-        //String datedhdfsFileName = hdfsfileName + "." + epochNow;
-        
-        String datedFilename =fileName+sdf.format(epochNow);
+         
         String datedhdfsFileName = hdfsfileName+sdf.format(epochNow);
         
         LogLog.debug("In rollOver epochNow" + epochNow + "  " + "nextCheck: " + prevnextCheck
);
         
-        
+      
         // It is too early to roll over because we are still within the bounds of the current interval. Rollover will occur once the next interval is reached.
            
         if (epochNow < prevnextCheck) {
@@ -413,12 +383,12 @@ public class HdfsFileAppender extends FileAppender {
         // close current file, and rename it to datedFilename
         this.closeFile();
         
-        LogLog.debug("Rolling Over hdfs file to " + scheduledhdfsFileName);
+        LogLog.debug("Rolling Over hdfs file to " + scheduledHdfsFileName);
         
                 
         if ( hdfsAvailable ) {
-           rollOverHdfsFile(scheduledhdfsFileName);
-           actualhdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis());
+           // for the hdfs file we don't roll the file over; we just switch to a new dated file name.
+           actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis());
         }         
          
         try {
@@ -427,9 +397,7 @@ public class HdfsFileAppender extends FileAppender {
         } catch(IOException e) {
             errorHandler.error("setFile(" + fileName + ", false) call failed.");
         }
-        scheduledFilename = datedFilename;
-        scheduledhdfsFileName = datedhdfsFileName;
-      
+        scheduledHdfsFileName = datedhdfsFileName;
     }
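
Worth noting in the hunk above: instead of renaming the remote file via rollOverHdfsFile() (the method is deleted further down in this patch), the appender now simply recomputes actualHdfsfileName with a fresh date suffix, so each interval writes to a new hdfs path and no remote rename is required.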
     
     
@@ -438,14 +406,13 @@ public class HdfsFileAppender extends FileAppender {
      */
     private void rollOverLocal() throws IOException {
         /* Compute filename, but only if datePattern is specified */
-        if(FileRollingInterval == null) {
+        if(fileRollingInterval == null) {
             errorHandler.error("Missing LocalDatePattern option in rollOverLocal().");
             return;
         }
         
         long epochNow = System.currentTimeMillis();
        
-        //String datedCacheFileName = fileCache + "." + epochNow;
         String datedCacheFileName = fileCache+sdf.format(epochNow);
         LogLog.debug("In rollOverLocal() epochNow" + epochNow + "  " + "nextCheckLocal: "
+ prevnextCheckLocal );
         
@@ -473,7 +440,7 @@ public class HdfsFileAppender extends FileAppender {
 	        } else {
 	          LogLog.error("Failed to rename cache file ["+fileCache+"] to ["+scheduledFileCache+"].");
 	        }
-	        setfileCacheWriter();
+	        setFileCacheWriter();
 	        scheduledFileCache =  datedCacheFileName;
         }
     }
@@ -496,7 +463,7 @@ public class HdfsFileAppender extends FileAppender {
         // trailing spaces in file names.
         String val = file.trim();
         
-        fileName = val;
+        fileName=val;
         fileCache=val+".cache";
         
       } 
@@ -549,7 +516,7 @@ public class HdfsFileAppender extends FileAppender {
         this.bufferSize = bufferSize;
         
         //set cache file
-        setfileCacheWriter();
+        setFileCacheWriter();
         
         writeHeader();
                 
@@ -594,15 +561,6 @@ public class HdfsFileAppender extends FileAppender {
     protected void subAppend(LoggingEvent event) {
     	LogLog.debug("Called subAppend for logging into hdfs...");   
        
-    	//String logPath = (String)event.getMDC(Log4jAuditProvider.MDC_VAR_LOG_RESOURCE_PATH);
-
-    	//LogLog.debug("Log Path " + logPath); 
-    
-    	/*
-    	if (logPath.startsWith(fileName)) {
-    		return ;
-    	}
-    	*/
         long n = System.currentTimeMillis();
         if(n >= nextCheck) {
             now.setTime(n);
@@ -625,7 +583,7 @@ public class HdfsFileAppender extends FileAppender {
         if ( nLocal > nextCheckLocal ) {
     	  nowLocal.setTime(nLocal);
     	  prevnextCheckLocal = nextCheckLocal;
-    	  nextCheckLocal = rcLocal.getNextCheckMillis(nowLocal, FileRollOffset);
+    	  nextCheckLocal = rcLocal.getNextCheckMillis(nowLocal, fileRollOffset);
     	   if ( firstTimeLocal) {
           	 prevnextCheckLocal = nextCheckLocal;
           	 firstTimeLocal = false;
@@ -653,7 +611,7 @@ public class HdfsFileAppender extends FileAppender {
       closeWriter();
       this.qw = null;
       //this.
-      this.closehdfsWriter();
+      this.closeHdfsWriter();
       this.closeCacheWriter();
    }
     
@@ -661,8 +619,8 @@ public class HdfsFileAppender extends FileAppender {
     public synchronized void close() {
     	LogLog.debug("Closing all resource..");
         this.closeFile();
-        this.closehdfsWriter();
-        this.closehdfsostream();
+        this.closeHdfsWriter();
+        this.closeHdfsOstream();
         this.closeFileSystem();
     }
 
@@ -676,8 +634,8 @@ public class HdfsFileAppender extends FileAppender {
         } catch(IOException ie) {
         	LogLog.error("unable to close output stream", ie);
         }
-        this.closehdfsWriter();
-        this.closehdfsostream();
+        this.closeHdfsWriter();
+        this.closeHdfsOstream();
      }
 
     @Override
@@ -701,9 +659,7 @@ public class HdfsFileAppender extends FileAppender {
   
  /******* HDFS Appender methods ***********/
      
- //private void appendHDFSFileSystem(String fileNameSystem, String filename, String fileCache,boolean append, boolean bufferedIO, int bufferSize, Layout layout, String encoding, LoggingEvent event, String scheduledFileCache, Writer cacheWriter,boolean hdfsUpdateAllowed ) {
-
-	private void appendHDFSFileSystem(LoggingEvent event) {
+ private void appendHDFSFileSystem(LoggingEvent event) {
 
 	long currentTime = System.currentTimeMillis();
 	
@@ -711,8 +667,8 @@ public class HdfsFileAppender extends FileAppender {
     	
     	 if ( currentTime >= hdfsNextCheck ) {
     		 
-    		LogLog.debug("About to Open fileSystem" + fileSystemName+" "+actualhdfsfileName) ;
-	      	hs = openHdfsSink(fileSystemName,actualhdfsfileName,fileCache,fileAppend,bufferedIO,bufferSize,layout,encoding,scheduledFileCache,cachewriter,hdfsUpdateAllowed,processUser);
+    		LogLog.debug("About to Open fileSystem" + fileSystemName+" "+actualHdfsfileName) ;
+	      	hs = openHdfsSink(fileSystemName,actualHdfsfileName,fileCache,fileAppend,bufferedIO,bufferSize,layout,encoding,scheduledFileCache,cacheWriter,hdfsUpdateAllowed,processUser);
 	      	if (hdfsUpdateAllowed) {
 	      	    // stream into hdfs only when liveHdfsUpdate flag is true else write to cache file.
 	      		hs.setOsteam();
@@ -742,9 +698,7 @@ public class HdfsFileAppender extends FileAppender {
  		     }
  		  	 writeToCache(event);
 	   }
-       
-       //hs.closehdfsWriter();
-       //hs.closehdfsostream();
+
     }
     
    
@@ -765,18 +719,18 @@ public class HdfsFileAppender extends FileAppender {
  	
     }
     
-    private void closehdfsostream() {
+    private void closeHdfsOstream() {
     	if (hs != null ){
     		LogLog.debug("Closing hdfs outstream") ;
-    		hs.closehdfsostream();
+    		hs.closeHdfsOstream();
        }
     }
     
-    private void closehdfsWriter() {
+    private void closeHdfsWriter() {
     	
     	if (hs != null) {
     	 LogLog.debug("Closing hdfs Writer") ;
-    	 hs.closehdfsWriter();
+    	 hs.closeHdfsWriter();
     	}
     }
     
@@ -784,36 +738,27 @@ public class HdfsFileAppender extends FileAppender {
     	hs.closeHdfsSink();
     }
     
-    private void rollOverHdfsFile(String scheduleHdfsFileName){
-    	try {
-    	 // Rollover hdfs file.
-    	 hs.rollOverHdfsFile(scheduleHdfsFileName);
-    	} catch (IOException ie) {
-    	 // TODO..what needs to happend if rollover of hdfs file fails.
-    	 LogLog.debug("Rollover of hdfs failed..") ;
-     	}
-    }
-    
+   
     
     /****** Cache File Methods **/
     
     
-    public void setfileCacheWriter()  {
+    public void setFileCacheWriter()  {
      
      try {
-   		  setfileCacheOstream(fileCache);
+    	 setFileCacheOstream(fileCache);
    	 } catch(IOException ie) {
    		 LogLog.error("Logging failed while tring to write into Cache File..", ie);
    	 }
 	 LogLog.debug("Setting Cache Writer..");
-	 cachewriter = createCacheFileWriter(cacheOstream);
+	 cacheWriter = createCacheFileWriter(cacheOstream);
 	 if(bufferedIO) {
-	    cachewriter = new BufferedWriter(cachewriter, bufferSize);
+		 cacheWriter = new BufferedWriter(cacheWriter, bufferSize);
 	  }		  
 	}
     
     
-    private  void setfileCacheOstream(String fileCache) throws IOException {
+    private  void setFileCacheOstream(String fileCache) throws IOException {
     
      try {
           cacheOstream = new FileOutputStream(fileCache, true);
@@ -854,10 +799,10 @@ public class HdfsFileAppender extends FileAppender {
    public void writeToCache(LoggingEvent event) {
 	 
 	   try {  
-		     LogLog.debug("Writing log to Cache.." + "layout: "+ this.layout.format(event) + "ignoresThowable:
"+layout.ignoresThrowable() + "Writer:" + cachewriter.toString());
+		     LogLog.debug("Writing log to Cache.." + "layout: "+ this.layout.format(event) + "ignoresThowable:
"+layout.ignoresThrowable() + "Writer:" + cacheWriter.toString());
 		    		     
-		     cachewriter.write(this.layout.format(event));
-		     cachewriter.flush();
+		     cacheWriter.write(this.layout.format(event));
+		     cacheWriter.flush();
 		     
 		     if(layout.ignoresThrowable()) {
 		      	 String[] s = event.getThrowableStrRep();
@@ -865,9 +810,9 @@ public class HdfsFileAppender extends FileAppender {
 			  	   int len = s.length;
 			   	   for(int i = 0; i < len; i++) {
 				  	  LogLog.debug("Log:" + s[i]);
-				      cachewriter.write(s[i]);
-				      cachewriter.write(Layout.LINE_SEP);
-				      cachewriter.flush();
+				  	  cacheWriter.write(s[i]);
+				  	  cacheWriter.write(Layout.LINE_SEP);
+				  	  cacheWriter.flush();
 				  }
 		       }
 		     }
@@ -882,7 +827,6 @@ public class HdfsFileAppender extends FileAppender {
 		  
 		  long epochNow = System.currentTimeMillis();
 	       
-	      //String datedCacheFileName = fileCache + "." + epochNow;
 	      String datedCacheFileName = fileCache + "." + epochNow;             
 		  LogLog.debug("Rolling over remaining cache File to new file"+ datedCacheFileName);
 		  closeCacheWriter();
@@ -906,9 +850,9 @@ public class HdfsFileAppender extends FileAppender {
     
    public void closeCacheWriter() {
 		  try {
-	            if(cachewriter != null) {
-	            	cachewriter.close();
-	            	cachewriter = null;
+	            if(cacheWriter != null) {
+	            	cacheWriter.close();
+	            	cacheWriter = null;
 	            }
 	        } catch(IOException ie) {
 	        	 LogLog.error("unable to close cache writer", ie);
@@ -1006,6 +950,10 @@ class RollingCalendar extends GregorianCalendar {
 
 class HdfsSink {
 
+	  private static final String DFS_REPLICATION_VAL = "1";
+	  private static final String DFS_REPLICATION_KEY = "dfs.replication";
+	  private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
+	  
 	  private Configuration conf = null;
 	  private FileSystem fs= null;
       private Path pt = null;
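
A hedged aside on the new constants: "fs.default.name" is the classic Hadoop key for the default filesystem URI; on Hadoop 2.x and later the preferred spelling is "fs.defaultFS", with the old key kept as a deprecated alias. A minimal sketch using the same Configuration/FileSystem classes this file already uses (the namenode URI is a hypothetical example):

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode.example.com:8020"); // hypothetical URI
    conf.set("dfs.replication", "1");
    FileSystem fs = FileSystem.get(conf); // connects using the settings above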
@@ -1021,7 +969,6 @@ class HdfsSink {
 	  private static int fstime=0;
 	  private CacheFileWatcher cfw = null;
 	  private boolean hdfsUpdateAllowed=true;
-	  private String scheduledCacheFile=null;
 	  private String processUser=null;
 
 	  
@@ -1048,14 +995,15 @@ class HdfsSink {
 		   this.bufferedIO=bufferedIO;
 		   this.fileCache=fileCache;
 		   this.hdfsUpdateAllowed=hdfsUpdateAllowed;
-		   this.scheduledCacheFile=scheduledCacheFile;
 		   this.processUser=processUser;
 		  
            try {
-        	   conf.set("dfs.replication", "1");
+        	   
+        	   conf.set(DFS_REPLICATION_KEY, DFS_REPLICATION_VAL);
+        	   conf.set(FS_DEFAULT_NAME_KEY, this.fsName);
         	   
         	   if ( fs == null) {
-        		 LogLog.debug("Opening Connection to hdfs Sytem");
+        		 LogLog.debug("Opening Connection to hdfs Sytem" + this.fsName);
         		         		        		 
         		 UserGroupInformation ugi = UserGroupInformation.createProxyUser(this.processUser, UserGroupInformation.getLoginUser());
     		     ugi.doAs( new PrivilegedExceptionAction<Void>() {
@@ -1070,7 +1018,7 @@ class HdfsSink {
               	      	   
       	          LogLog.debug("About to run CacheFilWatcher...");
       	          Path hdfsfilePath = getParent();
-      	          cfw = new CacheFileWatcher(this.fs,this.fileCache,hdfsfilePath,this.scheduledCacheFile,cacheWriter,this.hdfsUpdateAllowed,conf);
+      	          cfw = new CacheFileWatcher(this.fs,this.fileCache,hdfsfilePath,cacheWriter,this.hdfsUpdateAllowed,conf);
       	          cfw.start();
                  }
         	     
@@ -1103,7 +1051,7 @@ class HdfsSink {
 	  
 	  public void setOsteam() throws IOException {
 	    try {
-	    	  pt = new Path(fileName);
+	    	  pt = new Path(this.fileName);
 	    	  // if file Exist append it
 	    	  if(fs.exists(pt)) {
 	        	  LogLog.debug("Appending File: "+ this.fsName+":"+this.fileName);  
@@ -1113,9 +1061,8 @@ class HdfsSink {
 		      	  hdfsostream=fs.append(pt);	
 		      	  
 	           } else {
-	        	           	   
-	        	   LogLog.debug("Creating File directories in hdfs if not present...");  
-		           String parentName = new Path(fileName).getParent().toString();
+	        	   LogLog.debug("Creating File directories in hdfs if not present.."+ this.fsName+":"+this.fileName);
 
+		           String parentName = new Path(this.fileName).getParent().toString();
 		           if(parentName != null) {
 		              Path parentDir = new Path(parentName);
 		              if (!fs.exists(parentDir) ) {
@@ -1132,12 +1079,11 @@ class HdfsSink {
 	 }
 	 	  
 	  public void setWriter() {
-
 		  LogLog.debug("Setting Writer..");
 		  hdfswriter = createhdfsWriter(hdfsostream);
 		  if(bufferedIO) {
 			  hdfswriter = new BufferedWriter(hdfswriter, bufferSize);
-		   }		  
+		  }		  
 	  }
 	  
 	  public Writer getWriter() {
@@ -1181,7 +1127,7 @@ class HdfsSink {
 		 } catch (IOException ie) {
              LogLog.error("Unable to log header message to hdfs:", ie); 
              throw ie;
-          }
+         }
 	   }
 	   
 	   public
@@ -1203,7 +1149,7 @@ class HdfsSink {
 	   
 	   }
 
-	   public void closehdfsostream() {
+	   public void closeHdfsOstream() {
 	        try {
 	            if(this.hdfsostream != null) {
 	                this.hdfsostream.close();
@@ -1215,7 +1161,7 @@ class HdfsSink {
 	        
 	     }
 	   
-	   public void closehdfsWriter() {
+	   public void closeHdfsWriter() {
 		  try {
 	            if(hdfswriter != null) {
 	            	hdfswriter.close();
@@ -1254,53 +1200,33 @@ class HdfsSink {
 		    return retval;
 		   }	  
 	 	  
-	  public void rollOverHdfsFile(String scheduledHdfsFilename) throws IOException {
-		  
-		 /* try {
-			  
-			   Path target = new Path(scheduledHdfsFilename);
-	           if(this.fs.exists(target)) {
-	              this.fs.delete(target, true);
-	           }
 	
-	           Path file = new Path(this.fileName);
-	           boolean result = this.fs.rename(file, target);
-	           if(result) {
-	          	 LogLog.debug(fileName + " -> " + scheduledHdfsFilename);
-	           } else {
-	          	 LogLog.error("Failed to rename [" + fileName + "] to [" + scheduledHdfsFilename
+ "].");
-	           }
-           
-		  }catch(IOException ie) {
-			  
-		     LogLog.error("Unable to rollover hdfs file ",ie);
-			 throw ie;
-		 } */
-	  }
   }
 
 
 // CacheFileWatcher Thread 
 
 class CacheFileWatcher extends Thread {
+	
+	private static final long CACHEFILE_WATCHER_SLEEP_TIME = 1000 * 60 * 2; // two minutes between retries
+	
 	Configuration conf = null;
-	FileSystem fs = null;
-	String cacheFile = null;
-	File parentDir = null;
-	File[] files = null;
-	Path fsPath = null;
-	Path hdfsfilePath = null;
-	Writer cacheWriter = null;
-	String scheduledFileCache = null;
-	boolean hdfsUpdateAllowed=true;
-	boolean cacheFilesCopied = false;
+	private FileSystem fs = null;
+	private String cacheFile = null;
+	private File parentDir = null;
+	private File[] files = null;
+	private Path fsPath = null;
+	private Path hdfsfilePath = null;
+	private Writer cacheWriter = null;
+
+	private boolean hdfsUpdateAllowed=true;
+	private boolean cacheFilesCopied = false;
 	
-	CacheFileWatcher(FileSystem fs, String cacheFile, Path hdfsfilePath, String scheduledFileCache, Writer cacheWriter, boolean hdfsUpdateAllowed, Configuration conf) {
+	CacheFileWatcher(FileSystem fs, String cacheFile, Path hdfsfilePath, Writer cacheWriter, boolean hdfsUpdateAllowed, Configuration conf) {
 		this.fs = fs;
 		this.cacheFile = cacheFile;
 		this.conf = conf;
 		this.hdfsfilePath = hdfsfilePath;
-		this.scheduledFileCache = scheduledFileCache;
 		this.cacheWriter = cacheWriter;
 		this.hdfsUpdateAllowed = hdfsUpdateAllowed;
 	}
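
Since the watcher now sleeps on a named constant, here is a compact sketch of the retry-loop pattern its run() method implements (hypothetical simplification, not part of the patch):

    private static final long SLEEP_MS = 1000 * 60 * 2; // two minutes between retries

    public void run() {
        while (!cacheFilesCopied) {
            try {
                Thread.sleep(SLEEP_MS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return;
            }
            // retry the cache-to-hdfs copy here ...
        }
    }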
@@ -1310,7 +1236,6 @@ class CacheFileWatcher extends Thread {
 		
 	LogLog.debug("CacheFileWatcher Started");		
 	    
-	    
 		while (!cacheFilesCopied ){
 			
 			if (hdfsUpdateAllowed) {
@@ -1318,10 +1243,9 @@ class CacheFileWatcher extends Thread {
 			}
 			
 		  	if ( !cacheFilePresent(cacheFile) ) {
-		  		 
-		  		
+	  		
 				try {
-					Thread.sleep(1000*60*2);
+					Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME);
 				} catch (InterruptedException ie) {
 					LogLog.error("Unable to complete the CatchFileWatcher Sleep", ie);
 				}
@@ -1339,7 +1263,7 @@ class CacheFileWatcher extends Thread {
 			    		LogLog. error("Error while copying Cache Files to hdfs..Sleeping for next try",t);
	
 			    		
 				    	try {
-				    		Thread.sleep(1000*60*2);
+				    		Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME);
 				    	} catch (InterruptedException ie) {
 				    		LogLog.error("Unable to complete the CatchFileWatcher Sleep", ie);
 			    	}
@@ -1398,8 +1322,7 @@ class CacheFileWatcher extends Thread {
 	  if (new File(cacheFile).length() != 0 ) {
 		  long epochNow = System.currentTimeMillis();
 	       
-	      //String datedCacheFileName = cacheFile + "." + epochNow;
-		  String datedCacheFileName = cacheFile + sdf.format(epochNow);
+	      String datedCacheFileName = cacheFile + sdf.format(epochNow);
 	      
 		  LogLog.debug("Rolling over remaining cache File "+ datedCacheFileName);
 		  closeCacheFile();
@@ -1423,14 +1346,14 @@ class CacheFileWatcher extends Thread {
   }
 	
    public void closeCacheFile() {
-		  try {
-	            if(cacheWriter != null) {
-	            	cacheWriter.close();
-	            	cacheWriter = null;
-	            }
-	        } catch(IOException ie) {
-	        	 LogLog.error("unable to close cache writer", ie);
-	        }
+	  try {
+            if(cacheWriter != null) {
+            	cacheWriter.close();
+            	cacheWriter = null;
+            }
+        } catch(IOException ie) {
+        	 LogLog.error("unable to close cache writer", ie);
+        }
 	}
 }
 

