chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r762139 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
Date Sun, 05 Apr 2009 19:38:18 GMT
Author: asrabkin
Date: Sun Apr  5 19:38:17 2009
New Revision: 762139

URL: http://svn.apache.org/viewvc?rev=762139&view=rev
Log:
CHUKWA-70.  Rewrite FileAdaptor.  (contributed by Jerome Boulon)

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=762139&r1=762138&r2=762139&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Sun Apr  5 19:38:17 2009
@@ -67,6 +67,8 @@
 
   BUG FIXES
 
+    CHUKWA-70.  Rewrite FileAdaptor.  (Jerome Boulon via asrabkin)
+
     CHUKWA-93.  Fix NPE in SeqFileWriter.  (Jiaqi Tan via asrabkin)
 
    CHUKWA-1. Remove lzo job configuration from Chukwa data processors. (Contribute by Jerome Boulon via Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=762139&r1=762138&r2=762139&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Sun Apr  5 19:38:17 2009
@@ -18,47 +18,137 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
-import org.apache.hadoop.chukwa.util.RecordConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.ArrayList;
+
+
+/** Daemon thread that periodically calls sendFile() on each registered FileAdaptor. */
+class FileAdaptorTailer extends Thread {
+  static Logger log = Logger.getLogger(FileAdaptorTailer.class);
+  // iterations are much more common than adding a new adaptor
+  private final List<FileAdaptor> adaptors = new CopyOnWriteArrayList<FileAdaptor>();
+  private static Configuration conf = null;
+  private final Object lock = new Object();
+
+  /** How often to call each adaptor. */
+  static final int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 10;
+  int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
+
+  public FileAdaptorTailer() {
+    ChukwaAgent agent = ChukwaAgent.getAgent();
+    if (conf == null && agent != null) {
+      conf = agent.getConfiguration();
+    }
+    // apply the period even when conf was already cached by an earlier instance
+    if (conf != null) {
+      SAMPLE_PERIOD_MS = conf.getInt("chukwaAgent.adaptor.context.switch.time",
+          DEFAULT_SAMPLE_PERIOD_MS);
+    }
+    this.setDaemon(true);
+    start();// start the FileAdaptorTailer thread
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        waitForAdaptors();
+        long startTime = System.currentTimeMillis();
+        for (FileAdaptor adaptor : adaptors) {
+          // isolate failures: one bad adaptor must not skip the others or the sleep
+          try {
+            log.debug("calling this adaptor:" + adaptor.getStreamName());
+            adaptor.sendFile();
+          } catch (RuntimeException e) {
+            log.warn("Exception in FileAdaptorTailer:", e);
+          }
+        }
+        long timeToReadFiles = System.currentTimeMillis() - startTime;
+        if (timeToReadFiles < SAMPLE_PERIOD_MS) {
+          // sleep only for the remainder of the period, not a full period
+          Thread.sleep(SAMPLE_PERIOD_MS - timeToReadFiles);
+        }
+      } catch (Throwable e) {
+        log.warn("Exception in FileAdaptorTailer:", e);
+      }
+    }
+  }
+
+  /** Waits for an adaptor; the check runs under the lock so no notify is missed. */
+  private void waitForAdaptors() {
+    synchronized (lock) {
+      while (adaptors.isEmpty()) {
+        log.info("Waiting queue is empty");
+        try {
+          lock.wait();
+        } catch (InterruptedException e) {
+          // keep waiting: this daemon thread has no shutdown path
+        }
+      }
+    }
+  }
+
+  public void addFileAdaptor(FileAdaptor adaptor) {
+    synchronized (lock) {
+      adaptors.add(adaptor);
+      lock.notifyAll();
+    }
+  }
+
+  public void removeFileAdaptor(FileAdaptor adaptor) {
+    adaptors.remove(adaptor);
+  }
+}
 
 /**
  * File Adaptor push small size file in one chunk to collector
  */
 public class FileAdaptor implements Adaptor {
 
-  static Logger log;
-
-  protected static Configuration conf = null;
-  private int attempts = 0;
-
-  File toWatch;
-  /**
-   * next PHYSICAL offset to read
-   */
+  static Logger log = Logger.getLogger(FileAdaptor.class);
+  static FileAdaptorTailer tailer = null;
+  
+  static final int DEFAULT_TIMEOUT_PERIOD = 5*60*1000;
+  static int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD;
+  
+  static {
+    tailer = new FileAdaptorTailer();
+    ChukwaAgent agent = ChukwaAgent.getAgent();
+    if (agent != null) {
+      Configuration conf = agent.getConfiguration();
+      if (conf != null) {
+        TIMEOUT_PERIOD = conf.getInt(
+            "chukwaAgent.adaptor.fileadaptor.timeoutperiod",
+            DEFAULT_TIMEOUT_PERIOD);
+      }
+    }
+  }
+  
+  private long startTime = 0;
+  private long timeOut = 0;
+  
+  
+  protected File toWatch;
+  protected RandomAccessFile reader = null;
   protected long fileReadOffset;
   protected String type;
-  private ChunkReceiver dest;
-  protected RandomAccessFile reader = null;
+  protected ChunkReceiver dest;
   protected long adaptorID;
-
+  protected boolean shutdownCalled = false;
+  
   /**
    * The logical offset of the first byte of the file
    */
   private long offsetOfFirstByte = 0;
 
-  static {
-    log = Logger.getLogger(FileAdaptor.class);
-  }
-
   public void start(long adaptorID, String type, String params, long bytes,
       ChunkReceiver dest) {
     // in this case params = filename
@@ -67,7 +157,9 @@
     this.adaptorID = adaptorID;
     this.type = type;
     this.dest = dest;
-    this.attempts = 0;
+    this.startTime = System.currentTimeMillis();
+    this.timeOut = startTime + TIMEOUT_PERIOD;
+    
 
     String[] words = params.split(" ");
     if (words.length > 1) {
@@ -76,32 +168,87 @@
     } else {
       toWatch = new File(params);
     }
-    try {
-      reader = new RandomAccessFile(toWatch, "r");
-      long bufSize = toWatch.length();
-      byte[] buf = new byte[(int) bufSize];
-      reader.read(buf);
-      long fileTime = toWatch.lastModified();
-      int bytesUsed = extractRecords(dest, 0, buf, fileTime);
-    } catch (Exception e) {
-      e.printStackTrace();
+    
+    tailer.addFileAdaptor(this);
+  }
+
+  /** Reads the file into one chunk once unmodified for a minute (or timed out). */
+  void sendFile() {
+    long now = System.currentTimeMillis();
+    if (!toWatch.exists()) {
+      if (now > timeOut) {
+        log.warn("Couldn't read this file: " + toWatch.getAbsolutePath());
+        unregisterFromAgent();
+        cleanUp();
+      }
+      return;
+    }
+    // wait until the file has been quiet for a minute, unless the timeout has passed
+    if (toWatch.lastModified() > now - (60 * 1000) && now < timeOut) {
+      log.info("Last modified time less than one minute, keep waiting");
+      return;
+    }
+    try {
+      long bufSize = toWatch.length();
+      if (bufSize > Integer.MAX_VALUE) { // the whole file must fit in one byte[]
+        throw new IllegalStateException("File too large for a single chunk: " + bufSize);
+      }
+      byte[] buf = new byte[(int) bufSize];
+      reader = new RandomAccessFile(toWatch, "r");
+      reader.readFully(buf); // read() may return fewer bytes than requested
+      reader.close();
+      reader = null;
+      this.fileReadOffset = extractRecords(dest, 0, buf, toWatch.lastModified());
+      unregisterFromAgent();
+      cleanUp();
+    } catch (Exception e) {
+      log.warn("Exception while trying to read: " + toWatch.getAbsolutePath(), e);
+      if (now > timeOut) { // give up rather than retry an unreadable file forever
+        unregisterFromAgent();
+        cleanUp();
+      }
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (Exception ignored) { // best-effort close
+        }
+        reader = null;
+      }
+    }
+  }
+  
+  /** Detaches this adaptor from the shared tailer and closes any open reader. */
+  private void cleanUp() {
+    tailer.removeFileAdaptor(this);
+    if (reader == null) {
+      return;
+    }
+    try {
+      reader.close();
+    } catch (Exception ignored) { /* closing is best-effort */ }
+    reader = null;
+  }
+  
+  private void unregisterFromAgent() {
     ChukwaAgent agent = ChukwaAgent.getAgent();
     if (agent != null) {
       agent.stopAdaptor(adaptorID, false);
+      
     } else {
-      log.info("Agent is null, running in default mode");
+      log.warn("Agent is null, cannot unregister " + toWatch.getAbsolutePath());
     }
-    this.fileReadOffset = bytes;
+ 
   }
-
+  
+  
   /**
-   * Do one last tail, and then stop
+   * We want to keep trying
    * 
    * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
    */
   public long shutdown() throws AdaptorException {
-    hardStop();
+    // do nothing -- will be automatically done by TimeOut
     return fileReadOffset + offsetOfFirstByte;
   }
 
@@ -109,6 +256,7 @@
    * Stop tailing the file, effective immediately.
    */
   public void hardStop() throws AdaptorException {
+    cleanUp();
   }
 
   public String getStreamName() {
@@ -118,19 +266,24 @@
   /**
    * Extract records from a byte sequence
    * 
-   * @param eq the queue to stick the new chunk[s] in
-   * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
-   * @param buf the byte buffer to extract records from
+   * @param eq
+   *          the queue to stick the new chunk[s] in
+   * @param buffOffsetInFile
+   *          the byte offset in the stream at which buf[] begins
+   * @param buf
+   *          the byte buffer to extract records from
    * @return the number of bytes processed
    * @throws InterruptedException
    */
-  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+  protected int extractRecords(final ChunkReceiver eq, long buffOffsetInFile,
       byte[] buf, long fileTime) throws InterruptedException {
-    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
+    final ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
         buffOffsetInFile + buf.length, buf, this);
     String tags = chunk.getTags();
     chunk.setTags(tags + " time=\"" + fileTime + "\"");
+    log.info("Adding " + toWatch.getAbsolutePath() + " to the queue");
     eq.add(chunk);
+    log.info( toWatch.getAbsolutePath() + " added to the queue");
     return buf.length;
   }
 
@@ -145,4 +298,4 @@
         + " " + fileReadOffset;
   }
 
-}
\ No newline at end of file
+}



Mime
View raw message