chukwa-commits mailing list archives

From asrab...@apache.org
Subject svn commit: r783879 - in /hadoop/chukwa/branches/chukwa-0.2: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/java/org/apache/hadoop...
Date Thu, 11 Jun 2009 19:18:09 GMT
Author: asrabkin
Date: Thu Jun 11 19:18:08 2009
New Revision: 783879

URL: http://svn.apache.org/viewvc?rev=783879&view=rev
Log:
CHUKWA-194. Backfilling tools.  Contributed by Jerome Boulon.

Added:
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/QueueToWriterConnector.java
    hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/tools/
    hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/tools/backfilling/
    hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java
Modified:
    hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/ChunkImpl.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
    hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java

Modified: hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt (original)
+++ hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt Thu Jun 11 19:18:08 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-194. Backfilling tools. (Jerome Boulon via asrabkin)
+
     CHUKWA-253. Added aggregations by user. (Cheng Zhang via Eric Yang)
 
     CHUKWA-95. Added Web Service API to export data from database. (Terence Kwan via Eric Yang)

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/ChunkImpl.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/ChunkImpl.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/ChunkImpl.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/ChunkImpl.java Thu Jun 11 19:18:08 2009
@@ -44,13 +44,27 @@
   private transient Adaptor initiator;
   long seqID;
 
-  ChunkImpl() {
+  private static String localHostAddr;
+  static {
+    try {
+      setHostAddress(InetAddress.getLocalHost().getHostName());
+    } catch (UnknownHostException e) {
+      setHostAddress("localhost");
+    }
   }
-
+  
+  public static void setHostAddress(String host) {
+    ChunkImpl.localHostAddr = host;
+  }
+  
+  
   public static ChunkImpl getBlankChunk() {
     return new ChunkImpl();
   }
 
+  ChunkImpl() {
+  }
+
   public ChunkImpl(String dataType, String streamName, long seq, byte[] data,
                    Adaptor source) {
     this.seqID = seq;
@@ -237,14 +251,7 @@
     return source + ":" + application + ":" + new String(data) + "/" + seqID;
   }
 
-  private static String localHostAddr;
-  static {
-    try {
-      localHostAddr = InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException e) {
-      localHostAddr = "localhost";
-    }
-  }
+
 
   /**
    * @see org.apache.hadoop.chukwa.Chunk#getSerializedSizeEstimate()

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Thu Jun 11 19:18:08 2009
@@ -72,14 +72,25 @@
    * Signals this adaptor to come to an orderly stop. The adaptor ought to push
    * out all the data it can before exiting.
    * 
-   * This method is synchronous: In other words, after shutdown() returns, no
-   * new data should be written.
+   * This method is synchronous for up to 60 seconds.
    * 
    * @return the logical offset at which the adaptor stops
    * @throws AdaptorException
    */
+  @Deprecated
   public long shutdown() throws AdaptorException;
 
+  
+  /**
+   * Signals this adaptor to come to an orderly stop. The adaptor ought to push
+   * out all the data it can before exiting, depending on the shutdown policy.
+   *
+   * @return the logical offset at which the adaptor was when the method returned
+   * @throws AdaptorException
+   */
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy) throws AdaptorException;
+  
+  
   /**
    * Signals this adaptor to come to an abrupt stop, as quickly as it can. The
    * use case here is "Whups, I didn't mean to start that adaptor tailing a
@@ -94,6 +105,7 @@
    * 
    * @throws AdaptorException
    */
+  @Deprecated
   public void hardStop() throws AdaptorException;
 
 }
\ No newline at end of file

Added: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java?rev=783879&view=auto
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java (added)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java Thu Jun 11 19:18:08 2009
@@ -0,0 +1,5 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+public enum AdaptorShutdownPolicy {
+  HARD_STOP, GRACEFULLY, WAIT_TILL_FINISHED;
+}
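
Note: the three policies correspond to the two legacy entry points plus one new blocking mode. HARD_STOP replaces hardStop(), GRACEFULLY replaces the old shutdown() (bounded at roughly 60 seconds), and WAIT_TILL_FINISHED blocks until the adaptor has pushed out all of its data. A minimal caller sketch against the new interface (the adaptor variable is hypothetical):

    // Stop an adaptor with the new policy-based API; WAIT_TILL_FINISHED
    // blocks until the adaptor is done, which the backfilling tools rely on.
    long offset = adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
    log.info("adaptor stopped at offset " + offset);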

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java Thu Jun 11 19:18:08 2009
@@ -39,7 +39,7 @@
  * 0
  * 
  */
-public class ExecAdaptor extends  AbstractAdaptor {
+public class ExecAdaptor extends AbstractAdaptor {
   
   static class EmbeddedExec extends ExecPlugin {
 
@@ -129,19 +129,48 @@
   }
 
   @Override
+  @Deprecated
+  /**
+  * use shutdown(AdaptorShutdownPolicy shutdownPolicy)
+  */
   public void hardStop() throws AdaptorException {
-    exec.stop();
-    timer.cancel();
+     shutdown(AdaptorShutdownPolicy.HARD_STOP);
   }
-
-  @Override
-  public long shutdown() throws AdaptorException {
-    try {
-      timer.cancel();
-      exec.waitFor(); // wait for last data to get pushed out
-    } catch (InterruptedException e) {
-      return sendOffset;
+  
+  @Override
+  @Deprecated
+  /**
+  * use shutdown(AdaptorShutdownPolicy shutdownPolicy)
+  */
+  public long shutdown() throws AdaptorException {
+    return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
+    switch(shutdownPolicy) {
+    case HARD_STOP :
+      timer.cancel();
+      exec.stop();
+      break;
+    case GRACEFULLY :
+      try {
+        timer.cancel();
+        exec.waitFor(); // wait for last data to get pushed out
+      } catch (InterruptedException e) {
+      }
+      break;
+    case WAIT_TILL_FINISHED :
+      try {
+        timer.cancel();
+        exec.waitFor();
+      } catch (InterruptedException e) {
+      }
+      break;
     }
+    log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
     return sendOffset;
   }
 

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Thu Jun 11 19:18:08 2009
@@ -135,7 +135,7 @@
   private long startTime = 0;
   private long timeOut = 0;
   
-  
+  protected volatile boolean finished = false;
   protected File toWatch;
   protected RandomAccessFile reader = null;
   protected long fileReadOffset;
@@ -186,12 +186,12 @@
          long fileTime = toWatch.lastModified();
          int bytesUsed = extractRecords(dest, 0, buf, fileTime);
          this.fileReadOffset = bytesUsed;
+         finished = true;
          deregisterAndStop(false);
          cleanUp();
-       }catch(Exception e) {
+       } catch(Exception e) {
          log.warn("Exception while trying to read: " + toWatch.getAbsolutePath(),e);
-       }
-       finally {
+       } finally {
          if (reader != null) {
            try {
              reader.close();
@@ -204,6 +204,7 @@
      }
     } else {
       if (now > timeOut) {
+        finished = true;
         log.warn("Couldn't read this file: " + toWatch.getAbsolutePath());
         deregisterAndStop(false);
         cleanUp() ;
@@ -229,18 +230,59 @@
    * 
    * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
    */
+  @Deprecated
   public long shutdown() throws AdaptorException {
-    // do nothing -- will be automatically done by TimeOut
-    return fileReadOffset + offsetOfFirstByte;
+    return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
   }
 
   /**
    * Stop tailing the file, effective immediately.
    */
+  @Deprecated
   public void hardStop() throws AdaptorException {
-    cleanUp();
+    shutdown(AdaptorShutdownPolicy.HARD_STOP);
   }
 
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
+    log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
+    switch(shutdownPolicy) {
+      case HARD_STOP :
+        cleanUp();
+        break;
+      case GRACEFULLY : {
+        int retry = 0;
+        while (!finished && retry < 60) {
+          try {
+            log.info("GRACEFULLY Retry:" + retry);
+            Thread.sleep(1000);
+            retry++;
+          } catch (InterruptedException ex) {
+          }
+        } 
+      }
+      break;
+      case WAIT_TILL_FINISHED : {
+        int retry = 0;
+        while (!finished) {
+          try {
+            if (retry%100 == 0) {
+              log.info("WAIT_TILL_FINISHED Retry:" + retry);
+            }
+
+            Thread.sleep(1000);
+            retry++;
+          } catch (InterruptedException ex) {
+          }
+        } 
+      }
+
+      break;
+    }
+    log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
+    return fileReadOffset + offsetOfFirstByte;
+  }
+  
   public String getStreamName() {
     return toWatch.getPath();
   }

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Thu Jun 11 19:18:08 2009
@@ -98,44 +98,76 @@
    * 
    * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
    */
+  @Deprecated
   public long shutdown() throws AdaptorException {
-    try {
-      if (toWatch.exists()) {
-        int retry = 0;
-        tailer.stopWatchingFile(this);
-        TerminatorThread lastTail = new TerminatorThread(this, tailer.eq);
-        lastTail.setDaemon(true);
-        lastTail.start();
-        while (lastTail.isAlive() && retry < 60) {
-          try {
-            log.info("Retry:" + retry);
-            Thread.currentThread().sleep(1000);
-            retry++;
-          } catch (InterruptedException ex) {
-          }
-        }
-      }
-    } finally {
-      return fileReadOffset + offsetOfFirstByte;
-    }
-
+    return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
   }
 
   /**
    * Stop tailing the file, effective immediately.
    */
+  @Deprecated
   public void hardStop() throws AdaptorException {
-    tailer.stopWatchingFile(this);
-    try {
-      if (reader != null) {
-        reader.close();
+    shutdown(AdaptorShutdownPolicy.HARD_STOP);
+  }
+
+  
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
+    
+    log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
+    
+    switch(shutdownPolicy) {
+      case HARD_STOP :
+        tailer.stopWatchingFile(this);
+        try {
+          if (reader != null) {
+            reader.close();
+          }
+          reader = null;
+        } catch(Throwable e) {
+         log.warn("Exception while closing reader:",e);
+        }
+        break;
+      case GRACEFULLY : 
+      case WAIT_TILL_FINISHED :{
+        if (toWatch.exists()) {
+          int retry = 0;
+          tailer.stopWatchingFile(this);
+          TerminatorThread lastTail = new TerminatorThread(this, tailer.eq);
+          lastTail.setDaemon(true);
+          lastTail.start();
+          
+          if (shutdownPolicy == AdaptorShutdownPolicy.GRACEFULLY) {
+            while (lastTail.isAlive() && retry < 60) {
+              try {
+                log.info("GRACEFULLY Retry:" + retry);
+                Thread.sleep(1000);
+                retry++;
+              } catch (InterruptedException ex) {
+              }
+            }
+          } else {
+            while (lastTail.isAlive()) {
+              try {
+                if (retry%100 == 0) {
+                  log.info("WAIT_TILL_FINISHED Retry:" + retry);
+                }
+                Thread.sleep(1000);
+                retry++;
+              } catch (InterruptedException ex) {
+              }
+            } 
+          }          
+        }
       }
-      reader = null;
-    } catch(Throwable e) {
-      // do nothing
+      break;
     }
+    log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
+    return fileReadOffset + offsetOfFirstByte;
   }
-
+  
+  
   /**
    * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
    */
@@ -264,9 +296,11 @@
                   + MAX_READ_SIZE);
             } else {
               log.info("Conf is null, running in default mode");
+              conf = new Configuration();
             }
           } else {
             log.info("Agent is null, running in default mode");
+            conf = new Configuration();
           }
         }
 

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java Thu Jun 11 19:18:08 2009
@@ -36,8 +36,7 @@
           endTime = System.currentTimeMillis() + (10 * 60 * 1000); // now + 10
                                                                    // mins
           if (count > 3) {
-            log
-                .warn("TerminatorThread should have been finished by now, stopping it now! count="
+            log.warn("TerminatorThread should have been finished by now, stopping it now! count="
                     + count);
             break;
           }
@@ -47,8 +46,7 @@
       log.info("InterruptedException on Terminator thread:"
           + adaptor.toWatch.getPath(), e);
     } catch (Throwable e) {
-      log
-          .warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(),
+      log.warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(),
               e);
     }
 

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java Thu Jun 11 19:18:08 2009
@@ -36,7 +36,7 @@
    * @param className the name of the {@link Adaptor} class to instantiate
    * @return an Adaptor of the specified type
    */
-  static Adaptor createAdaptor(String className) {
+  public static Adaptor createAdaptor(String className) {
     Object obj = null;
     try {
       // the following reflection business for type checking is probably

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java Thu Jun 11 19:18:08 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.datacollection.agent;
 
+import java.util.Collections;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
@@ -27,6 +28,7 @@
  */
 public interface AdaptorManager {
   
+
   Configuration getConfiguration();
   int adaptorCount();
   long stopAdaptor(long number, boolean gracefully);
@@ -34,4 +36,37 @@
   long processAddCommand(String cmd);
   Map<Long, String> getAdaptorList();
 
+  static AdaptorManager NULL = new AdaptorManager() {
+
+    @Override
+    public int adaptorCount() {
+      return 0;
+    }
+
+    @Override
+    public Adaptor getAdaptor(long id) {
+      return null;
+    }
+
+    @Override
+    public Map<Long, String> getAdaptorList() {
+      return Collections.emptyMap();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return new Configuration();
+    }
+
+    @Override
+    public long processAddCommand(String cmd) {
+      return 0;
+    }
+
+    @Override
+    public long stopAdaptor(long number, boolean gracefully) {
+      return 0;
+    }
+  };
+  
 }
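
AdaptorManager.NULL is a null-object implementation: every method is a harmless no-op, so adaptors can be driven outside a running ChukwaAgent. The backfilling loader added below uses it exactly this way; a sketch reusing the names from this patch:

    // Start an adaptor standalone, with no live agent behind it.
    Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
    adaptor.start(System.currentTimeMillis(), recordType,
        "0 " + file.getAbsolutePath(), 0L, queue, AdaptorManager.NULL);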

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Thu Jun 11 19:18:08 2009
@@ -247,6 +247,7 @@
 
   public void tryToBind() throws IOException {
     s = new ServerSocket(portno);
+    s.setReuseAddress(true);
     portno = s.getLocalPort();
     if (s.isBound())
       log.info("socket bound to " + s.getLocalPort());

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Thu Jun 11 19:18:08 2009
@@ -54,7 +54,8 @@
  */
 public class ChukwaAgent implements AdaptorManager {
   // boolean WRITE_CHECKPOINTS = true;
-  static final AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "chukwaAgent");
+  static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "chukwaAgent");
+
   static Logger log = Logger.getLogger(ChukwaAgent.class);
   static ChukwaAgent agent = null;
 

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java Thu Jun 11 19:18:08 2009
@@ -4,6 +4,8 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.chukwa.Chunk;
+
 public class RecordUtil {
   static Pattern clusterPattern = Pattern
       .compile("(.*)?cluster=\"(.*?)\"(.*)?");
@@ -19,4 +21,15 @@
 
     return "undefined";
   }
+  public static String getClusterName(Chunk chunk) {
+    String tags = chunk.getTags();
+    if (tags != null) {
+      Matcher matcher = clusterPattern.matcher(tags);
+      if (matcher.matches()) {
+        return matcher.group(2);
+      }
+    }
+
+    return "undefined";
+  }
 }
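
The new overload applies the same clusterPattern to a chunk's tag string: group(2) of (.*)?cluster=\"(.*?)\"(.*)? captures the quoted value, and anything that does not match falls back to "undefined". For example:

    // Tags are attached via DataFactory.addDefaultTag("cluster=\"...\"").
    String tags = "cluster=\"MyCluster\" foo=\"bar\"";
    Matcher matcher = clusterPattern.matcher(tags);
    String cluster = matcher.matches() ? matcher.group(2) : "undefined";
    // cluster is now "MyCluster"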

Added: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java?rev=783879&view=auto
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java (added)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java Thu Jun 11 19:18:08 2009
@@ -0,0 +1,90 @@
+package org.apache.hadoop.chukwa.tools.backfilling;
+
+import java.io.File;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+public class BackfillingLoader {
+  static Logger log = Logger.getLogger(BackfillingLoader.class);
+  
+  protected Configuration conf = null;
+  protected ChunkQueue queue = null;
+  protected Connector connector = null;
+  
+  private String cluster = null;
+  private String machine = null;
+  private String adaptorName = null;
+  private String recordType = null;
+  private String logFile = null;
+  
+  public BackfillingLoader(Configuration conf, String cluster, String machine, 
+      String adaptorName, String recordType, String logFile) {
+    
+    this.conf = conf;
+    this.cluster = cluster.trim();
+    this.machine = machine.trim();
+    this.adaptorName = adaptorName;
+    this.recordType = recordType;
+    this.logFile = logFile;
+    
+    log.info("cluster >>>" + cluster) ;
+    log.info("machine >>>" + machine) ;
+    log.info("adaptorName >>>" + adaptorName) ;
+    log.info("recordType >>>" + recordType) ;
+    log.info("logFile >>>" + logFile) ;
+    
+    // Set the right cluster and machine information
+    DataFactory.getInstance().addDefaultTag("cluster=\"" + this.cluster + "\"");
+    ChunkImpl.setHostAddress(this.machine);
+    
+    queue = DataFactory.getInstance().getEventQueue();
+    connector = new QueueToWriterConnector(conf,true);
+  }
+  
+  public void process() throws AdaptorException {
+    File file = new File(logFile);
+    connector.start();
+    Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
+    adaptor.start(System.currentTimeMillis(), recordType, "0 " + file.getAbsolutePath(),
+        0L, queue, AdaptorManager.NULL);
+    adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
+    connector.shutdown();
+    file.renameTo(new File(logFile + ".sav"));
+  }
+  
+  public static void usage() {
+    System.out.println("java org.apache.hadoop.chukwa.tools.backfilling.BackfillingLoader <cluster> <machine> <adaptorName> <recordType> <logFile>");
+    System.exit(-1);
+  }
+  
+  /**
+   * @param args
+   * @throws Exception 
+   */
+  public static void main(String[] args) throws Exception {
+
+    if (args.length != 5) {
+      usage();
+    }
+    
+
+    String cluster = args[0];
+    String machine = args[1];
+    String adaptorName = args[2];
+    String recordType = args[3];
+    String logFile = args[4];
+
+    BackfillingLoader loader = new BackfillingLoader(new ChukwaConfiguration(), cluster, machine, adaptorName, recordType, logFile);
+    loader.process();
+  }
+
+}
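
As usage() shows, the loader takes five positional arguments. The programmatic equivalent, with a hypothetical record type and file path:

    Configuration conf = new ChukwaConfiguration();
    BackfillingLoader loader = new BackfillingLoader(conf,
        "demo-cluster", "host01",
        "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor",
        "SysLog", "/var/log/archive/syslog.2009-06-01");
    loader.process();  // drains the file through the writer, then
                       // renames the input to syslog.2009-06-01.sav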

Added: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/QueueToWriterConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/QueueToWriterConnector.java?rev=783879&view=auto
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/QueueToWriterConnector.java (added)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/tools/backfilling/QueueToWriterConnector.java Thu Jun 11 19:18:08 2009
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.tools.backfilling;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+public class QueueToWriterConnector implements Connector, Runnable {
+  static Logger log = Logger.getLogger(QueueToWriterConnector.class);
+  static final int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
+
+  protected Configuration conf = null;
+  protected volatile boolean isRunning = true;
+  protected ChunkQueue chunkQueue = DataFactory.getInstance().getEventQueue();
+  protected ChukwaWriter writer = null;
+  protected Thread runner = null;
+  protected boolean isBackfilling = false;
+  public QueueToWriterConnector(Configuration conf, boolean isBackfilling) {
+    this.conf = conf;
+    this.isBackfilling = isBackfilling;
+  }
+
+  @Override
+  public void reloadConfiguration() {
+    // do nothing here
+  }
+
+  @Override
+  public void shutdown() {
+    isRunning = false;
+    
+    log.info("Shutdown in progress ...");
+    while (isAlive()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+    }
+   
+    try {
+      if (writer != null) {
+        writer.close();
+      }
+    } catch(Exception e) {
+      log.warn("Exception while closing writer: ", e);
+    }
+    log.info("Shutdown done.");
+  }
+
+  @Override
+  public void start() {
+    log.info("Starting QueueToWriterConnector thread");
+    runner = new Thread(this, "QueueToWriterConnectorThread");
+    runner.start();
+  }
+
+  protected boolean isAlive() {
+    return this.runner.isAlive();
+  }
+  
+  @Override
+  public void run() {
+
+    log.info("initializing QueueToWriterConnector");
+    try {
+      String writerClassName = conf.get("chukwaCollector.writerClass",
+          SeqFileWriter.class.getCanonicalName());
+      Class<?> writerClass = Class.forName(writerClassName);
+      if (writerClass != null
+          && ChukwaWriter.class.isAssignableFrom(writerClass)) {
+        writer = (ChukwaWriter) writerClass.newInstance();
+      } else {
+        throw new RuntimeException("Wrong class type");
+      }
+      writer.init(conf);
+
+    } catch (Throwable e) {
+      log.warn("failed to use user-chosen writer class, Bail out!", e);
+      DaemonWatcher.bailout(-1);
+    }
+
+    
+    List<Chunk> chunks = new LinkedList<Chunk>();
+    ChukwaAgent agent = null; // ChukwaAgent.getAgent();
+    
+    log.info("processing data for QueueToWriterConnector");
+    
+    while (isRunning || chunkQueue.size() != 0 || chunks.size() != 0) {
+      try {
+        if (chunks.size() == 0) {
+          
+          if (isBackfilling && chunkQueue.size() == 0) {
+            Thread.sleep(300);
+            continue;
+          }
+          chunkQueue.collect(chunks, MAX_SIZE_PER_POST);
+          log.info("Got " + chunks.size() + " chunks back from the queue");
+        }       
+        
+        writer.add(chunks);
+        
+        if (agent != null) {
+          for(Chunk chunk: chunks) {
+            agent.reportCommit(chunk.getInitiator(), chunk.getSeqID());
+          }
+        }
+        
+        chunks.clear();
+        
+      }
+      catch (Throwable e) {
+        log.warn("Could not save some chunks");
+        e.printStackTrace();
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e1) {}
+      } 
+    }
+  }
+
+}
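
The run() loop keeps draining while isRunning is set or either the shared queue or the local chunk list is non-empty, so shutdown() loses no buffered data: it clears isRunning, waits for the runner thread to finish, then closes the writer. The backfilling lifecycle is therefore simply:

    Connector connector = new QueueToWriterConnector(conf, true);
    connector.start();     // spawns QueueToWriterConnectorThread
    // ... an adaptor pushes chunks into the DataFactory event queue ...
    connector.shutdown();  // blocks until the queue is fully drained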

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Thu Jun 11 19:18:08 2009
@@ -22,8 +22,8 @@
 import java.util.Random;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.*;
-import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
-import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
 
 public class ConstRateAdaptor extends Thread implements Adaptor {
@@ -89,13 +89,14 @@
     return "const rate " + type;
   }
 
+  @Deprecated
   public void hardStop() throws AdaptorException {
-    stopping = true;
+    shutdown(AdaptorShutdownPolicy.HARD_STOP);
   }
 
+  @Deprecated
   public long shutdown() throws AdaptorException {
-    stopping = true;
-    return offset;
+    return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
   }
 
   @Override
@@ -103,4 +104,17 @@
     return type;
   }
 
+
+    @Override
+    public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
+      
+      switch(shutdownPolicy) {
+        case HARD_STOP :
+        case GRACEFULLY : 
+        case WAIT_TILL_FINISHED :
+          stopping = true;
+        break;
+      }
+      return offset;
+    }
 }

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Thu Jun 11 19:18:08 2009
@@ -23,8 +23,7 @@
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.*;
 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
-import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
-import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
 
 public class MaxRateSender extends Thread implements Adaptor {
 
@@ -77,15 +76,27 @@
   }
 
   public long shutdown() throws AdaptorException {
-    stopping = true;
-    return offset;
+    return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
   }
 
   public void hardStop() throws AdaptorException {
-    stopping = true;
+    shutdown(AdaptorShutdownPolicy.HARD_STOP);
   }
 
   @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
+    
+    switch(shutdownPolicy) {
+      case HARD_STOP :
+      case GRACEFULLY : 
+      case WAIT_TILL_FINISHED :
+        stopping = true;
+      break;
+    }
+    return offset;
+  }
+  
+  @Override
   public String getType() {
     return type;
   }

Modified: hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java?rev=783879&r1=783878&r2=783879&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java Thu Jun 11 19:18:08 2009
@@ -75,4 +75,11 @@
   }
 
 
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
 }

Added: hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java?rev=783879&view=auto
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java (added)
+++ hadoop/chukwa/branches/chukwa-0.2/src/test/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java Thu Jun 11 19:18:08 2009
@@ -0,0 +1,334 @@
+package org.apache.hadoop.chukwa.tools.backfilling;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
+import org.apache.hadoop.chukwa.validationframework.util.MD5;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+
+public class TestBackfillingLoader extends TestCase {
+
+  public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscaped() {
+    String tmpDir = System.getProperty("test.build.data", "/tmp");
+    long ts = System.currentTimeMillis();
+    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;
+
+    Configuration conf = new Configuration();
+    conf.set("writer.hdfs.filesystem", "file:///");
+    conf.set("chukwaCollector.outputDir", dataDir  + "/log/");
+    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
+    
+    String cluster = "MyCluster_" + ts;
+    String machine = "machine_" + ts;
+    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
+    String recordType = "MyRecordType_" + ts;
+    
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      
+      File in1Dir = new File(dataDir + "/input");
+      in1Dir.mkdirs();
+      int lineCount = 107;
+      File inputFile = makeTestFile(dataDir + "/input/in1.txt",lineCount);
+      long size = inputFile.length();
+      
+      String logFile = inputFile.getAbsolutePath();
+      System.out.println("Output:" + logFile);
+      System.out.println("File:" + inputFile.length());
+      BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile);
+      loader.process();
+      
+      File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");
+      
+      Assert.assertTrue(inputFile.exists() == false);
+      Assert.assertTrue(finalOutputFile.exists() == true);
+      
+      String doneFile = null;
+      File directory = new File(dataDir  + "/log/");
+      String[] files = directory.list();
+      for(String file: files) {
+        if ( file.endsWith(".done") ){
+          doneFile = dataDir  + "/log/" + file;
+          break;
+        }
+      }
+      
+      long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile,
+          cluster, recordType,  machine, logFile);
+      Assert.assertTrue(seqId == size);
+      
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void testBackfillingLoaderWithFileAdaptor() {
+    String tmpDir = System.getProperty("test.build.data", "/tmp");
+    long ts = System.currentTimeMillis();
+    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;
+
+    Configuration conf = new Configuration();
+    conf.set("writer.hdfs.filesystem", "file:///");
+    conf.set("chukwaCollector.outputDir", dataDir  + "/log/");
+    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
+    
+    String cluster = "MyCluster_" + ts;
+    String machine = "machine_" + ts;
+    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor";
+    String recordType = "MyRecordType_" + ts;
+    
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      
+      File in1Dir = new File(dataDir + "/input");
+      in1Dir.mkdirs();
+      int lineCount = 118;
+      File inputFile = makeTestFile(dataDir + "/input/in2.txt",lineCount);
+      long size = inputFile.length();
+      
+      String logFile = inputFile.getAbsolutePath();
+      System.out.println("Output:" + logFile);
+      System.out.println("File:" + inputFile.length());
+      BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile);
+      loader.process();
+      
+      File finalOutputFile = new File(dataDir + "/input/in2.txt.sav");
+      
+      Assert.assertTrue(inputFile.exists() == false);
+      Assert.assertTrue(finalOutputFile.exists() == true);
+      
+      String doneFile = null;
+      File directory = new File(dataDir  + "/log/");
+      String[] files = directory.list();
+      for(String file: files) {
+        if ( file.endsWith(".done") ){
+          doneFile = dataDir  + "/log/" + file;
+          break;
+        }
+      }
+      
+     long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile,
+          cluster, recordType,  machine, logFile);
+      Assert.assertTrue(seqId == size);
+      
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+  
+  
+  
+  public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFile() {
+    String tmpDir = System.getProperty("test.build.data", "/tmp");
+    long ts = System.currentTimeMillis();
+    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;
+
+    Configuration conf = new Configuration();
+    conf.set("writer.hdfs.filesystem", "file:///");
+    conf.set("chukwaCollector.outputDir", dataDir  + "/log/");
+    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
+    
+    
+    String cluster = "MyCluster_" + ts;
+    String machine = "machine_" + ts;
+    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
+    String recordType = "MyRecordType_" + ts;
+    
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      
+      File in1Dir = new File(dataDir + "/input");
+      in1Dir.mkdirs();
+      int lineCount = 1024*1024;//34MB
+      File inputFile = makeTestFile(dataDir + "/input/in1.txt",lineCount);
+      long size = inputFile.length();
+      
+      String logFile = inputFile.getAbsolutePath();
+      System.out.println("Output:" + logFile);
+      System.out.println("File:" + inputFile.length());
+      BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile);
+      loader.process();
+      
+      File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");
+      
+      Assert.assertTrue(inputFile.exists() == false);
+      Assert.assertTrue(finalOutputFile.exists() == true);
+      
+      String doneFile = null;
+      File directory = new File(dataDir  + "/log/");
+      String[] files = directory.list();
+      for(String file: files) {
+        if ( file.endsWith(".done") ){
+          doneFile = dataDir  + "/log/" + file;
+          break;
+        }
+      }
+      
+      long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile,
+          cluster, recordType,  machine, logFile);
+     
+      Assert.assertTrue(seqId == size);
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+  
+  
+  public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFileLocalWriter() {
+    String tmpDir = System.getProperty("test.build.data", "/tmp");
+    long ts = System.currentTimeMillis();
+    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;
+
+    Configuration conf = new Configuration();
+    conf.set("writer.hdfs.filesystem", "file:///");
+    conf.set("chukwaCollector.outputDir", dataDir  + "/log/");
+    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
+    conf.set("chukwaCollector.localOutputDir", dataDir  + "/log/");
+    conf.set("chukwaCollector.writerClass", "org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter");
+    
+    String cluster = "MyCluster_" + ts;
+    String machine = "machine_" + ts;
+    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
+    String recordType = "MyRecordType_" + ts;
+    
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      
+      File in1Dir = new File(dataDir + "/input");
+      in1Dir.mkdirs();
+      int lineCount = 1024*1024*2;//64MB
+      File inputFile = makeTestFile(dataDir + "/input/in1.txt",lineCount);
+      long size = inputFile.length();
+      
+      String logFile = inputFile.getAbsolutePath();
+      System.out.println("Output:" + logFile);
+      System.out.println("File:" + inputFile.length());
+      BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile);
+      loader.process();
+      
+      File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");
+      
+      Assert.assertTrue(inputFile.exists() == false);
+      Assert.assertTrue(finalOutputFile.exists() == true);
+      
+      String doneFile = null;
+      File directory = new File(dataDir  + "/log/");
+      String[] files = directory.list();
+      for(String file: files) {
+        if ( file.endsWith(".done") ){
+          doneFile = dataDir  + "/log/" + file;
+          break;
+        }
+      }
+      
+      long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile,
+          cluster, recordType,  machine, logFile);
+     
+      Assert.assertTrue(seqId == size);
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+  protected long validateDataSink(FileSystem fs,Configuration conf, String dataSinkFile, File logFile, 
+      String cluster,String dataType, String source, String application) throws Throwable {
+    SequenceFile.Reader reader = null;
+    long lastSeqId = -1;
+    BufferedWriter out = null;
+    try {
+      
+      reader = new SequenceFile.Reader(fs, new Path(dataSinkFile), conf);
+      ChukwaArchiveKey key = new ChukwaArchiveKey();
+      ChunkImpl chunk = ChunkImpl.getBlankChunk();
+
+      String dataSinkDumpName = dataSinkFile + ".dump";
+      out = new BufferedWriter(new FileWriter(dataSinkDumpName));
+      
+
+
+      while (reader.next(key, chunk)) {
+        Assert.assertTrue(cluster.equals(RecordUtil.getClusterName(chunk)));
+        Assert.assertTrue(dataType.equals(chunk.getDataType()));
+        Assert.assertTrue(source.equals(chunk.getSource()));
+        
+        out.write(new String(chunk.getData()));
+        lastSeqId = chunk.getSeqID() ;
+      }
+      
+      out.close();
+      out = null;
+      reader.close();
+      reader = null;
+      
+      String dataSinkMD5 = MD5.checksum(new File(dataSinkDumpName));
+      String logFileMD5 = MD5.checksum(logFile);
+      Assert.assertTrue(dataSinkMD5.equals(logFileMD5));
+    }
+    finally {
+      if (out != null) {
+        out.close();
+      }
+      
+      if (reader != null) {
+        reader.close();
+      }
+    }
+   
+    
+    return lastSeqId;
+  }
+  
+  private File makeTestFile(String name, int size) throws IOException {
+    File tmpOutput = new File(name);
+    
+    FileOutputStream fos = new FileOutputStream(tmpOutput);
+
+    PrintWriter pw = new PrintWriter(fos);
+    for (int i = 0; i < size; ++i) {
+      pw.print(i + " ");
+      pw.println("abcdefghijklmnopqrstuvwxyz");
+    }
+    pw.flush();
+    pw.close();
+    return tmpOutput;
+  }
+  
+}


