chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r1361902 - in /incubator/chukwa/trunk: ./ src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/main/java/org/apache/hadoop/chukwa/datacollection/sender/
Date Mon, 16 Jul 2012 04:11:43 GMT
Author: eyang
Date: Mon Jul 16 04:11:43 2012
New Revision: 1361902

URL: http://svn.apache.org/viewvc?rev=1361902&view=rev
Log:
CHUKWA-646. Fix file rotation correctly for 0 byte rotated files. (Ivy Tang via Eric Yang)

Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1361902&r1=1361901&r2=1361902&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Mon Jul 16 04:11:43 2012
@@ -14,6 +14,8 @@ Trunk (unreleased changes)
 
   BUGS
 
+    CHUKWA-646. Fix file rotation correctly for 0 byte rotated files. (Ivy Tang via Eric Yang)
+
     CHUKWA-643. Updated Jersey dependency. (Prakhar Srivastava via Eric Yang)
 
     CHUKWA-641. Fix stack trace for dumpChunk command when invalid regular expression is specified. (Eric Spishak via Eric Yang)

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=1361902&r1=1361901&r2=1361902&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Mon Jul 16 04:11:43 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.chukwa.datacol
 
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.io.File;
 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 
@@ -168,55 +169,59 @@ public class FileTailingAdaptor extends 
 
       long len = 0L;
       try {
-        RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
         len = reader.length();
-        long newLength = newReader.length();
-        if (newLength < len && fileReadOffset >= len) {
-          if (reader != null) {
-            reader.close();
-          }
-          
-          reader = newReader;
-          fileReadOffset = 0L;
-          log.debug("Adaptor|"+ adaptorID + "| File size mismatched, rotating: "
-              + toWatch.getAbsolutePath());
-        } else {
-          try {
-            if (newReader != null) {
-              newReader.close();
-            }
-            newReader =null;
-          } catch (Throwable e) {
-            log.debug(ExceptionUtil.getStackTrace(e));
-          }
+        if (lastSlurpTime == 0) {
+          lastSlurpTime = System.currentTimeMillis();
         }
-      } catch (IOException e) {
-        log.debug(ExceptionUtil.getStackTrace(e));
-      }
-      if (len >= fileReadOffset) {
         if (offsetOfFirstByte > fileReadOffset) {
           // If the file rotated, the recorded offsetOfFirstByte is greater than
-          // file size,
-          // reset the first byte position to beginning of the file.
+          // file size,reset the first byte position to beginning of the file.
           fileReadOffset = 0;
           offsetOfFirstByte = 0L;
           log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
         }
-        hasMoreData = slurp(len, reader);
-
-      } else {
-        // file has rotated and no detection
-        if (reader != null) {
-          reader.close();
-        }
-        
-        reader = null;
-        fileReadOffset = 0L;
-        offsetOfFirstByte = 0L;
-        hasMoreData = true;
-        log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
-            + ", has rotated and no detection - reset counters to 0L");
-      }
+        if (len == fileReadOffset) {
+          File fixedNameFile = new File(toWatch.getAbsolutePath());
+          long fixedNameLastModified = fixedNameFile.lastModified();
+          if (fixedNameLastModified > lastSlurpTime) {
+            // If len == fileReadOffset,the file stops rolling log or the file
+            // has rotated.
+            // But fixedNameLastModified > lastSlurpTime , this means after the
+            // last slurping,the file has been written ,
+            // so the file has been rotated.
+            boolean hasLeftData = true;
+            while (hasLeftData) {// read the possiblly generated log
+              hasLeftData = slurp(len, reader);
+            }
+            RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
+            if (reader != null) {
+              reader.close();
+            }
+            reader = newReader;
+            fileReadOffset = 0L;
+            len = reader.length();
+            log.debug("Adaptor|" + adaptorID 
+                + "| File size mismatched, rotating: " 
+                + toWatch.getAbsolutePath());
+          }
+          hasMoreData = slurp(len, reader);
+        } else if (len < fileReadOffset) {
+          // file has rotated and no detection
+          if (reader != null) {
+            reader.close();
+          }
+          reader = null;
+          fileReadOffset = 0L;
+          offsetOfFirstByte = 0L;
+          hasMoreData = true;
+          log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath() 
+              + ", has rotated and no detection - reset counters to 0L");
+        } else {
+          hasMoreData = slurp(len, reader);
+        }
+      } catch (IOException e) {
+        // do nothing, if file doesn't exist.
+      }       
     } catch (IOException e) {
       log.warn("failure reading " + toWatch, e);
     }

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java?rev=1361902&r1=1361901&r2=1361902&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java Mon Jul 16 04:11:43 2012
@@ -72,7 +72,11 @@ public class LWFTAdaptor extends Abstrac
    */
   protected long offsetOfFirstByte = 0;
   protected Configuration conf = null;
-  
+  /**
+   * The timestamp of last slurping.
+   */ 
+  protected long lastSlurpTime = 0l;
+
   File toWatch;
 
   @Override

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=1361902&r1=1361901&r2=1361902&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Mon Jul 16 04:11:43 2012
@@ -188,6 +188,7 @@ public class ChukwaHttpSender implements
       // store a CLE for this chunk which we will use to ack this chunk to the
       // caller of send()
       // (e.g. the agent will use the list of CLE's for checkpointing)
+      log.info("chunk seqID:"+c.getSeqID());
       commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(), 
          c.getSeqID() - c.getData().length));
     }



Mime
View raw message