chukwa-commits mailing list archives

From: asrab...@apache.org
Subject: svn commit: r811958 - in /hadoop/chukwa/trunk: ./ contrib/chukwa-pig/ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache/hadoop/chukwa/datacollection/collector/ src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet...
Date: Mon, 07 Sep 2009 00:55:54 GMT
Author: asrabkin
Date: Mon Sep  7 00:55:53 2009
New Revision: 811958

URL: http://svn.apache.org/viewvc?rev=811958&view=rev
Log:
CHUKWA-369. Tolerance of collector failures.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Sep  7 00:55:53 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-369. Tolerance of collector failures. (asrabkin)
+
     CHUKWA-368. New data integrity validation tool. (asrabkin)
 
     CHUKWA-383. Added embed mode for HICC.  (Eric Yang)

Modified: hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java Mon Sep  7 00:55:53 2009
@@ -45,8 +45,9 @@
    *  
    * @param src the adaptor in question
    * @param uuid the number to record as checkpoint.  Must be monotonically increasing.
+   * @return the adaptor ID of the associated adaptor, or null if not running.
    */
-  public void reportCommit(Adaptor src, long uuid);
+  public String reportCommit(Adaptor src, long uuid);
 
   static AdaptorManager NULL = new AdaptorManager() {
 
@@ -81,7 +82,8 @@
     }
     
     @Override
-    public void reportCommit(Adaptor a, long l) {
+    public String reportCommit(Adaptor a, long l) {
+      return null;
     }
   };
   

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java?rev=811958&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java Mon Sep  7 00:55:53 2009
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.util.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.log4j.Logger;
+
+public class AdaptorResetThread extends Thread {
+  
+  Logger log = Logger.getLogger(AdaptorResetThread.class);
+  
+  int resetCount = 0;
+  private static class AdaptorStat {
+    long lastCommitTime = 0;
+    long maxByteSent = 0 ;
+    public AdaptorStat(long lastCommit, long maxByte) {
+      maxByteSent = maxByte;
+      lastCommitTime = lastCommit;
+    }
+  }
+
+  Map<Adaptor, AdaptorStat> status;
+  int timeout = 10*60 * 1000; //default to wait ten minutes for an ack
+  ChukwaAgent agent;
+  public static final String TIMEOUT_OPT = "connector.commitpoll.timeout";
+  private volatile boolean running = true;
+  
+  public AdaptorResetThread(Configuration conf, ChukwaAgent a) {
+    timeout = 2* conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/2);
+      //default to 2x rotation interval, if rotate interval is defined.
+    timeout = conf.getInt(TIMEOUT_OPT, timeout);
+      //or explicitly set timeout
+    status = new LinkedHashMap<Adaptor, AdaptorStat>();
+    this.agent = a;
+    this.setDaemon(true);
+  }
+  
+  public void resetTimedOutAdaptors(int timeSinceLastCommit) {
+    
+    long timeoutThresh = System.currentTimeMillis() - timeSinceLastCommit;
+    List<Adaptor> toResetList = new ArrayList<Adaptor>(); //also contains stopped 
+    //adaptors
+    synchronized(this) {
+      for(Map.Entry<Adaptor, AdaptorStat> ent: status.entrySet()) {
+        AdaptorStat stat = ent.getValue();
+        ChukwaAgent.Offset off = agent.offset(ent.getKey());
+        if(off == null) {
+          toResetList.add(ent.getKey());
+        } else if(stat.maxByteSent > off.offset 
+            && stat.lastCommitTime < timeoutThresh) {
+          toResetList.add(ent.getKey());
+          log.warn("restarting " + off.id + " at " + off.offset + " due to collector timeout");
+        }
+      }
+    }
+    
+    for(Adaptor a: toResetList) {
+      status.remove(a);
+      ChukwaAgent.Offset off = agent.offset(a);
+      if(off != null) {
+        agent.stopAdaptor(off.id, false);
+        
+          //We can do this safely if we're called in the same thread as the sends,
+        //since then we'll be synchronous with sends, and guaranteed to be
+        //interleaved between two successive sends
+        //DataFactory.getInstance().getEventQueue().purgeAdaptor(a);
+        
+        String a_status = a.getCurrentStatus();
+        agent.processAddCommand("add " + off.id + "= " + a.getClass().getCanonicalName()
+             + " "+ a_status + " " + off.offset);
+        resetCount ++;
+        //will be implicitly added to table once adaptor starts sending
+      } 
+       //implicitly do nothing if adaptor was stopped
+    }
+  }
+  
+  public synchronized void reportPending(List<AsyncAckSender.CommitListEntry> delayedCommits) {
+    for(AsyncAckSender.CommitListEntry dc: delayedCommits) {
+      AdaptorStat a = status.get(dc.adaptor);
+      if(a == null)
+        status.put(dc.adaptor, new AdaptorStat(0, dc.uuid));
+      else if(a.maxByteSent < dc.uuid)
+          a.maxByteSent = dc.uuid;
+    }
+  }
+  
+  public void reportStop(Adaptor a) {
+    status.remove(a);
+  }
+  
+  public void run() {
+    try {
+      while(running) {
+        Thread.sleep(timeout/2);
+        resetTimedOutAdaptors(timeout);
+      }
+    } catch(InterruptedException e) {}
+  } 
+  
+  public int getResetCount() {
+    return resetCount;
+  }
+}
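
The constructor above derives the ack timeout from the collector's rotation interval unless it is set explicitly. A minimal sketch of that resolution, using the option names introduced in this commit (the millisecond values are only illustrative):

import org.apache.hadoop.chukwa.datacollection.agent.AdaptorResetThread;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.conf.Configuration;

public class ResetTimeoutExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(SeqFileWriter.ROTATE_INTERVAL_OPT, 5 * 60 * 1000);  // collector rotates every 5 minutes

    int timeout = 10 * 60 * 1000;                                   // same default as AdaptorResetThread
    timeout = 2 * conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout / 2); // 2x rotation interval
    timeout = conf.getInt(AdaptorResetThread.TIMEOUT_OPT, timeout);            // explicit override, if any

    System.out.println("adaptors with unacked data reset after " + timeout + " ms");
  }
}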

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Mon Sep  7 00:55:53 2009
@@ -70,14 +70,14 @@
   Connector connector = null;
 
   // doesn't need an equals(), comparator, etc
-  private static class Offset {
+  static class Offset {
     public Offset(long l, String id) {
       offset = l;
       this.id = id;
     }
 
-    private final String id;
-    private volatile long offset;
+    final String id;
+    volatile long offset;
   }
 
   public static class AlreadyRunningException extends Exception {
@@ -458,7 +458,7 @@
     }
   }
 
-  public void reportCommit(Adaptor src, long uuid) {
+  public String reportCommit(Adaptor src, long uuid) {
     needNewCheckpoint = true;
     Offset o = adaptorPositions.get(src);
     if (o != null) {
@@ -467,12 +467,13 @@
         if (uuid > o.offset)
           o.offset = uuid;
       }
-
-      log.info("got commit up to " + uuid + " on " + src + " = " + o.id);
+      log.debug("got commit up to " + uuid + " on " + src + " = " + o.id);
+      return o.id;
     } else {
       log.warn("got commit up to " + uuid + "  for adaptor " + src
           + " that doesn't appear to be running: " + adaptorCount()
           + " total");
+      return null;
     }
   }
 
@@ -570,6 +571,11 @@
     }
   }
 
+  Offset offset(Adaptor a) {
+    Offset o = adaptorPositions.get(a);
+    return o;
+  }
+  
   Connector getConnector() {
     return connector;
   }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Mon Sep  7 00:55:53 2009
@@ -22,7 +22,9 @@
 import org.mortbay.jetty.*;
 import org.mortbay.jetty.nio.SelectChannelConnector;
 import org.mortbay.jetty.servlet.*;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
 import org.apache.hadoop.chukwa.datacollection.writer.*;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -31,7 +33,7 @@
 
 public class CollectorStub {
 
-  static int THREADS = 80;
+  static int THREADS = 120;
   public static Server jettyServer = null;
 
   public static void main(String[] args) {
@@ -49,12 +51,12 @@
 
       ChukwaConfiguration conf = new ChukwaConfiguration();
       int portNum = conf.getInt("chukwaCollector.http.port", 9999);
-      THREADS = conf.getInt("chukwaCollector.http.threads", 80);
+      THREADS = conf.getInt("chukwaCollector.http.threads", THREADS);
 
       // pick a writer.
       ChukwaWriter w = null;
       Map<String, HttpServlet> servletsToAdd = new TreeMap<String, HttpServlet>();
-      
+      ServletCollector servletCollector = new ServletCollector(conf);
       for(String arg: args) {
         if(arg.startsWith("writer=")) {       //custom writer class
           String writerCmd = arg.substring("writer=".length());
@@ -62,7 +64,7 @@
             boolean verbose = !writerCmd.equals("pretend-quietly");
             w = new ConsoleWriter(verbose);
             w.init(conf);
-            ServletCollector.setWriter(w);
+            servletCollector.setWriter(w);
           } else 
             conf.set("chukwaCollector.writerClass", writerCmd);
         } else if(arg.startsWith("servlet=")) {     //adding custom servlet
@@ -100,7 +102,11 @@
       
       // Add the collector servlet to server
       Context root = new Context(jettyServer, "/", Context.SESSIONS);
-      root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
+      root.addServlet(new ServletHolder(servletCollector), "/*");
+      
+      if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false))
+        root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
+
       root.setAllowNullPathInfo(false);
 
       // Add in any user-specified servlets

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java?rev=811958&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java Mon Sep  7 00:55:53 2009
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.collector.servlet;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.log4j.Logger;
+import java.util.*;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+public class CommitCheckServlet extends HttpServlet {
+
+  private static final long serialVersionUID = -4627538252371890849L;
+  
+  protected static Logger log = Logger.getLogger(CommitCheckServlet.class);
+  CommitCheckThread commitCheck;
+  Configuration conf;
+    //interval at which to scan the filesystem, ms
+  public static final String SCANPERIOD_OPT = "chukwaCollector.asyncAcks.scanperiod";
+  
+    //interval at which to discard seen files, ms
+  public static final String PURGEDELAY_OPT = "chukwaCollector.asyncAcks.purgedelay"; 
+    
+  //list of dirs to search, separated by commas
+  public static final String SCANPATHS_OPT = "chukwaCollector.asyncAcks.scanpaths";
+    
+  public static final String DEFAULT_PATH = "acks"; //path to this servlet on collector
+  public CommitCheckServlet(Configuration conf) {
+    this.conf = conf;
+  }
+  
+  public void init(ServletConfig servletConf) throws ServletException {
+    log.info("initing commit check servlet");
+    try {
+      FileSystem fs = FileSystem.get(
+          new URI(conf.get("writer.hdfs.filesystem", "file:///")), conf);
+      log.info("commitcheck fs is " + fs.getUri());
+      commitCheck = new CommitCheckThread(conf, fs);
+      commitCheck.start();
+    } catch(Exception e) {
+      log.error("couldn't start CommitCheckServlet", e);
+      throw new ServletException(e);
+    }
+  }
+  
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException  {
+  
+    PrintStream out = new PrintStream(resp.getOutputStream());
+    resp.setStatus(200);
+
+    out.println("<html><body><h2>Commit status</h2><ul>");
+    for(String s: commitCheck.getLengthList()) 
+      out.println("<li>" + s + "</li>");
+    out.println("</ul></body></html>");
+  }
+  
+
+  @Override
+  public void destroy() {
+    commitCheck.shutdown();
+  }
+  
+  /**
+   * Ideally, we'd use zookeeper to monitor archiver/demux rotation.
+   * For now, instead, we'll just do an ls in a bunch of places.
+   */
+  private static class CommitCheckThread extends Thread implements CHUKWA_CONSTANT {
+    int checkInterval = 1000 * 30;
+    volatile boolean running = true;
+    final Collection<Path> pathsToSearch;
+    final FileSystem fs;
+    final Map<String, Long> lengthTable;
+    final PriorityQueue<PurgeTask> oldEntries;
+    long delayUntilPurge = 1000 * 60 * 60 * 12;
+    
+    static class PurgeTask implements Comparable<PurgeTask>{
+      long purgeTime;
+      String toPurge;
+      long len;
+      
+      public PurgeTask(String s, long time, long len) {
+        this.toPurge = s;
+        this.purgeTime = time;
+        this.len = len;
+      }
+      
+      public int compareTo(PurgeTask p) {
+        if(purgeTime < p.purgeTime)
+          return -1;
+        else if (purgeTime == p.purgeTime)
+          return 0;
+        else
+          return 1;
+      }
+    }
+    
+    
+    public CommitCheckThread(Configuration conf, FileSystem fs) {
+      this.fs = fs;
+      pathsToSearch = new ArrayList<Path>();
+      lengthTable = new LinkedHashMap<String, Long>();
+      oldEntries = new PriorityQueue<PurgeTask>();
+      checkInterval = conf.getInt(SCANPERIOD_OPT, checkInterval);
+      
+      String sinkPath = conf.get("chukwaCollector.outputDir", "/chukwa/logs");
+      pathsToSearch.add(new Path(sinkPath));
+      
+      String additionalSearchPaths = conf.get(SCANPATHS_OPT, "");
+      String[] paths = additionalSearchPaths.split(",");
+      for(String s: paths)
+        if(s.length() > 1) {
+          Path path = new Path(s);
+          if(!pathsToSearch.contains(path))
+            pathsToSearch.add(path);
+        }
+      
+      delayUntilPurge = conf.getLong(PURGEDELAY_OPT, delayUntilPurge);
+      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
+      String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
+      String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
+      pathsToSearch.add(new Path(archivesMRInputDir));
+      //TODO: set checkInterval using conf
+    }
+    
+    public void shutdown() {
+      running = false;
+      this.interrupt();
+    }
+    
+    public void run() {
+      while(running) {
+        try {
+          Thread.sleep(checkInterval);
+          scanFS();
+          purgeOldEntries();
+        } catch(InterruptedException e) {}
+          catch(IOException e) {
+           log.error("io problem", e);
+        }
+      }
+   }
+
+    private synchronized void purgeOldEntries() {
+      long now = System.currentTimeMillis();
+      PurgeTask p = oldEntries.peek();
+      while(p != null && p.purgeTime < now) {
+        oldEntries.remove();
+        Long curLen = lengthTable.get(p.toPurge);
+        if(curLen != null && p.len >= curLen)
+          lengthTable.remove(p.toPurge);
+      }
+      
+    }
+
+    private void scanFS() throws IOException {
+      long nextPurgeTime = System.currentTimeMillis() + delayUntilPurge;
+      for(Path dir: pathsToSearch) {
+        int filesSeen = 0;
+        
+        FileStatus[] dataSinkFiles = fs.listStatus(dir, SinkArchiver.DATA_SINK_FILTER);
+        if(dataSinkFiles == null || dataSinkFiles.length == 0)
+          continue;
+        
+        synchronized(this) {
+          for(FileStatus fstatus: dataSinkFiles) {
+            filesSeen++;
+            String name = fstatus.getPath().getName();
+            long len = fstatus.getLen();
+            oldEntries.add(new PurgeTask(name, nextPurgeTime, len));
+            lengthTable.put(name, len);
+          }
+        }
+        log.info("scanning fs: " + dir + "; saw "+ filesSeen+ " files");
+      }
+    }
+
+    public synchronized List<String> getLengthList() {
+      ArrayList<String> list = new ArrayList<String>(lengthTable.size());
+      for(Map.Entry<String, Long> e: lengthTable.entrySet()) {
+        list.add(e.getKey() + " " + e.getValue());
+      }
+      return list;
+    }
+    
+  }
+
+}
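
The scan thread above lists the data-sink directory plus any extra paths on a fixed period, and forgets entries after a purge delay. A hedged sketch of tuning those knobs with the option names from this commit (the directories and intervals shown are only examples):

import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
import org.apache.hadoop.conf.Configuration;

public class CommitCheckConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(CommitCheckServlet.SCANPERIOD_OPT, 60 * 1000);            // ls the sink dirs once a minute
    conf.setLong(CommitCheckServlet.PURGEDELAY_OPT, 6L * 60 * 60 * 1000); // drop remembered files after 6 hours
    conf.set(CommitCheckServlet.SCANPATHS_OPT, "/chukwa/logs,/chukwa/extraSink"); // comma-separated extra dirs (hypothetical)
    // CollectorStub registers the servlet at "/" + CommitCheckServlet.DEFAULT_PATH ("acks")
    // when asynchronous acks are enabled; normally it constructs the servlet, shown here for illustration.
    new CommitCheckServlet(conf);
  }
}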

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Mon Sep  7 00:55:53 2009
@@ -39,21 +39,28 @@
 public class ServletCollector extends HttpServlet {
 
   static final boolean FANCY_DIAGNOSTICS = false;
-  static org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
+  public static final String PATH = "chukwa";
+  /**
+   * If a chunk is committed; then the ack will start with the following string.
+   */
+  public static final String ACK_PREFIX = "ok: ";
+  org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
 
   private static final long serialVersionUID = 6286162898591407111L;
   Logger log = Logger.getRootLogger();// .getLogger(ServletCollector.class);
 
-  public static void setWriter(
-      org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w)
-      throws WriterException {
+  public void setWriter(ChukwaWriter w) {
     writer = w;
   }
+  
+  public ChukwaWriter getWriter() {
+    return writer;
+  }
 
-  static long statTime = 0L;
-  static int numberHTTPConnection = 0;
-  static int numberchunks = 0;
-  static long lifetimechunks = 0;
+  long statTime = 0L;
+  int numberHTTPConnection = 0;
+  int numberchunks = 0;
+  long lifetimechunks = 0;
 
   Configuration conf;
 
@@ -81,8 +88,7 @@
     }, (1000), (60 * 1000));
 
     if (writer != null) {
-      log
-          .info("writer set up statically, no need for Collector.init() to do it");
+      log.info("writer set up statically, no need for Collector.init() to do it");
       return;
     }
 
@@ -94,10 +100,7 @@
           && ChukwaWriter.class.isAssignableFrom(writerClass))
         writer = (ChukwaWriter) writerClass.newInstance();
     } catch (Exception e) {
-      log
-          .warn(
-              "failed to use user-chosen writer class, defaulting to SeqFileWriter",
-              e);
+      log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
     }
 
     // We default to here if the pipeline construction failed or didn't happen.
@@ -108,7 +111,7 @@
       
       writer.init(conf);
     } catch (Throwable e) {
-      log.warn("Exception during Servel init",e);
+      log.warn("Exception trying to initialize SeqFileWriter",e);
       DaemonWatcher.bailout(-1);
     }
   }
@@ -133,19 +136,12 @@
       }
 
       List<Chunk> events = new LinkedList<Chunk>();
-      ChunkImpl logEvent = null;
       StringBuilder sb = new StringBuilder();
 
       for (int i = 0; i < numEvents; i++) {
-        logEvent = ChunkImpl.read(di);
+        ChunkImpl logEvent = ChunkImpl.read(di);
         events.add(logEvent);
 
-        sb.append("ok:");
-        sb.append(logEvent.getData().length);
-        sb.append(" bytes ending at offset ");
-        sb.append(logEvent.getSeqID() - 1).append("\n");
-
-
         if (FANCY_DIAGNOSTICS) {
           diagnosticPage.sawChunk(logEvent, i);
         }
@@ -153,10 +149,23 @@
 
       // write new data to data sync file
       if (writer != null) {
-        writer.add(events);
+        ChukwaWriter.CommitStatus result = writer.add(events);
         numberchunks += events.size();
         lifetimechunks += events.size();
         // this is where we ACK this connection
+
+        if(result == ChukwaWriter.COMMIT_OK) {
+          for(Chunk receivedChunk: events) {
+            sb.append(ACK_PREFIX);
+            sb.append(receivedChunk.getData().length);
+            sb.append(" bytes ending at offset ");
+            sb.append(receivedChunk.getSeqID() - 1).append("\n");
+          }
+        } else if(result instanceof ChukwaWriter.COMMIT_PENDING) {
+          for(String s: ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
+            sb.append(s);
+        }
+        
         l_out.print(sb.toString());
       } else {
         l_out.println("can't write: no writer");
@@ -192,12 +201,12 @@
 
     String pingAtt = req.getParameter("ping");
     if (pingAtt != null) {
-      out.println("Date:" + ServletCollector.statTime);
+      out.println("Date:" + statTime);
       out.println("Now:" + System.currentTimeMillis());
       out.println("numberHTTPConnection in time window:"
-          + ServletCollector.numberHTTPConnection);
-      out.println("numberchunks in time window:" + ServletCollector.numberchunks);
-      out.println("lifetimechunks:" + ServletCollector.lifetimechunks);
+          + numberHTTPConnection);
+      out.println("numberchunks in time window:" + numberchunks);
+      out.println("lifetimechunks:" + lifetimechunks);
     } else {
       out.println("<html><body><h2>Chukwa servlet running</h2>");
       if (FANCY_DIAGNOSTICS)
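
With this change the collector's reply has two line formats: committed chunks are acknowledged as "ok: <bytes> bytes ending at offset <n>", while chunks whose data is still pending are echoed as the writer's "<sinkfile> <offset>" entries. A small sketch of telling the two apart using the ACK_PREFIX constant added above (the sample lines are invented):

import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;

public class AckLineExample {
  public static void main(String[] args) {
    String[] responseLines = {
        ServletCollector.ACK_PREFIX + "2048 bytes ending at offset 4095",   // committed immediately
        "20090907005553181_examplehost_0.chukwa.done 8192"                  // pending: sink file and offset
    };
    for (String line : responseLines) {
      if (line.startsWith(ServletCollector.ACK_PREFIX))
        System.out.println("committed now: " + line);
      else
        System.out.println("delayed until the named sink file reaches the given length: " + line);
    }
  }
}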

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Mon Sep  7 00:55:53 2009
@@ -48,6 +48,7 @@
 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
 import org.apache.hadoop.chukwa.datacollection.sender.*;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
 public class HttpConnector implements Connector, Runnable {
@@ -61,7 +62,9 @@
   int MIN_POST_INTERVAL = 5 * 1000;
   public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
   public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
+  public static final String ASYNC_ACKS_OPT = "httpConnector.asyncAcks";
 
+  boolean ASYNC_ACKS = false;
   
   ChunkQueue chunkQueue;
 
@@ -99,48 +102,57 @@
   public void start() {
 
     chunkQueue = DataFactory.getInstance().getEventQueue();
-    MAX_SIZE_PER_POST = agent.getConfiguration().getInt(MAX_SIZE_PER_POST_OPT,
-        MAX_SIZE_PER_POST);
-    MIN_POST_INTERVAL = agent.getConfiguration().getInt(MIN_POST_INTERVAL_OPT,
-        MIN_POST_INTERVAL);
+    Configuration conf = agent.getConfiguration();
+    MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
+    MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
+    ASYNC_ACKS = conf.getBoolean(ASYNC_ACKS_OPT, ASYNC_ACKS);
     (new Thread(this, "HTTP post thread")).start();
   }
 
   public void shutdown() {
     stopMe = true;
+    connectorClient.stop();
   }
 
   public void run() {
     log.info("HttpConnector started at time:" + System.currentTimeMillis());
 
-    Iterator<String> destinations = null;
-
     // build a list of our destinations from collectors
     try {
-      destinations = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
+      if(collectors == null)
+        collectors = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
     } catch (IOException e) {
-      log.error("Failed to retreive list of collectors from "
+      log.error("Failed to retrieve list of collectors from "
           + "conf/collectors file", e);
     }
-
-    connectorClient = new ChukwaHttpSender(agent.getConfiguration());
+    
+    if(ASYNC_ACKS) {
+      try {
+        connectorClient = new AsyncAckSender(agent.getConfiguration(), agent);
+      } catch(IOException e) {
+        log.fatal("can't read AsycAck hostlist file, exiting");
+        agent.shutdown(true);
+      }
+    } else
+      connectorClient = new ChukwaHttpSender(agent.getConfiguration());
 
     if (argDestination != null) {
       ArrayList<String> tmp = new ArrayList<String>();
       tmp.add(argDestination);
       collectors = tmp.iterator();
-      connectorClient.setCollectors(collectors);
       log.info("using collector specified at agent runtime: " + argDestination);
-    } else if (destinations != null && destinations.hasNext()) {
-      collectors = destinations;
-      connectorClient.setCollectors(destinations);
+    } else
       log.info("using collectors from collectors file");
-    } else {
+
+    if (collectors == null || !collectors.hasNext()) {
       log.error("No collectors specified, exiting (and taking agent with us).");
       agent.shutdown(true);// error is unrecoverable, so stop hard.
       return;
     }
 
+    connectorClient.setCollectors(collectors);
+
+
     try {
       long lastPost = System.currentTimeMillis();
       while (!stopMe) {
@@ -148,8 +160,7 @@
         try {
           // get all ready chunks from the chunkQueue to be sent
           chunkQueue.collect(newQueue, MAX_SIZE_PER_POST); // FIXME: should
-                                                           // really do this by
-                                                           // size
+                                                           // really do this by size
 
         } catch (InterruptedException e) {
           System.out.println("thread interrupted during addChunks(ChunkQueue)");
@@ -159,8 +170,7 @@
         int toSend = newQueue.size();
         List<ChukwaHttpSender.CommitListEntry> results = connectorClient
             .send(newQueue);
-        log.info("sent " + toSend + " chunks, got back " + results.size()
-            + " acks");
+        log.info("sent " + toSend + " chunks, got back " + results.size() + " acks");
         // checkpoint the chunks which were committed
         for (ChukwaHttpSender.CommitListEntry cle : results) {
           agent.reportCommit(cle.adaptor, cle.uuid);
@@ -201,12 +211,18 @@
     try {
       destinations = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
     } catch (IOException e) {
-      log.error(
-          "Failed to retreive list of collectors from conf/collectors file", e);
+      log.error("Failed to retreive list of collectors from conf/collectors file", e);
     }
     if (destinations != null && destinations.hasNext()) {
       collectors = destinations;
     }
-
+  }
+  
+  public ChukwaSender getSender() {
+    return connectorClient;
+  }
+  
+  public void setCollectors(Iterator<String> list) {
+    collectors = list;
   }
 }
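
Asynchronous acknowledgment is off by default; the connector only constructs an AsyncAckSender when the flag below is set, and CollectorStub keys the CommitCheckServlet registration off the same flag. A hedged sketch of enabling it programmatically (in a deployment this would normally live in the agent and collector configuration files):

import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
import org.apache.hadoop.conf.Configuration;

public class EnableAsyncAcksExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean(HttpConnector.ASYNC_ACKS_OPT, true);   // "httpConnector.asyncAcks"
    System.out.println("async acks enabled: "
        + conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false));
  }
}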

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java?rev=811958&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java Mon Sep  7 00:55:53 2009
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.sender;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.*;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.conf.*;
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+//import com.google.common.collect.SortedSetMultimap;
+//import com.google.common.collect.TreeMultimap;
+
+import org.apache.log4j.Logger;
+
+/**
+ * An enhancement to ChukwaHttpSender that handles asynchronous acknowledgment.
+ * 
+ * This class will periodically poll the collectors to find out how much data
+ * has been committed to HDFS, and will then pass those acks on to the Agent.
+ */
+public class AsyncAckSender extends ChukwaHttpSender{
+  
+  protected static Logger log = Logger.getLogger(AsyncAckSender.class);
+  /*
+   * Represents the state required for an asynchronous ack.
+   * 
+   * Supplements CommitListEntry with a filename and offset;
+   * the data commits when that file reaches that length.
+   */
+  public static class DelayedCommit extends CommitListEntry implements Comparable<DelayedCommit> {
+    final String fname;
+    final long offset;
+    public DelayedCommit(Adaptor a, long uuid, String fname, long offset) {
+      super(a, uuid);
+      this.fname = fname;
+      this.offset = offset;
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode() ^ fname.hashCode() ^ (int)(offset) ^ (int) (offset >> 32);
+    }
+    
+    public int compareTo(DelayedCommit o) {
+      if(o.uuid < uuid)
+        return 1;
+      else if(o.uuid > uuid)
+        return -1;
+      else return 0;
+    }
+    
+    public String toString() {
+      return adaptor +" commits up to " + uuid + " when " + fname + " hits " + offset;
+    }
+  }
+  
+  public static final String POLLPERIOD_OPT = "connector.commitpoll.period";
+  public static final String POLLHOSTS_OPT = "connector.commitpoll.hostfile";
+  final ChukwaAgent agent;
+  
+  /*
+   * The merge table stores commits that we're expecting, before they're handed
+   * to the CommitPollThread.  There will be only one entry for each adaptor.
+   * 
+   * values are a collection of delayed commits, one per adaptor.
+   * keys are unspecified
+   */
+  final Map<String, DelayedCommit> mergeTable;
+  
+  /**
+   * Periodically scans a subset of the collectors, looking for committed files.
+   * This way, not every collector is pestering the namenode with periodic lses.
+   */
+  static final class CommitPollThread extends Thread {
+    private ChukwaHttpSender scanPath;
+    private int pollPeriod = 1000 * 30;
+
+
+    private final Map<String, PriorityQueue<DelayedCommit>> pendingCommits;
+    private final Map<String, DelayedCommit> mergeTable;
+    private final ChukwaAgent agent;
+
+    CommitPollThread(Configuration conf, ChukwaAgent agent, 
+        Map<String, DelayedCommit> mergeTable, Iterator<String> tryList) {
+      pollPeriod = conf.getInt(POLLPERIOD_OPT, pollPeriod);
+      scanPath = new ChukwaHttpSender(conf);
+      scanPath.setCollectors(tryList);
+      pendingCommits = new HashMap<String, PriorityQueue<DelayedCommit>>();
+      this.mergeTable = mergeTable;
+      this.agent = agent;
+    }
+
+    private volatile boolean running = true;
+    public void shutdown() {
+      running = false;
+      this.interrupt();
+    }
+    
+    public void run() {
+      try {
+        while(running) {
+          Thread.sleep(pollPeriod);
+          //update table using list of pending delayed commits, in this thread
+          checkForCommits();
+          mergePendingTable();
+        }
+      } catch(InterruptedException e) {}
+      catch(IOException e) {
+        log.error(e);
+      }
+    } 
+    
+    /*
+     * Note that this method is NOT threadsafe, and should only be called
+     * from the same thread that will later check for commits
+     */
+    private void mergePendingTable() {
+      synchronized(mergeTable) {
+        for(DelayedCommit dc: mergeTable.values()) {
+          
+          PriorityQueue<DelayedCommit> map = pendingCommits.get(dc.fname);
+          if(map == null) {
+            map = new PriorityQueue<DelayedCommit>();
+            pendingCommits.put(dc.fname, map);
+          }
+          map.add(dc);
+        }
+        mergeTable.clear();
+      }
+    }
+    
+    Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");
+    private void checkForCommits() throws IOException, InterruptedException {
+      
+      log.info("checking for commited chunks");
+      GetMethod method = new GetMethod();
+      List<String> parsedFStatuses = scanPath.reliablySend(method, CommitCheckServlet.DEFAULT_PATH); 
+
+      //do an http get
+      for(String stat: parsedFStatuses) {
+        Matcher m = respLine.matcher(stat);
+        if(!m.matches())
+          continue;
+        String path = m.group(1);
+        Long committedOffset = Long.parseLong(m.group(2));
+
+        PriorityQueue<DelayedCommit> delayedOnFile = pendingCommits.get(path);
+        if(delayedOnFile == null)
+          continue;
+       
+        while(!delayedOnFile.isEmpty()) {
+          DelayedCommit fired = delayedOnFile.element();
+          if(fired.offset > committedOffset)
+            break;
+          else
+            delayedOnFile.remove();
+          String s = agent.reportCommit(fired.adaptor, fired.uuid);
+          //TODO: if s == null, then the adaptor has been stopped.
+          //should we stop sending acks?
+          log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
+        }
+      }
+    }
+
+    void setScannableCollectors(Iterator<String> collectorURLs) {
+      // TODO Auto-generated method stub
+      
+    }
+  } 
+  
+  CommitPollThread pollThread;
+  
+  //note that at present we don't actually run this thread; we just use its methods.
+  public AdaptorResetThread adaptorReset;
+  Configuration conf;
+  
+  public AsyncAckSender(Configuration conf, ChukwaAgent a) throws IOException {
+    super(conf);
+    log.info("delayed-commit processing enabled");
+    agent = a;
+    
+    mergeTable = new LinkedHashMap<String, DelayedCommit>();
+    this.conf = conf;
+    adaptorReset = new AdaptorResetThread(conf, a);
+    //initialize the commitpoll later, once we have the list of collectors
+  }
+  
+  
+  @Override
+  public void setCollectors(Iterator<String> collectors) {
+   Iterator<String> tryList = null;
+   String scanHostsFilename = conf.get(POLLHOSTS_OPT, "collectors");
+   try {
+     tryList = DataFactory.getInstance().getCollectorURLs(conf, scanHostsFilename);
+   } catch(IOException e) {
+     log.warn("couldn't read " + scanHostsFilename+ " falling back on collectors list");
+   }
+
+   if(collectors instanceof RetryListOfCollectors) {
+     super.setCollectors(collectors);
+     if(tryList == null)
+       tryList = ((RetryListOfCollectors) collectors).clone();
+   } 
+   else {
+     ArrayList<String> l = new ArrayList<String>();
+     while(collectors.hasNext())
+       l.add(collectors.next());
+     super.setCollectors(l.iterator());
+     if(tryList == null)
+       tryList = l.iterator();
+   }
+
+   pollThread = new CommitPollThread(conf, agent, mergeTable, tryList);
+   pollThread.setDaemon(true);
+   pollThread.start();
+  }
+  
+  /*
+   * This method is the interface from AsyncAckSender to the CommitPollThread --
+   * it gets a lock on the merge table, and then updates it with a batch of pending acks
+   *
+   *  This method is called from the thread doing a post; the merge table is
+   *  read by the CommitPollThread when it figures out what commits are expected.
+   */
+  private void delayCommits(List<DelayedCommit> delayed) {
+    String[] keys = new String[delayed.size()];
+    int i = 0;
+    for(DelayedCommit c: delayed) {
+      String adaptorKey = c.adaptor.hashCode() + "_" + c.adaptor.getCurrentStatus().hashCode();
+      keys[i++] = c.fname +"::" + adaptorKey;
+    }
+    synchronized(mergeTable) {
+      for(i = 0; i < keys.length; ++i) {
+        DelayedCommit cand = delayed.get(i);
+        DelayedCommit cur = mergeTable.get(keys[i]);
+        if(cur == null || cand.offset > cur.offset) 
+          mergeTable.put(keys[i], cand);
+      }
+    }
+  }
+  
+  
+  Pattern partialCommitPat = Pattern.compile("(.*) ([0-9]+)");
+  @Override
+  public List<CommitListEntry> postAndParseResponse(PostMethod method, 
+      List<CommitListEntry> expectedCommitResults)
+  throws IOException, InterruptedException {
+    adaptorReset.reportPending(expectedCommitResults);
+    List<String> resp = reliablySend(method, ServletCollector.PATH);
+    List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>();
+    ArrayList<CommitListEntry> result =  new ArrayList<CommitListEntry>();
+    for(int i = 0; i < resp.size(); ++i)  {
+      if(resp.get(i).startsWith(ServletCollector.ACK_PREFIX))
+        result.add(expectedCommitResults.get(i));
+      else {
+        CommitListEntry cle = expectedCommitResults.get(i);
+        Matcher m = partialCommitPat.matcher(resp.get(i));
+        if(!m.matches())
+          log.warn("unexpected response: "+ resp.get(i));
+        else
+          log.info("waiting for " + m.group(1) + " to hit " + m.group(2) + " before committing "+ cle.adaptor);
+        toDelay.add(new DelayedCommit(cle.adaptor, cle.uuid, m.group(1), 
+            Long.parseLong(m.group(2))));
+      }
+    }
+    delayCommits(toDelay);
+    return result;
+  }
+  
+  @Override
+  protected boolean failedCollector(String downed) {
+    log.info("collector "+ downed + " down; resetting adaptors");
+    adaptorReset.resetTimedOutAdaptors(0); //reset all adaptors with outstanding data.
+    return false;
+  }
+  
+  @Override
+  public void stop() {
+    pollThread.shutdown();
+  }
+
+}
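
The CommitPollThread above learns about durable data by scraping the collector's /acks page, whose entries look like "<li><sinkfile> <length></li>", and fires a commit once the reported length reaches the offset recorded when the chunk was posted. A self-contained sketch of that parse, reusing the same regex (the page lines and the pending offset are invented):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CommitPollParseExample {
  public static void main(String[] args) {
    Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");  // same pattern as CommitPollThread
    String[] ackPageLines = {
        "<li>20090907005553181_examplehost_0.chukwa.done 16384</li>",
        "<li>some other page content</li>"
    };
    long pendingOffset = 8192;   // offset this adaptor was waiting on
    for (String line : ackPageLines) {
      Matcher m = respLine.matcher(line);
      if (!m.matches())
        continue;                                   // ignore lines that aren't length reports
      String sinkFile = m.group(1);
      long committedLength = Long.parseLong(m.group(2));
      if (committedLength >= pendingOffset)         // data up to our offset is now on HDFS
        System.out.println("commit fired for " + sinkFile + " at " + committedLength);
    }
  }
}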

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Mon Sep  7 00:55:53 2009
@@ -197,10 +197,19 @@
     return postAndParseResponse(method, commitResults);
   }
   
-  public List<CommitListEntry> postAndParseResponse(PostMethod method, List<CommitListEntry> commitResults)
+  /**
+   * 
+   * @param method the data to push
+   * @param expectedCommitResults the list
+   * @return the list of committed chunks
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public List<CommitListEntry> postAndParseResponse(PostMethod method, 
+        List<CommitListEntry> expectedCommitResults)
   throws IOException, InterruptedException{
     reliablySend(method, "chukwa"); //FIXME: shouldn't need to hardcode this here
-    return commitResults;
+    return expectedCommitResults;
   }
 
   /**
@@ -223,8 +232,8 @@
 
         return responses;
       } catch (Throwable e) {
-        log.error("Http post exception");
-        log.debug("Http post exception", e);
+        log.error("Http post exception on "+ currCollector +": "+ e.toString());
+        log.debug("Http post exception on "+ currCollector, e);
         ChukwaHttpSender.metrics.httpThrowable.inc();
         if (collectors.hasNext()) {
           ChukwaHttpSender.metrics.collectorRollover.inc();
@@ -292,7 +301,6 @@
     // Send POST request
     ChukwaHttpSender.metrics.httpPost.inc();
     
-    // client.setTimeout(15*1000);
     int statusCode = client.executeMethod(method);
 
     if (statusCode != HttpStatus.SC_OK) {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java Mon Sep  7 00:55:53 2009
@@ -20,14 +20,64 @@
 
 
 import java.util.List;
+import java.util.ArrayList;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.conf.Configuration;
 
 public interface ChukwaWriter {
+  
+  public static abstract class CommitStatus {}
+  
+  public static final CommitStatus COMMIT_OK = new CommitStatus() {};
+  public static final CommitStatus COMMIT_FAIL = new CommitStatus() {};
+  
+  /**
+   * COMMIT_PENDING should be returned if a writer has written data, but
+   * this data may ultimately disappear. Contains a list of strings, format
+   * unspecified, that agents can use to find out, eventually, if their data 
+   * has committed.  String <n> corresponds to the nth chunk passed to add().
+   * 
+   *  At present, the format is <sinkfilename> <offset>, 
+   *  where sinkfilename is the name of a sinkfile, without directory but with
+   *  .done suffix, and offset is the last byte of the associated chunk.
+   */
+  public static class COMMIT_PENDING extends CommitStatus {
+    public List<String> pendingEntries;
+  
+    public COMMIT_PENDING(int entries) {
+      pendingEntries = new ArrayList<String>(entries);
+    }
+    
+    public void addPend(String currentFileName, long dataSize) {
+      pendingEntries.add(currentFileName+ " " + dataSize+"\n");
+    }
+  }
+  
+  /**
+   * Called once to initialize this writer.
+   * 
+   * @param c
+   * @throws WriterException
+   */
   public void init(Configuration c) throws WriterException;
 
-  public void add(List<Chunk> chunks) throws WriterException;
-
-  public void close() throws WriterException;;
+  /**
+   * Called repeatedly with data that should be serialized.
+   * 
+   * Subclasses may assume that init() will be called before any calls to
+   * add(), and that add() won't be called after close().
+   * 
+   * @param chunks
+   * @return
+   * @throws WriterException
+   */
+  public CommitStatus add(List<Chunk> chunks) throws WriterException;
+
+  /**
+   * Called once, indicating that the writer should close files and prepare
+   * to exit.
+   * @throws WriterException
+   */
+  public void close() throws WriterException;
 
 }
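
Writers now report a CommitStatus: a writer that has made data durable returns COMMIT_OK, while one whose output may still be lost (for example, before a sink file is rotated to .done) returns COMMIT_PENDING entries of the form "<sinkfile> <offset>". A minimal sketch of a writer honoring the new contract; it is not part of this commit, and the sink-file name is hypothetical:

import java.util.List;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.conf.Configuration;

public class PendingExampleWriter implements ChukwaWriter {
  private long bytesWritten = 0;
  private final String currentFileName = "20090907005553181_example.chukwa.done"; // hypothetical sink file

  public void init(Configuration c) throws WriterException { }

  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    COMMIT_PENDING result = new COMMIT_PENDING(chunks.size());
    for (Chunk c : chunks) {
      bytesWritten += c.getData().length;            // pretend the chunk was appended to the sink file
      result.addPend(currentFileName, bytesWritten); // chunk commits once the file reaches this length
    }
    return result;
  }

  public void close() throws WriterException { }
}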

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java Mon Sep  7 00:55:53 2009
@@ -91,11 +91,11 @@
   }
 
   @Override
-  public void add(List<Chunk> chunks) throws WriterException {
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
     for (Chunk chunk : chunks) {
       add(chunk);
     }
-
+    return COMMIT_OK;
   }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java Mon Sep  7 00:55:53 2009
@@ -100,14 +100,15 @@
   }
 
   @Override
-  public void add(List<Chunk> chunks) throws WriterException {
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
     ArrayList<Chunk> passedThrough = new ArrayList<Chunk>();
     for (Chunk c : chunks)
       if (!cache.addAndCheck(new DedupKey(c.getStreamName(), c.getSeqID())))
         passedThrough.add(c);
 
     if (!passedThrough.isEmpty())
-      next.add(passedThrough);
+      return next.add(passedThrough);
+    else return null;
   }
 
   @Override

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java Mon Sep  7 00:55:53 2009
@@ -50,11 +50,11 @@
   }
 
   @Override
-  public void add(List<Chunk> chunks) throws WriterException {
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
     for (Chunk chunk : chunks) {
       add(chunk);
     }
-
+    return COMMIT_OK;
   }
 
   DataInputStream dis = null;

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java Mon Sep  7 00:55:53 2009
@@ -17,7 +17,7 @@
   int maxDataRate = Integer.MAX_VALUE;
   public static final String RATE_OPT_NAME = "nullWriter.dataRate";
   @Override
-  public void add(List<Chunk> chunks) throws WriterException {
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
     try {
       int dataBytes =0;
       for(Chunk c: chunks)
@@ -25,7 +25,7 @@
       if(maxDataRate > 0)
         Thread.sleep(dataBytes / maxDataRate);
     } catch(Exception e) {}
-    return;
+    return COMMIT_OK;
   }
 
   @Override

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java Mon Sep  7 00:55:53 2009
@@ -37,8 +37,8 @@
   ChukwaWriter writer; // head of pipeline
 
   @Override
-  public void add(List<Chunk> chunks) throws WriterException {
-    writer.add(chunks);
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
+    return writer.add(chunks);
   }
 
   @Override

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Mon Sep  7 00:55:53 2009
@@ -45,9 +45,13 @@
  */
 public class SeqFileWriter implements ChukwaWriter {
   static Logger log = Logger.getLogger(SeqFileWriter.class);
-  public static final boolean ENABLE_ROTATION = true;
+  public static boolean ENABLE_ROTATION_ON_CLOSE = true;
 
-  static final int STAT_INTERVAL_SECONDS = 30;
+  int STAT_INTERVAL_SECONDS = 30;
+  private int rotateInterval = 1000 * 60 * 5;
+  
+  public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
+  public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
   static String localHostAddr = null;
   
   final Object lock = new Object();
@@ -61,19 +65,18 @@
 
   private Path currentPath = null;
   private String currentFileName = null;
-  private FSDataOutputStream currentOutputStr = null;
+  protected FSDataOutputStream currentOutputStr = null;
   private SequenceFile.Writer seqFileWriter = null;
 
-  private int rotateInterval = 1000 * 60;
   private long timePeriod = -1;
   private long nextTimePeriodComputation = -1;
   
-  private Timer rotateTimer = null;  
-  private Timer statTimer = null;
+  protected Timer rotateTimer = null;  
+  protected Timer statTimer = null;
   
   private volatile long dataSize = 0;
-  private volatile boolean chunksWrittenThisRotate = false;
-  private volatile boolean isRunning = false;
+  private volatile long bytesThisRotate = 0;
+  protected volatile boolean isRunning = false;
   
   static {
     try {
@@ -83,7 +86,10 @@
     }
   }
   
-  public SeqFileWriter() throws WriterException {
+  public SeqFileWriter() {}
+  
+  public long getBytesWritten() {
+    return dataSize;
   }
   
   public void init(Configuration conf) throws WriterException {
@@ -91,9 +97,9 @@
 
     this.conf = conf;
 
-    rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
-        1000 * 60 * 5);// defaults to 5 minutes
+    rotateInterval = conf.getInt(ROTATE_INTERVAL_OPT,rotateInterval);
     
+    STAT_INTERVAL_SECONDS = conf.getInt(STAT_PERIOD_OPT, STAT_INTERVAL_SECONDS);
 
     // check if they've told us the file system to use
     String fsname = conf.get("writer.hdfs.filesystem");
@@ -126,13 +132,14 @@
     }
 
     // Setup everything by rotating
+
+    isRunning = true;
     rotate();
 
     statTimer = new Timer();
     statTimer.schedule(new StatReportingTask(), 1000,
         STAT_INTERVAL_SECONDS * 1000);
 
-    isRunning = true;
   }
 
   private class StatReportingTask extends TimerTask {
@@ -156,7 +163,10 @@
   void rotate() {
      if (rotateTimer != null) {
       rotateTimer.cancel();
-    }
+    } 
+     
+    if(!isRunning)
+      return;
     
     calendar.setTimeInMillis(System.currentTimeMillis());
 
@@ -182,8 +192,8 @@
 
         if (previousOutputStr != null) {
           previousOutputStr.close();
-          if (chunksWrittenThisRotate) {
-            log.info("rotate file on HDFS");
+          if (bytesThisRotate > 0) {
+            log.info("rotating sink file " + previousPath);
             fs.rename(previousPath, new Path(previousFileName + ".done"));
           } else {
             log.info("no chunks written to " + previousPath + ", deleting");
@@ -195,7 +205,7 @@
         currentOutputStr = newOutputStr;
         currentPath = newOutputPath;
         currentFileName = newName;
-        chunksWrittenThisRotate = false;
+        bytesThisRotate = 0;
         // Uncompressed for now
         seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
             ChukwaArchiveKey.class, ChunkImpl.class,
@@ -241,7 +251,8 @@
   }
   
   @Override
-  public void add(List<Chunk> chunks) throws WriterException {
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
+    COMMIT_PENDING result = new COMMIT_PENDING(chunks.size());
     if (!isRunning) {
       log.info("Collector not ready");
       throw new WriterException("Collector not ready");
@@ -249,7 +260,6 @@
 
     if (chunks != null) {
       try {
-        chunksWrittenThisRotate = true;
         ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
         
         if (System.currentTimeMillis() >= nextTimePeriodComputation) {
@@ -264,20 +274,25 @@
             archiveKey.setSeqId(chunk.getSeqID());
 
             if (chunk != null) {
-              seqFileWriter.append(archiveKey, chunk);
               // compute size for stats
               dataSize += chunk.getData().length;
+              bytesThisRotate += chunk.getData().length;
+              seqFileWriter.append(archiveKey, chunk);
+
+              String futureName = currentPath.getName().replace(".chukwa", ".done");
+              result.addPend(futureName, currentOutputStr.getPos());
             }
 
           }
         }// End synchro
       } catch (Throwable e) {
         // We don't want to loose anything
-        log.fatal("IOException when trying to write a chunk, Collector is going to exit!");
+        log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
         DaemonWatcher.bailout(-1);
         isRunning = false;
       }
     }
+    return result;
   }
 
   public void close() {
@@ -293,18 +308,19 @@
     }
 
     // If we are here it's either because of an HDFS exception
-    // or Agent has received a kill -TERM
-    // In both cases, we will not be able to execute any
-    // HDFS command so There's no point in trying to 
-    // close or rename the file
-    // see HDFS shutdownHook Jira
-    // but just in case someone fixes this issue
+    // or Collector has received a kill -TERM
+  
     try {
       synchronized(lock) {
         if (this.currentOutputStr != null) {
           this.currentOutputStr.close();
         }
-        fs.rename(currentPath, new Path(currentFileName + ".done"));
+        if(ENABLE_ROTATION_ON_CLOSE)
+          if(bytesThisRotate > 0)
+            fs.rename(currentPath, new Path(currentFileName + ".done"));
+          else
+            fs.delete(currentPath, false);
+
       }
     } catch (Throwable e) {
      log.warn("cannot rename dataSink file:" + currentPath,e);

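The SeqFileWriter changes are the core of the delayed-ack path: each appended chunk is recorded in a COMMIT_PENDING result as the eventual ".done" sink-file name plus the stream position after the append, so the chunk can be acknowledged only once that much of the renamed file is visible. A sketch of how a caller might branch on the returned status -- the "<file> <offset>\n" entry format is taken from TestDelayedAcks below, while the helper method itself is illustrative and not part of this commit:

    // hypothetical helper, not part of this commit
    static void handleStatus(ChukwaWriter.CommitStatus cs) {
      if (cs instanceof ChukwaWriter.COMMIT_PENDING) {
        for (String entry : ((ChukwaWriter.COMMIT_PENDING) cs).pendingEntries) {
          String e = entry.trim();
          int lastSpace = e.lastIndexOf(' ');
          String doneFile = e.substring(0, lastSpace);              // eventual ".done" sink file
          long offsetNeeded = Long.parseLong(e.substring(lastSpace + 1));
          // defer the ack: the chunk is committed only once CommitCheckServlet
          // reports doneFile has reached at least offsetNeeded bytes
        }
      } else {
        // COMMIT_OK: the writer has done all it can, nothing to defer
      }
    }
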
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Mon Sep  7 00:55:53 2009
@@ -224,8 +224,8 @@
   }
 
   @Override
-  public void add(List<Chunk> chunks) throws WriterException {
-    next.add(chunks); //pass data through
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
+    CommitStatus rv = next.add(chunks); //pass data through
     synchronized(tees) {
       Iterator<Tee> loop = tees.iterator();
       while(loop.hasNext()) {
@@ -235,6 +235,7 @@
         }
       }
     }
+    return rv;
   }
 
   @Override

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java Mon Sep  7 00:55:53 2009
@@ -224,7 +224,7 @@
    *  Best effort, there's no guarantee that chunks 
    *  have really been written to disk
    */
-  public void add(List<Chunk> chunks) throws WriterException {
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
     if (!isRunning) {
       throw new WriterException("Writer not yet ready");
     }
@@ -270,6 +270,7 @@
         throw new WriterException(e);
       }
     }
+    return COMMIT_OK;
   }
 
   protected void rotate() {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java Mon Sep  7 00:55:53 2009
@@ -91,7 +91,7 @@
   }
 
   //returns true if dir exists
-  private boolean nukeDirContents(File dir) {
+  public static boolean nukeDirContents(File dir) {
     if(dir.exists()) {
       if(dir.isDirectory()) {
         for(File f: dir.listFiles()) {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java Mon Sep  7 00:55:53 2009
@@ -32,12 +32,13 @@
   public static ArrayList<Chunk> outputs = new ArrayList<Chunk>();
 
   @Override
-  public void add(List<Chunk> chunks) throws WriterException {
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
 
     synchronized (outputs) {
       for (Chunk c : chunks)
         outputs.add(c);
     }
+    return COMMIT_OK;
   }
 
   @Override

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java Mon Sep  7 00:55:53 2009
@@ -4,6 +4,7 @@
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
 import org.apache.hadoop.chukwa.datacollection.writer.NullWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
 import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
@@ -39,7 +40,10 @@
     
     conf.setInt("chukwaAgent.control.port", 0);
     ChukwaAgent agent = new ChukwaAgent(conf);
-    HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/chukwa");
+    RetryListOfCollectors clist = new RetryListOfCollectors(conf);
+    clist.add("http://localhost:"+PORTNO+"/chukwa");
+    HttpConnector conn = new HttpConnector(agent);
+    conn.setCollectors(clist);
     conn.start();
     Server server = new Server(PORTNO);
     Context root = new Context(server, "/", Context.SESSIONS);
@@ -50,11 +54,11 @@
     Thread.sleep(1000);
     agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() + 
         " testData "+ SEND_RATE + " 0");
+    assertNotNull(agent.getAdaptor("constSend"));
     Thread.sleep(TEST_DURATION_SECS * 1000);
 
     String[] stat = agent.getAdaptorList().get("constSend").split(" ");
     long kbytesPerSec = Long.valueOf(stat[stat.length -1]) / TEST_DURATION_SECS / 1000;
-
     System.out.println("data rate was " + kbytesPerSec + " kb /second");
     assertTrue(kbytesPerSec < WRITE_RATE_KB); //write rate should throttle sends
     assertTrue(kbytesPerSec > MIN_ACCEPTABLE_PERCENT* WRITE_RATE_KB / 100);//an assumption, but should hold true

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java?rev=811958&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java Mon Sep  7 00:55:53 2009
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.collector;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.*;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.TestRawAdaptor;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorResetThread;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.sender.*;
+import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
+import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
+import org.apache.hadoop.chukwa.util.ConstRateValidator.ByteRange;
+import org.apache.hadoop.chukwa.util.ConstRateValidator.ValidatorSM;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import junit.framework.TestCase;
+import static org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender.DelayedCommit;
+
+public class TestDelayedAcks extends TestCase {
+  
+  static final int PORTNO = 9993;
+  static int END2END_TEST_SECS = 30;
+  static int SEND_RATE = 180* 1000; //bytes/sec
+  static int CLIENT_SCANPERIOD = 1000;
+  static int SERVER_SCANPERIOD = 1000;
+  static int ROTATEPERIOD = 2000;
+  
+  int ACK_TIMEOUT = 200;
+  public void testAdaptorTimeout() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("chukwaAgent.control.port", "0");
+    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+    conf.setInt("chukwaAgent.adaptor.context.switch.time", 500);
+    conf.setInt(AdaptorResetThread.TIMEOUT_OPT, ACK_TIMEOUT);
+
+    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChunkCatcherConnector chunks = new ChunkCatcherConnector();
+    chunks.start();
+    assertEquals(0, agent.adaptorCount());
+    File testFile = TestRawAdaptor.makeTestFile("testDA", 50);
+    long len = testFile.length();
+    System.out.println("wrote data to " + testFile);
+    AdaptorResetThread restart = new AdaptorResetThread(conf, agent);
+    //start timeout thread
+    agent.processAddCommand("add fta = "+ FileTailingAdaptor.class.getCanonicalName()
+        + " testdata " + testFile.getCanonicalPath() + " 0" );
+    
+    
+    assertEquals(1, agent.adaptorCount());
+    Chunk c1 = chunks.waitForAChunk();
+    assertNotNull(c1);
+    List<CommitListEntry> pendingAcks = new ArrayList<CommitListEntry>();
+    pendingAcks.add(new DelayedCommit(c1.getInitiator(), c1.getSeqID(), "foo", c1.getSeqID()));
+    restart.reportPending(pendingAcks);
+
+    assertEquals(len, c1.getData().length);
+    Thread.sleep(ACK_TIMEOUT*2);
+    restart.resetTimedOutAdaptors(ACK_TIMEOUT);
+    Chunk c2 = chunks.waitForAChunk(1000);
+    assertNotNull(c2);
+    assertEquals(len, c2.getData().length);
+    assertTrue(restart.getResetCount() > 0);
+    agent.shutdown();
+//start an adaptor -- chunks should appear in the connector
+    //wait for timeout.  More chunks should appear.
+    
+    testFile.delete();
+  }
+  
+  /*
+   * Checks the CommitCheckServlet works correctly with a one-chunk file.
+   */
+  public void testDelayedAck() throws Exception {
+    Configuration conf = new Configuration();
+
+    SeqFileWriter writer = new SeqFileWriter();
+
+    conf.set("writer.hdfs.filesystem", "file:///");
+    
+    File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+    if (!tempDir.exists()) {
+      tempDir.mkdirs();
+    }
+    
+    String outputDirectory = tempDir.getPath() + "/test_DA" + System.currentTimeMillis();
+
+    String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
+    conf.set("chukwaCollector.outputDir", seqWriterOutputDir );
+
+    writer.init(conf);
+    ArrayList<Chunk> oneChunk = new ArrayList<Chunk>();
+    oneChunk.add(new ChunkImpl("dt", "name", 1, new byte[] {'b'}, null));
+
+    ChukwaWriter.CommitStatus cs = writer.add(oneChunk);
+    writer.close();
+    
+    File seqWriterFile = null;
+    File directory = new File(seqWriterOutputDir);
+    String[] files = directory.list();
+    for(String file: files) {
+      if ( file.endsWith(".done") ){
+        seqWriterFile = new File(directory, file);
+        break;
+      }
+    }
+    long lenWritten = seqWriterFile.length();
+    System.out.println("wrote " + lenWritten+ " bytes");
+    assertTrue(cs instanceof ChukwaWriter.COMMIT_PENDING);
+    ChukwaWriter.COMMIT_PENDING pending = (ChukwaWriter.COMMIT_PENDING) cs;
+    assertTrue(pending.pendingEntries.size() == 1);
+    String res = pending.pendingEntries.get(0);
+    System.out.println("result was " + res);
+    
+    Pattern expectedPat= Pattern.compile(".* ([0-9]+)\n");
+    Matcher match = expectedPat.matcher(res);
+    assertTrue(match.matches());
+    long bytesPart = Long.parseLong(match.group(1));
+    assertEquals(bytesPart, lenWritten);
+  }
+  
+
+  public static Server startCollectorOnPort(Configuration conf, int port, 
+      ServletCollector collector) throws Exception {
+    Server server = new Server(port);
+    
+    Context root = new Context(server, "/", Context.SESSIONS);
+    root.addServlet(new ServletHolder(collector), "/*");
+    root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
+
+    server.start();
+    server.setStopAtShutdown(false);
+    return server;
+  }
+  
+
+  public static String buildConf(Configuration conf) {
+    File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+    if (!tempDir.exists()) {
+      tempDir.mkdirs();
+    }
+    
+    String outputDirectory = tempDir.getPath() + "/test_DA" + System.currentTimeMillis() ;
+
+    conf.setInt("chukwaCollector.rotateInterval", ROTATEPERIOD);
+    conf.set("writer.hdfs.filesystem", "file:///");
+    String seqWriterOutputDir = outputDirectory +"/chukwa_sink";
+    conf.set("chukwaCollector.outputDir", seqWriterOutputDir );
+    conf.setInt(AsyncAckSender.POLLPERIOD_OPT, CLIENT_SCANPERIOD);
+    conf.setInt(CommitCheckServlet.SCANPERIOD_OPT, SERVER_SCANPERIOD);
+    conf.setBoolean(HttpConnector.ASYNC_ACKS_OPT, true);
+    conf.setInt(HttpConnector.MIN_POST_INTERVAL_OPT, 100);
+    conf.setInt(HttpConnector.MAX_SIZE_PER_POST_OPT, 10 * 1000*1000);
+    conf.setInt(SeqFileWriter.STAT_PERIOD_OPT, 60*60*24); 
+    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+    //turn off stats reporting thread, so we can use Writer.dataSize
+    conf.set(AsyncAckSender.POLLHOSTS_OPT, "afilethatdoesntexist");
+      //so that it won't try to read conf/collectors
+    conf.setInt("chukwaAgent.control.port", 0);
+    return outputDirectory;
+  }
+  
+  public void testEndToEnd() {
+    try {
+      Configuration conf = new Configuration();
+
+      String outputDirectory = buildConf(conf);
+      ServletCollector collector = new ServletCollector(conf);
+      Server collectorServ = startCollectorOnPort(conf, PORTNO, collector);
+      Thread.sleep(1000);
+      
+      ChukwaAgent agent = new ChukwaAgent(conf);
+      HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
+      conn.start();
+      String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() + 
+          " testData "+ SEND_RATE + " 0");
+      assertTrue("constSend".equals(resp));
+      Thread.sleep(END2END_TEST_SECS * 1000);
+
+      //do the shutdown directly, here, so that acks are still processed.
+      assertNotNull(agent.getAdaptor("constSend"));
+      long bytesOutput = agent.getAdaptor("constSend").shutdown();
+      Thread.sleep(CLIENT_SCANPERIOD + SERVER_SCANPERIOD + ROTATEPERIOD + 3000);
+      
+      String[] stat = agent.getAdaptorList().get("constSend").split(" ");
+      long bytesCommitted = Long.valueOf(stat[stat.length -1]);
+      
+      long bytesPerSec = bytesOutput / (1000 * END2END_TEST_SECS);
+      System.out.println("data rate was " + bytesPerSec + " kb /second");
+   
+      //all data should be committed
+      System.out.println(bytesCommitted + " bytes committed");
+      System.out.println(bytesOutput + " bytes output");
+      System.out.println("difference is " + (bytesOutput - bytesCommitted));
+      ChukwaWriter w = collector.getWriter();
+      long bytesWritten = ((SeqFileWriter)w).getBytesWritten();
+      System.out.println("collector wrote " + bytesWritten);
+
+      assertEquals(bytesCommitted, bytesOutput);
+      assertEquals(bytesWritten, bytesCommitted);
+      //We need a little imprecision here, since the send rate is a bit bursty,
+      //and since some acks got lost after the adaptor was stopped.
+      assertTrue(bytesPerSec > 9 * SEND_RATE/ 1000 / 10);
+      AsyncAckSender sender = (AsyncAckSender)conn.getSender();
+      assertEquals(0, sender.adaptorReset.getResetCount());
+      
+      agent.shutdown();
+      collectorServ.stop();
+      conn.shutdown();
+      Thread.sleep(5000); //for collector to shut down
+      TestDirTailingAdaptor.nukeDirContents(new File(outputDirectory));
+      
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+
+  }
+
+
+}

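Taken together, buildConf() and startCollectorOnPort() above read as a recipe for enabling the asynchronous-ack path outside the test: the agent side turns on HttpConnector.ASYNC_ACKS_OPT and sets the AsyncAckSender poll period, while the collector side registers a CommitCheckServlet next to the ServletCollector and sets its scan period. A hedged sketch of that wiring, using only the option names and servlet registration shown above; the port, sink directory, and intervals are illustrative values, not defaults from this commit:

    Configuration conf = new Configuration();
    // agent/sender side: request delayed acks and poll collectors for commits
    conf.setBoolean(HttpConnector.ASYNC_ACKS_OPT, true);
    conf.setInt(AsyncAckSender.POLLPERIOD_OPT, 1000);           // ms between commit polls
    // collector side: how often CommitCheckServlet rescans the sink directory
    conf.setInt(CommitCheckServlet.SCANPERIOD_OPT, 1000);
    conf.setInt(SeqFileWriter.ROTATE_INTERVAL_OPT, 2000);       // short rotation, so commits surface quickly
    conf.set("writer.hdfs.filesystem", "file:///");             // or an HDFS URI in a real deployment
    conf.set("chukwaCollector.outputDir", "/tmp/chukwa_sink");  // illustrative sink directory

    // collector: serve data posts and commit checks from the same Jetty server
    Server server = new Server(9994);                           // illustrative port
    Context root = new Context(server, "/", Context.SESSIONS);
    root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
    root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/" + CommitCheckServlet.DEFAULT_PATH);
    server.start();
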
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java?rev=811958&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java Mon Sep  7 00:55:53 2009
@@ -0,0 +1,134 @@
+package org.apache.hadoop.chukwa.datacollection.collector;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
+import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
+import org.apache.hadoop.chukwa.util.ConstRateValidator.ByteRange;
+import org.apache.hadoop.chukwa.util.ConstRateValidator.ValidatorSM;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.mortbay.jetty.Server;
+import junit.framework.TestCase;
+
+public class TestFailedCollectorAck extends TestCase {
+  
+  static final int PORTNO = 9993;
+
+  public void testFailureRecovery() {
+    try {
+    Configuration conf = new Configuration();
+
+    String outputDirectory = TestDelayedAcks.buildConf(conf);
+    SeqFileWriter.ENABLE_ROTATION_ON_CLOSE = false;
+    File sinkA = new File(outputDirectory, "chukwa_sink_A");
+    sinkA.mkdir();
+    File sinkB = new File(outputDirectory, "chukwa_sink_B");
+    sinkB.mkdir();
+    conf.set(CommitCheckServlet.SCANPATHS_OPT, sinkA.getCanonicalPath()
+        + "," + sinkB.getCanonicalPath());
+    conf.set("chukwaCollector.outputDir", sinkA.getCanonicalPath() );
+    ServletCollector collector1 = new ServletCollector(new Configuration(conf));
+    conf.set("chukwaCollector.outputDir",sinkB.getCanonicalPath() );
+    ServletCollector collector2 = new ServletCollector(conf);
+    Server collector1_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+1, collector1);
+    Server collector2_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+2, collector2);
+    Thread.sleep(2000); //for collectors to start
+    
+    ChukwaAgent agent = new ChukwaAgent(conf);
+    HttpConnector conn = new HttpConnector(agent);
+    RetryListOfCollectors clist = new RetryListOfCollectors(conf);
+    clist.add("http://localhost:"+(PORTNO+1)+"/");
+    clist.add("http://localhost:"+(PORTNO+2)+"/");
+    conn.setCollectors(clist);
+    conn.start();
+    //FIXME: somehow need to clue in commit checker which paths to check.
+    //       Somehow need 
+
+    String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() + 
+        " testData "+ TestDelayedAcks.SEND_RATE + " 12345 0");
+    assertTrue("constSend".equals(resp));
+    Thread.sleep(10 * 1000);
+    collector1_s.stop();
+    Thread.sleep(10 * 1000);
+    SeqFileWriter.ENABLE_ROTATION_ON_CLOSE = true;
+
+    String[] stat = agent.getAdaptorList().get("constSend").split(" ");
+    long bytesCommitted = Long.valueOf(stat[stat.length -1]);
+    assertTrue(bytesCommitted > 0);
+    agent.shutdown();
+    conn.shutdown();
+    Thread.sleep(2000); //for collectors to shut down
+    collector2_s.stop();
+    Thread.sleep(2000); //for collectors to shut down
+    
+    checkDirs(conf, conf.get(CommitCheckServlet.SCANPATHS_OPT));
+    
+    TestDirTailingAdaptor.nukeDirContents(new File(outputDirectory));
+    (new File(outputDirectory)).delete();
+    } catch(Exception e) {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+  }
+  
+  public void checkDirs(Configuration conf, String paths) throws IOException {
+    
+    ArrayList<Path> toScan = new ArrayList<Path>();
+    ArrayList<ByteRange> bytes = new ArrayList<ByteRange>();
+    FileSystem localfs = FileSystem.getLocal(conf);
+
+    String[] paths_s = paths.split(",");
+    for(String s: paths_s)
+      if(s.length() > 1)
+        toScan.add(new Path(s));
+    
+    for(Path p: toScan) {
+      
+      FileStatus[] dataSinkFiles = localfs.listStatus(p, SinkArchiver.DATA_SINK_FILTER);
+      for(FileStatus fstatus: dataSinkFiles) {
+        System.out.println(fstatus.getPath().getName());
+      }
+      for(FileStatus fstatus: dataSinkFiles) {
+        if(!fstatus.getPath().getName().endsWith(".done"))
+          continue;
+        
+        SequenceFile.Reader reader = new SequenceFile.Reader(localfs, fstatus.getPath(), conf);
+
+        ChukwaArchiveKey key = new ChukwaArchiveKey();
+        ChunkImpl chunk = ChunkImpl.getBlankChunk();
+
+        while (reader.next(key, chunk)) {
+         bytes.add(new ByteRange(chunk));
+        }
+        reader.close();
+      }
+    }
+
+    assertNotNull(bytes);
+    Collections.sort(bytes);
+    
+    ValidatorSM sm = new ValidatorSM();
+    for(ByteRange b: bytes) {
+      String s = sm.advanceSM(b);
+      if(s != null)
+        System.out.println(s);
+    }
+    assertEquals(0, sm.missingBytes);
+  }
+
+}

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java?rev=811958&r1=811957&r2=811958&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java Mon Sep  7 00:55:53 2009
@@ -32,10 +32,10 @@
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.ToolRunner;
 import junit.framework.TestCase;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.writeASinkFile;
 
 public class TestArchive extends TestCase {
 
-  java.util.Random r = new java.util.Random();
   
    public void browseDir(FileSystem fs, Path p, int d) throws IOException {
      for(int i=0; i< d; ++i) {
@@ -52,49 +52,6 @@
      else
        System.out.println( p.getName() );
    }
-  
-   long lastSeqID = 0;
-  public ChunkImpl getARandomChunk() {
-    int ms = r.nextInt(1000);
-    String line = "2008-05-29 10:42:22," + ms
-        + " INFO org.apache.hadoop.dfs.DataNode: Some text goes here"
-        + r.nextInt() + "\n";
-
-    ChunkImpl c = new ChunkImpl("HadoopLogProcessor", "test",
-        line.length()  + lastSeqID, line.getBytes(), null);
-    lastSeqID += line.length();
-    c.addTag("cluster=\"foocluster\"");
-    return c;
-  }
-  
-  public void writeASinkFile(Configuration conf, FileSystem fileSys, Path dest,
-      int chunks) throws IOException {
-    FSDataOutputStream out = fileSys.create(dest);
-
-    Calendar calendar = Calendar.getInstance();
-    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
-        ChukwaArchiveKey.class, ChunkImpl.class,
-        SequenceFile.CompressionType.NONE, null);
-    for (int i = 0; i < chunks; ++i) {
-      ChunkImpl chunk = getARandomChunk();
-      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-
-      calendar.set(Calendar.YEAR, 2008);
-      calendar.set(Calendar.MONTH, Calendar.MAY);
-      calendar.set(Calendar.DAY_OF_MONTH, 29);
-      calendar.set(Calendar.HOUR, 10);
-      calendar.set(Calendar.MINUTE, 0);
-      calendar.set(Calendar.SECOND, 0);
-      calendar.set(Calendar.MILLISECOND, 0);
-      archiveKey.setTimePartition(calendar.getTimeInMillis());
-      archiveKey.setDataType(chunk.getDataType());
-      archiveKey.setStreamName(chunk.getStreamName());
-      archiveKey.setSeqId(chunk.getSeqID());
-      seqFileWriter.append(archiveKey, chunk);
-    }
-    seqFileWriter.close();
-    out.close();
-  }
 
   static final int NUM_HADOOP_SLAVES = 1;
   static final Path DATASINK = new Path("/chukwa/logs/*");


