incubator-chukwa-user mailing list archives

From IvyTang <>
Subject Change the FileTailingAdaptor tailFile(),let it apply to the log4j rotated log files
Date Tue, 10 Jul 2012 08:04:26 GMT
Our team has used the Chukwa *CharFileTailingAdaptorUTF8* to collect log4j
rotated log files for several months. It helps us collect logs
from everywhere into our Hadoop center.
During this work we ran into several problems. I have raised them on this
mailing list, but I still haven't found a good solution.
So we read the source code and made some changes.

Our log files are generated by log4j, and the log4j appender is
If you use log4j to generate rotated logs, this mail may help you.

These two problems are the reasons we had to modify the source code.

1. Mismatch between the checkpoint size and the file size.

     I raised this problem on May 14: "the check point offset is bigger
than the log file size". Ariel Rabkin and Eric answered my
question; thanks for your replies.

     When Chukwa starts, it reads the checkpoint file and uses the
size stored there as the fileReadOffset. The size in the checkpoint indicates how many
bytes the adaptor has sent.

     If the log source is a stream or a file that does not rotate, this size is
correct; it really is the fileReadOffset. But if the file is rotated, the
checkpoint size is often bigger than the file size, and this causes
Chukwa to resend the whole log file.

     So we added a log statement that prints "chunk seqID:"+c.getSeqID() for
each chunk in this loop:

     for (Chunk c : toSend) {
      DataOutputBuffer b = new
      try {
      } catch (IOException err) {
        log.error("serialization threw IOException", err);
      // store a CLE for this chunk which we will use to ack this chunk to
      // caller of send()
      // (e.g. the agent will use the list of CLE's for checkpointing)"chunk seqID:"+c.getSeqID());
      commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(),
         c.getSeqID() - c.getData().length));
     The seqID is the offset of the sent chunks in this log file.
     So when we need to restart Chukwa, we stop it, change the size in the
checkpoint to the last chunk seqID in the log, and start it again.
     We could also apply the seqID to the checkpoint size directly, but we
don't know whether this would cause other problems.
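
To make the relation between seqID and file offsets concrete, here is a minimal sketch based only on the CommitListEntry call quoted above, whose third argument is c.getSeqID() - c.getData().length. It assumes the seqID is the offset just past the last byte of the chunk. The class ChunkOffsets and its method are hypothetical names for illustration, not Chukwa classes.

```java
public class ChunkOffsets {
    // Hypothetical helper: a chunk's start offset, given its seqID
    // (assumed to be the offset just past its last byte) and its payload length.
    static long startOffset(long seqId, int dataLength) {
        return seqId - dataLength;
    }

    public static void main(String[] args) {
        long seqId = 0L;
        int[] chunkSizes = {100, 250, 50}; // made-up payload sizes in bytes
        for (int size : chunkSizes) {
            seqId += size; // the seqID advances by the bytes sent in this chunk
            System.out.println("chunk seqID:" + seqId
                    + " start:" + startOffset(seqId, size));
        }
        // Under this assumption, the last seqID printed (350) is the value
        // one would write into the checkpoint before restarting.
    }
}
```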

2. The tailFile method in FileTailingAdaptor is the core of the log
collection code. It uses fileReadOffset and the file length to detect
a rotated file.
        RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
        len = reader.length();
        long newLength = newReader.length();
        if (newLength < len && fileReadOffset >= len) {
          if (reader != null) {
            reader.close();
          }

          reader = newReader;
          fileReadOffset = 0L;
          log.debug("Adaptor|"+ adaptorID + "| File size mismatched, rotating: "
              + toWatch.getAbsolutePath());
        } else {
          try {
            if (newReader != null) {
              newReader.close();
            }
            newReader =null;
          } catch (Throwable e) {
            // do nothing.
          }
        }
      This logic works in most cases. But there is one case where it fails:
when Chukwa starts, the log file is empty and it stays empty until it is
rotated. After rotation, because its size is 0, this log is
removed. A new file is then created, and its size is not 0.
      But len is still 0 and newLength is > 0, so the condition if
(newLength < len && fileReadOffset >= len) is never satisfied. The new
log file is never detected.
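
The failing case can be checked with plain numbers. The sketch below copies only the boolean condition from the code quoted above into a hypothetical class SizeRotationCheck; the sizes are made-up values, and nothing here touches real files or Chukwa classes.

```java
public class SizeRotationCheck {
    // Same condition as in the quoted tailFile code, isolated for illustration.
    static boolean isRotated(long len, long newLength, long fileReadOffset) {
        return newLength < len && fileReadOffset >= len;
    }

    public static void main(String[] args) {
        // Usual case: the old file was fully read (1000 bytes), the new one is shorter.
        System.out.println(isRotated(1000L, 200L, 1000L)); // prints "true"

        // Problem case: the old file was empty, the new one already has data.
        // newLength < len is 200 < 0, which is false, so rotation is not detected.
        System.out.println(isRotated(0L, 200L, 0L)); // prints "false"
    }
}
```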

        So we changed the implementation of this method to use a timestamp to
detect the new log file. lastSlurpTime is the timestamp of the last
slurp; it is declared and assigned in LWFTAdaptor.
       try {
                len = reader.length();
                if(lastSlurpTime == 0){
                    lastSlurpTime = System.currentTimeMillis();
                if (offsetOfFirstByte > fileReadOffset) {
                    // If the file rotated, the recorded offsetOfFirstByte is greater than
                    // file size,reset the first byte position to beginning of the file.
                    fileReadOffset = 0;
                    offsetOfFirstByte = 0L;
                    log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
                if (len == fileReadOffset) {
                    File fixedNameFile = new
                    long fixedNameLastModified =
                    if (fixedNameLastModified > lastSlurpTime) {
                        // If len == fileReadOffset, the file has stopped receiving log data or the file has rotated.
                        // But fixedNameLastModified > lastSlurpTime means the file has been written to after the last slurp,
                        // so the file has been rotated.
                        boolean hasLeftData = true;
                        while(hasLeftData){// read the possibly generated
                            hasLeftData = slurp(len, reader);
                        RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
                        if (reader != null) {
                        reader = newReader;
                        fileReadOffset = 0L;
                        len = reader.length();
                        log.debug("Adaptor|" + adaptorID + "| File size mismatched, rotating: " +
                    hasMoreData = slurp(len, reader);
                } else if (len < fileReadOffset) {
                    // file has rotated and no detection
                    if (reader != null) {
                    reader = null;
                    fileReadOffset = 0L;
                    offsetOfFirstByte = 0L;
                    hasMoreData = true;
                    log.warn("Adaptor|" + adaptorID + "| file: " +
                            + ", has rotated and no detection - reset counters to 0L");
                } else {
                    hasMoreData = slurp(len, reader);
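
The decision made by the three branches above can be summarised in a small standalone sketch. This is a simplification under stated assumptions, not the adaptor code: the class TimestampRotationCheck, the Action enum and the decide method are hypothetical names, and the timestamps are made-up millisecond values. In the real adaptor the last-modified value would come from the file being watched (for example via java.io.File.lastModified()).

```java
public class TimestampRotationCheck {
    // Hypothetical outcomes, one per branch of the modified tailFile logic.
    enum Action { REOPEN_FROM_START, RESET_COUNTERS, KEEP_READING }

    static Action decide(long len, long fileReadOffset,
                         long lastModified, long lastSlurpTime) {
        if (len == fileReadOffset) {
            // Everything in the open file was read. If the watched path was
            // written after the last slurp, assume it is a new, rotated file.
            return lastModified > lastSlurpTime
                    ? Action.REOPEN_FROM_START
                    : Action.KEEP_READING;
        } else if (len < fileReadOffset) {
            // The offset points past the end of the file: an undetected rotation.
            return Action.RESET_COUNTERS;
        }
        return Action.KEEP_READING; // unread bytes remain in the current file
    }

    public static void main(String[] args) {
        // Empty file replaced by a new one: the size-based check missed this case.
        System.out.println(decide(0L, 0L, 2000L, 1000L));     // prints "REOPEN_FROM_START"
        // Fully read and not modified since the last slurp.
        System.out.println(decide(500L, 500L, 900L, 1000L));  // prints "KEEP_READING"
        // Offset beyond the file length.
        System.out.println(decide(100L, 500L, 900L, 1000L));  // prints "RESET_COUNTERS"
    }
}
```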

We hope these two changes help the adaptor collect rotated files
more reliably.

If there is anything wrong, please let me know.


Best regards,

Ivy Tang
