hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r685353 [8/13] - in /hadoop/core/trunk: ./ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/build/ src/contrib/chukwa/conf/ src/contrib/chukwa/dist/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/ha...
Date Tue, 12 Aug 2008 22:35:23 GMT
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
+import org.apache.hadoop.chukwa.datacollection.connector.*;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.*;
+
+/**
+ * The local agent daemon that runs on each machine.
+ * This class is designed to be embeddable, for use in testing.
+ * 
+ */
+public class ChukwaAgent
+{
+  boolean DO_CHECKPOINT_RESTORE = false;
+  boolean WRITE_CHECKPOINTS = false;
+
+  static Logger log = Logger.getLogger(ChukwaAgent.class);
+
+  //doesn't need an equals(), comparator, etc
+  private static class Offset {
+    public Offset(long l, long id)  {
+      offset = l;
+      this.id = id;
+    }
+    private volatile long id;
+    private volatile long offset;
+  }
+
+  public static class AlreadyRunningException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    public AlreadyRunningException() {
+      super("Agent already running; aborting");
+    }
+  }
+  
+  
+  private final Map<Adaptor, Offset> adaptorPositions;
+
+  //basically only used by the control socket thread.
+  private final Map<Long, Adaptor> adaptorsByNumber;
+
+  File checkpointDir;   //lock this object to indicate checkpoint in progress
+  File initialAdaptors;
+  String CHECKPOINT_BASE_NAME;  //base filename for checkpoint files
+  int CHECKPOINT_INTERVAL_MS ;  //min interval at which to write checkpoints
+
+
+  private Timer checkpointer;
+  private volatile boolean needNewCheckpoint = false; //set to true if any event has happened
+  //that should cause a new checkpoint to be written
+
+
+  private long lastAdaptorNumber= 0;   //ID number of the last adaptor to be started
+  private int checkpointNumber; //id number of next checkpoint.
+  //should be protected by grabbing lock on checkpointDir
+
+
+  private final AgentControlSocketListener controlSock;
+
+  /**
+   * @param args
+   * @throws AdaptorException 
+   */
+  public static void main(String[] args) throws AdaptorException {
+
+    try{
+      System.out.println("usage:  LocalAgent [-restore] [default collector URL]");
+      ChukwaAgent agent = new ChukwaAgent();
+      if(agent.anotherAgentIsRunning()) {
+        System.out.println("another agent is running (or the port has been usurped); bailing out now");
+        return;
+      }
+        
+      Connector connector;
+
+      int uriArgNumber= 0;
+      if(args.length > 0)  {
+        if(args[0].equals("-restore")) {
+          agent.DO_CHECKPOINT_RESTORE = true;
+          uriArgNumber = 1;
+        }
+        if(args[uriArgNumber].equals("local"))
+          connector = new ConsoleOutConnector(agent);
+        else
+        {
+          if(!args[uriArgNumber].contains("://"))
+            args[uriArgNumber] = "http://" + args[uriArgNumber];
+          connector = new HttpConnector(agent, args[uriArgNumber]);
+        }
+      }
+      else
+        connector = new HttpConnector(agent);
+
+      connector.start();
+
+      log.info("local agent started on port " + agent.getControlSock().portno);
+
+    }	catch(AlreadyRunningException e){
+      log.error("an agent is already running on this machine with the same port number; bailing out");
+      System.out.println("an agent is already running on this machine with the same port number; bailing out");
+      System.exit(0); //better safe than sorry
+    } catch(Exception e) 	{
+      e.printStackTrace();
+    }
+  }
+  private boolean anotherAgentIsRunning() {
+    return !controlSock.isBound();
+  }
+  /**
+   * @return the number of running adaptors inside this local agent
+   */
+  public int adaptorCount() {
+    return adaptorPositions.size();
+  }
+
+  public ChukwaAgent() throws AlreadyRunningException
+  {
+    readConfig();
+
+    //almost always just reading this, so use a ConcurrentHashMap.
+    //since offsets are wrapped in an Offset object, updating one isn't a structural modification.
+    adaptorPositions= new ConcurrentHashMap<Adaptor, Offset>();
+    adaptorsByNumber = new HashMap<Long, Adaptor>();
+    checkpointNumber=0;
+    try{
+      if(DO_CHECKPOINT_RESTORE)
+        restoreFromCheckpoint();
+    } catch(IOException e)  {
+      log.warn("failed to restart from checkpoint: ", e);
+    }
+    
+    try {
+      if(initialAdaptors != null && initialAdaptors.exists())
+        readAdaptorsFile(initialAdaptors);
+    } catch(IOException e) {
+      log.warn("couldn't read user-specified file "+ initialAdaptors.getAbsolutePath());
+    }
+    
+    controlSock = new AgentControlSocketListener(this);
+    try {
+      controlSock.tryToBind(); //do this synchronously; if it fails, we know another agent is running.
+      controlSock.start();  //this sets us up as a daemon
+      log.info("control socket started on port " + controlSock.portno);
+      
+      if(CHECKPOINT_INTERVAL_MS > 0)  {
+        checkpointer = new Timer();
+        checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
+      }
+    } catch(IOException e) {
+      log.info("failed to bind to socket; aborting agent launch", e);
+      throw new AlreadyRunningException();
+    }
+
+  
+  }
+
+  //FIXME: should handle bad lines here
+  public long processCommand(String cmd)
+  {
+    String[] words = cmd.split(" ");
+    if(words[0].equalsIgnoreCase("add"))
+    {
+      //words should contain (space delimited):
+      //  0) command ("add")
+      //  1) AdaptorClassname
+      //  2) dataType (e.g. "hadoop_log")
+      //  3) params <optional> 
+      //           (e.g. for files, this is filename,
+      //            but can be arbitrarily many space
+      //            delimited agent specific params )
+      //  4) offset
+
+      long offset;
+      try  {
+        offset = Long.parseLong(words[words.length-1]);
+      } catch(NumberFormatException e) {
+        log.warn("malformed line " + cmd);
+        return -1L;
+      }
+      String adaptorName = words[1];
+
+      Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
+      if(adaptor == null) {
+        log.warn("don't recognize adaptor name " + adaptorName);
+        return -1L;
+      }
+      
+
+      String dataType = words[2];
+      
+      String params = "";
+      if(words.length > 4){ //optional params are present
+        int begParams = adaptorName.length()+dataType.length()+6; //6 = length("add") + the three separating spaces
+        params = cmd.substring(begParams, cmd.length() - words[words.length-1].length() -1);
+      }
+      long adaptorID;
+      synchronized(adaptorsByNumber) {
+        adaptorID  = ++lastAdaptorNumber;
+        adaptorsByNumber.put(adaptorID, adaptor);
+        adaptorPositions.put(adaptor, new Offset(offset,adaptorID));
+      }
+      
+      try {
+        adaptor.start(dataType, params, offset, DataFactory.getInstance().getEventQueue());
+        log.info("started a new adaptor, id = " +adaptorID);
+        return adaptorID ;
+        
+      } catch(AdaptorException e) {
+        log.warn("failed to start adaptor", e);
+        //FIXME: don't we need to clean up the adaptor maps here?
+      }
+    }
+    else
+      log.warn("only 'add' command supported in config files");
+
+    return -1;
+  }
+
+  /**
+   *  Tries to restore from a checkpoint file in checkpointDir.
+   *  There should usually only be one checkpoint present --
+   *  two checkpoints present implies a crash during
+   *  writing the higher-numbered one.
+   *  As a result, this method chooses the lowest-numbered file present.
+   *  
+   *  Lines in the checkpoint file are processed one at a time with processCommand();
+   *   
+   * @return true if the restore succeeded
+   * @throws IOException
+   */ 
+  public boolean restoreFromCheckpoint() throws IOException
+  {
+    synchronized(checkpointDir)
+    {
+      String[] checkpointNames =  checkpointDir.list(new FilenameFilter()
+      {
+        public boolean accept(File dir, String name)  {
+          return name.startsWith(CHECKPOINT_BASE_NAME);
+        } 
+      });
+      if(checkpointNames.length == 0)
+      {
+        log.info("No checkpoints found in "+ checkpointDir);
+        return false;
+      }
+
+      if(checkpointNames.length > 2)
+        log.warn("expected at most two checkpoint files in " + checkpointDir +  "; saw " + checkpointNames.length);
+
+      String lowestName=null;
+      int lowestIndex=Integer.MAX_VALUE;
+      for(String n: checkpointNames) {
+        int index = Integer.parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
+        if(index < lowestIndex)  {
+          lowestName = n;
+          lowestIndex = index;
+        }
+      }
+
+      checkpointNumber = lowestIndex;
+      File checkpoint = new File(checkpointDir, lowestName);
+      readAdaptorsFile(checkpoint);
+    }
+    return true;
+  }
+  private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
+      IOException
+  {
+    BufferedReader br = new BufferedReader( new InputStreamReader(new FileInputStream(checkpoint)));
+    String cmd=null;
+    while((cmd = br.readLine()) != null)
+      processCommand(cmd);
+    br.close();
+  }
+
+  /**
+   * Called periodically to write checkpoints
+   * @throws IOException
+   */
+  public void writeCheckpoint() throws IOException
+  { 
+    needNewCheckpoint = false;
+    synchronized(checkpointDir) {
+      log.info("writing checkpoint " + checkpointNumber);
+
+      FileOutputStream fos = new FileOutputStream(
+          new File(checkpointDir, CHECKPOINT_BASE_NAME + checkpointNumber));
+      PrintWriter out = new PrintWriter( new BufferedWriter(
+          new OutputStreamWriter(fos)));
+
+      for(Map.Entry<Adaptor, Offset> stat: adaptorPositions.entrySet()) {
+        try{
+          Adaptor a = stat.getKey();
+          out.print("ADD " + a.getClass().getCanonicalName());
+          out.print(" ");
+          out.print(a.getType());
+          out.print(" " + a.getCurrentStatus() + " ");
+          out.println(stat.getValue().offset);
+        }  catch(AdaptorException e)  {
+          e.printStackTrace();
+        }//don't try to recover from bad adaptor yet
+      }
+
+      out.close();
+      File lastCheckpoint =  new File(checkpointDir, CHECKPOINT_BASE_NAME + (checkpointNumber-1));
+      log.debug("hopefully removing old checkpoint file " + lastCheckpoint.getAbsolutePath());
+      lastCheckpoint.delete();
+      checkpointNumber++;
+    }
+  }
+
+  public void reportCommit(Adaptor src, long uuid)
+  {
+    needNewCheckpoint = true;
+    Offset o = adaptorPositions.get(src);
+    if(o != null) {
+      synchronized(o) { //order writes to offset, in case commits are processed out of order
+        if( uuid > o.offset)
+          o.offset = uuid;
+      }
+      
+      log.info("got commit up to " + uuid + " on " + src+ " = "+ o.id);
+    }
+    else {
+      log.warn("got commit up to " + uuid +  "  for adaptor " +src + 
+          " that doesn't appear to be running: " + adaptorsByNumber.size() + " total");
+    }
+  }
+
+  class CheckpointTask extends TimerTask  {
+    public void run()  {
+      try{
+        if(needNewCheckpoint ) {
+          writeCheckpoint();
+        }
+      } catch(IOException e)  {
+        log.warn("failed to write checkpoint", e);
+      }
+    }
+  }
+  
+//for use only by control socket.
+  Map<Long, Adaptor> getAdaptorList()  {
+    return adaptorsByNumber; 
+  }
+  /**
+   * Stop the adaptor with given ID number.
+   * Takes a parameter to indicate whether the adaptor should
+   * force out all remaining data, or just exit abruptly.
+   * 
+   * If the adaptor is written correctly, its offset won't change after returning
+   * from shutdown.
+   * 
+   * @param number the adaptor to stop
+   * @param gracefully if true, shutdown, if false, hardStop
+   * @return the number of bytes synched at stop. -1 on error
+   */
+  public long stopAdaptor(long number, boolean gracefully)  {
+    Adaptor toStop;
+    long offset = -1;
+    
+      //at most one thread can get past this critical section with toStop != null
+      //so if multiple callers try to stop the same adaptor, all but one will fail
+    synchronized(adaptorsByNumber) {
+      toStop = adaptorsByNumber.remove(number);
+    }
+    if(toStop == null) {
+      log.warn("trying to stop adaptor " + number + " that isn't running");
+      return offset;
+    }
+    try {
+      if(gracefully ) {
+
+        long bytesSentByAdaptor = toStop.shutdown(); //this can block
+        long unstableBytes = bytesSentByAdaptor -adaptorPositions.get(toStop).offset;
+        while(unstableBytes > 0 ) {
+          log.info("waiting for adaptor " + number +  "  to terminate " +
+              unstableBytes + " bytes are still uncommitted");
+          Thread.sleep(2000);
+          unstableBytes = bytesSentByAdaptor -adaptorPositions.get(toStop).offset;
+        }
+      }
+      else
+        toStop.hardStop();
+      Offset off = adaptorPositions.remove(toStop);  //next checkpoint will have the remove
+      offset = off == null ? -1 : off.offset;
+      needNewCheckpoint = true;
+
+    } catch(AdaptorException e) {
+      log.error("adaptor failed to stop cleanly", e);
+    } catch(InterruptedException e) {
+      log.error("can't wait for adaptor to finish writing", e);
+    }
+    return offset;
+  }
+  
+  protected void readConfig() {
+    Configuration conf = new Configuration();
+    String chukwaHome = System.getenv("CHUKWA_HOME");
+    if (chukwaHome == null){
+      chukwaHome = ".";
+    }
+    if(!chukwaHome.endsWith("/"))
+      chukwaHome = chukwaHome + "/";
+    
+    conf.addResource(new Path("conf/chukwa-agent-conf.xml"));
+    CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name", "chukwa_checkpoint_");
+    checkpointDir= new File(conf.get("chukwaAgent.checkpoint.dir", chukwaHome+ "/var/"));
+    CHECKPOINT_INTERVAL_MS= conf.getInt("chukwaAgent.checkpoint.interval", 5000);
+    DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled", false);
+    if(DO_CHECKPOINT_RESTORE) {
+      WRITE_CHECKPOINTS = true;
+      log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+    }
+  //  String initialAdaptorsStr = conf.get("initial_adaptors_file");
+    
+    initialAdaptors = new File(chukwaHome + "conf/initial_adaptors");
+  }
+  
+  public void shutdown() {
+    shutdown(false);
+  }
+
+  /**
+   * Triggers agent shutdown.
+   * For now, this method doesn't shut down adaptors explicitly.  It probably should.
+   */
+  public void shutdown(boolean exit) {
+    if(checkpointer != null)  
+      checkpointer.cancel();
+    
+    controlSock.shutdown(); //make sure we don't get new requests
+
+    try {
+      if(WRITE_CHECKPOINTS)
+        writeCheckpoint(); //write a last checkpoint here, before stopping adaptors
+    } catch(IOException e) { 
+    }
+    
+    synchronized(adaptorsByNumber) {   //shut down each adaptor
+      for(Adaptor a: adaptorsByNumber.values()) {
+        try{
+          a.hardStop();
+        }catch(AdaptorException e) {
+          log.warn("failed to cleanly stop " + a,e);
+        }
+      }
+    }
+    
+    if(exit)
+      System.exit(0);
+  }
+/**
+ *   Returns the last offset at which a given adaptor was checkpointed
+ * @param a the adaptor in question
+ * @return that adaptor's last-checkpointed offset
+ */
+  public long getOffset(Adaptor a) {
+    return adaptorPositions.get(a).offset;
+  }
+  /**
+   * Returns the control socket for this agent.
+   */
+  AgentControlSocketListener getControlSock() {
+    return controlSock;
+  }
+
+}
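
For illustration, here is a minimal sketch of driving an embedded ChukwaAgent through processCommand(), whose "add" lines are parsed above (command, adaptor class name, data type, optional params, offset, all space-delimited). The log file path is hypothetical and exception handling is elided:

    ChukwaAgent agent = new ChukwaAgent();  // throws AlreadyRunningException if the control port is taken
    long id = agent.processCommand(
        "add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"
        + " hadoop_log /var/log/hadoop/namenode.log 0");  // hypothetical file to tail, from offset 0
    if (id != -1)
      agent.stopAdaptor(id, true);  // graceful stop: flushes remaining data before shutdown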

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.util.*;
+//import java.util.concurrent.*;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.log4j.Logger;
+
+/**
+ * An event queue that blocks once a fixed upper limit of data is enqueued.
+ * 
+ * For now, the limit is computed from the size of each chunk's data field; it should probably use estimatedSerializedSize() instead.
+ * 
+ */
+public class MemLimitQueue implements ChunkQueue
+{
+
+	static Logger log = Logger.getLogger(MemLimitQueue.class);
+	
+	private Queue<Chunk> queue = new LinkedList<Chunk>();
+	private long dataSize = 0;
+	private final long MAX_MEM_USAGE;
+
+  public MemLimitQueue(int limit) {
+    MAX_MEM_USAGE = limit;
+  }
+	
+	/**
+	 * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
+	 */
+	public void add(Chunk event) throws InterruptedException
+	{
+	  assert event != null: "can't enqueue null chunks";
+    synchronized(this) {
+      while(event.getData().length  + dataSize > MAX_MEM_USAGE)
+        this.wait();
+      
+      dataSize += event.getData().length;
+      queue.add(event);
+      this.notifyAll();
+    }
+	 
+	}
+
+	/**
+	 * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List, int)
+	 */
+	public void collect(List<Chunk> events,int maxCount) throws InterruptedException
+	{
+		synchronized(this) {
+		  //we can't just say queue.take() here, since we're holding a lock.
+		  while(queue.isEmpty()){
+		    this.wait();
+		  }
+		  
+		  int i = 0;
+		  while(!queue.isEmpty() && (i++ < maxCount)) { 
+		    Chunk e = this.queue.remove();
+		    dataSize -= e.getData().length;
+		    events.add(e);
+		  }
+		  this.notifyAll();
+		} 
+
+		if (log.isDebugEnabled()) 	{
+			log.debug("WaitingQueue.inQueueCount:" + queue.size() + "\tWaitingQueue.collectCount:" + events.size());
+		}
+	}
+
+	public int size(){
+	  return queue.size();
+	}
+}
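
The wait()/notifyAll() protocol above is the classic bounded-buffer pattern, keyed on total payload bytes rather than element count. A self-contained sketch of the same idea, independent of the Chukwa types:

    import java.util.LinkedList;
    import java.util.Queue;

    class ByteLimitQueue {
      private final Queue<byte[]> queue = new LinkedList<byte[]>();
      private long dataSize = 0;
      private final long maxBytes;

      ByteLimitQueue(long maxBytes) { this.maxBytes = maxBytes; }

      public synchronized void add(byte[] data) throws InterruptedException {
        while (dataSize + data.length > maxBytes)
          wait();                      // park producers while over the memory limit
        queue.add(data);
        dataSize += data.length;
        notifyAll();                   // wake consumers blocked in take()
      }

      public synchronized byte[] take() throws InterruptedException {
        while (queue.isEmpty())
          wait();                      // park consumers while empty
        byte[] data = queue.remove();
        dataSize -= data.length;
        notifyAll();                   // wake producers blocked in add()
        return data;
      }
    }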

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.log4j.Logger;
+
+public class WaitingQueue implements ChunkQueue
+{
+
+	static Logger log = Logger.getLogger(WaitingQueue.class);
+	private BlockingQueue<Chunk> queue = new LinkedBlockingQueue<Chunk>(5);
+	
+	public void add(Chunk event)
+	{
+	  try
+	  {
+		this.queue.put(event);
+	  }
+	  catch(InterruptedException e)
+	  {}//return upwards
+	}
+
+	public void add(List<Chunk> events)
+	{
+		this.queue.addAll(events);
+  
+	}
+
+	public void collect(List<Chunk> events,int maxCount)
+	{
+		// Workaround to block on the queue
+		try
+		{
+			events.add(this.queue.take());
+		} 
+		catch (InterruptedException e)
+		{}
+		this.queue.drainTo(events,maxCount-1);
+
+		System.out.println("collect [" + Thread.currentThread().getName() + "] [" + events.size() + "]");
+
+		if (log.isDebugEnabled())
+		{
+			log.debug("WaitingQueue.inQueueCount:" + queue.size() + "\tWaitingQueue.collectCount:" + events.size());
+		}
+	}
+	
+	 public int size(){
+	    return queue.size();
+	  }
+
+}
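
collect() above blocks for the first chunk with take() and then opportunistically drains up to maxCount-1 more; this is the usual way to get a blocking batch read out of a BlockingQueue, which only offers a blocking single-element take. A generic sketch of the pattern:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;

    class Batching {
      // block until at least one element arrives, then return up to maxCount in total
      static <T> List<T> collectBatch(BlockingQueue<T> q, int maxCount) throws InterruptedException {
        List<T> batch = new ArrayList<T>(maxCount);
        batch.add(q.take());             // blocking: waits for the first element
        q.drainTo(batch, maxCount - 1);  // non-blocking: grabs whatever else is ready
        return batch;
      }
    }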

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.collector;
+
+import org.mortbay.jetty.*;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.*;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+
+public class CollectorStub {
+  
+  
+  public static void main(String[] args)
+  {
+    
+    try {
+      System.out.println("usage:  CollectorStub [portno] [pretend]");
+      System.out.println("note: if no portno defined, defaults to value in chukwa-site.xml");
+ 
+      ChukwaConfiguration conf = new ChukwaConfiguration();
+      int portNum = conf.getInt("chukwaCollector.http.port", 9999);
+
+      if(args.length != 0)
+        portNum = Integer.parseInt(args[0]);
+      if(args.length > 1) {
+        if(args[1].equals("pretend"))
+          ServletCollector.setWriter(new ConsoleWriter(true));
+        else if(args[1].equals("pretend-quietly"))
+          ServletCollector.setWriter(new ConsoleWriter(false));
+        else
+          System.out.println("WARNING: don't know what to do with command line arg "+ args[1]);
+      }
+      
+      SelectChannelConnector jettyConnector = new SelectChannelConnector();
+      jettyConnector.setLowResourcesConnections(20);
+      jettyConnector.setLowResourceMaxIdleTime(1000);
+      jettyConnector.setPort(portNum);
+      Server server = new Server(portNum);
+      server.setConnectors(new Connector[]{ jettyConnector});
+      org.mortbay.thread.BoundedThreadPool pool = new  org.mortbay.thread.BoundedThreadPool();
+      pool.setMaxThreads(30);
+      server.setThreadPool(pool);
+      Context root = new Context(server,"/",Context.SESSIONS);
+      root.addServlet(new ServletHolder(new ServletCollector()), "/*");
+      server.start();
+      server.setStopAtShutdown(false);
+     
+      System.out.println("started http collector on port number " + portNum);
+
+    }
+    catch(Exception e)
+    {
+      e.printStackTrace();
+      System.exit(0);
+    }
+
+  }
+
+}
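
The static ServletCollector.setWriter() hook is what implements the "pretend" modes above, and the same hook can aim the collector at any ChukwaWriter for testing. A minimal sketch (passing true appears to select verbose console output, matching the "pretend" branch):

    // route incoming chunks to the console instead of the default SeqFileWriter
    ServletCollector.setWriter(new ConsoleWriter(true));  // throws IOException from writer init
    // ...then build and start the Jetty server exactly as CollectorStub.main() does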

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.collector.servlet;
+
+import java.io.*;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.log4j.Logger;
+
+public class ServletCollector extends HttpServlet
+{
+
+  static final boolean FANCY_DIAGNOSTICS = true;
+	static org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
+	 
+  private static final long serialVersionUID = 6286162898591407111L;
+  Logger log = Logger.getRootLogger();//.getLogger(ServletCollector.class);
+	  
+  
+	public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws IOException
+	{
+	  writer = w;
+	  w.init();
+	}
+  
+	public void init(ServletConfig servletConf) throws ServletException
+	{
+	  
+	  log.info("initing servletCollector");
+		if(servletConf == null)	{
+			log.fatal("no servlet config");
+			return;
+		}
+		
+		try
+		{
+			// read the application->pipeline settings from a config file in the format:
+			// application_name: PipelinedWriter1, PipelinedWriter2, Writer
+			// use reflection to set up the pipeline after reading in the list of writers from the config file
+			
+			/*
+			String strPipelines = "HadoopLogs:HdfsWriter\nApplication2:SameerWriter:HdfsWriter";
+			String[] pipelines = strPipelines.split("\n");
+			// split into pipes
+			for (String pipe : pipelines){
+				String[] tmp = pipe.split(":");
+				String app = tmp[0];
+				String[] stages = tmp[1].split(",");
+			
+				//loop through pipes, creating linked list of stages per pipe, one at a time 
+				for (String stage : stages){
+					Class curr = ClassLoader.loadClass(stage);
+				}
+			}
+			*/
+		      //FIXME: seems weird to initialize a static object here
+			if (writer == null)
+				writer =  new SeqFileWriter();
+
+		} catch (IOException e) {
+			throw new ServletException("Problem init-ing servlet", e);
+		}		
+	}
+
+	protected void accept(HttpServletRequest req, HttpServletResponse resp)
+			throws ServletException
+	{
+	  ServletDiagnostics diagnosticPage = new ServletDiagnostics();
+		try {
+	    
+		  final long currentTime = System.currentTimeMillis();
+		  log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
+			java.io.InputStream in = req.getInputStream();
+						
+			ServletOutputStream l_out = resp.getOutputStream();
+			final DataInputStream di = new DataInputStream(in);
+			final int numEvents = di.readInt();
+		  //	log.info("saw " + numEvents+ " in request");
+
+      if(FANCY_DIAGNOSTICS)
+        diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
+			for (int i = 0; i < numEvents; i++){
+				// TODO: pass new data to all registered stream handler methods for this chunk's stream
+				// TODO: should really have some dynamic assignment of events to writers
+
+	      ChunkImpl logEvent =  ChunkImpl.read(di);
+
+	      if(FANCY_DIAGNOSTICS)
+	        diagnosticPage.sawChunk(logEvent, i);
+	      
+				// write new data to data sync file
+				if(writer != null) {
+				  writer.add(logEvent);  //add() blocks until the data is written
+				  //this is where we ACK this connection
+					l_out.print("ok:");
+					l_out.print(logEvent.getData().length);
+					l_out.print(" bytes ending at offset ");
+					l_out.println(logEvent.getSeqID()-1);
+				}
+				else
+					l_out.println("can't write: no writer");	
+			}
+
+      if(FANCY_DIAGNOSTICS)
+        diagnosticPage.doneWithPost();
+	    resp.setStatus(200);
+			
+		} catch (IOException e) 	{
+			log.warn("IO error", e);
+			throw new ServletException(e);
+		}
+	}
+
+	
+	@Override
+	protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+			throws ServletException, IOException
+	{
+		accept(req,resp);
+	}
+
+	@Override
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+			throws ServletException, IOException
+	{
+	  PrintStream out = new PrintStream(resp.getOutputStream());
+    resp.setStatus(200);
+	  out.println("<html><body><h2>Chukwa servlet running</h2>");
+	  if(FANCY_DIAGNOSTICS)
+	    ServletDiagnostics.printPage(out);
+	  out.println("</body></html>");
+//		accept(req,resp);
+	}
+
+  @Override	
+	public String getServletInfo()
+	{
+		return "Chukwa Servlet Collector";
+	}
+
+	@Override
+	public void destroy()
+	{
+	  synchronized(writer)
+	  {
+	    writer.close();
+	  }
+	  super.destroy();
+	}
+}
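
The post body accept() expects is a 4-byte chunk count followed by that many serialized chunks, each acknowledged with one "ok:" line. A hedged client-side sketch of that framing, assuming ChunkImpl offers a write(DataOutput) counterpart to the ChunkImpl.read(DataInput) call used above:

    // hypothetical sender-side framing for one HTTP post, given some List<ChunkImpl> chunks
    java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream();
    java.io.DataOutputStream out = new java.io.DataOutputStream(buf);
    out.writeInt(chunks.size());   // numEvents, the first thing accept() reads
    for (ChunkImpl c : chunks)
      c.write(out);                // assumed counterpart of ChunkImpl.read(di)
    out.flush();
    // POST buf.toByteArray() to the collector; the servlet answers with one
    // "ok:<bytes> bytes ending at offset <seqID-1>" line per chunk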

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.collector.servlet;
+
+import java.io.PrintStream;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+
+/**
+ * Diagnostic statistics for the collector servlet; one instance per HTTP post.
+ */
+public class ServletDiagnostics {
+
+  static Logger log=  Logger.getLogger(ServletDiagnostics.class);
+  
+  private static class PostStats { //statistics about one post
+    public PostStats(String src, int count, long receivedTs)
+    {
+      this.count = count;
+      this.src = src;
+      this.receivedTs = receivedTs;
+      types = new String[count];
+      names = new String[count];
+      lengths = new int[count];
+      
+      seenChunkCount = 0;
+      dataSize = 0;
+    }
+    final int count;
+    final String src;
+    final long receivedTs;
+    final String[] types, names;
+    final int[] lengths;
+    
+    int seenChunkCount;
+    long dataSize;
+    public void addChunk(ChunkImpl c, int position)
+    {
+      if(position != seenChunkCount)
+        log.warn("servlet collector is passing chunk " + position + " but diagnostics has seen" +
+            seenChunkCount);
+      else if(seenChunkCount >= count){
+        log.warn("too many chunks in post declared as length " +count);
+      } else {
+        types[seenChunkCount] = c.getDataType(); 
+        lengths[seenChunkCount] = c.getData().length;
+        names[seenChunkCount] = c.getStreamName();
+        dataSize += c.getData().length;
+        ++seenChunkCount;
+      }
+    }
+  }
+  
+  static {
+    lastPosts = new LinkedList<PostStats>();
+  }
+
+  static LinkedList<PostStats> lastPosts;
+  PostStats curPost;
+  static int CHUNKS_TO_KEEP = 300;
+
+  
+  public void sawPost(String source, int chunks, long receivedTs) {
+    if(curPost != null) {
+      log.warn("should only have one HTTP post per ServletDiagnostics");
+      doneWithPost();
+    }
+    curPost = new PostStats(source, chunks, receivedTs);
+  }
+  
+  public void sawChunk(ChunkImpl c, int pos) {
+    curPost.addChunk(c, pos);
+  }
+
+  public static void printPage(PrintStream out) {
+    
+    HashMap<String, Long> bytesFromHost = new HashMap<String, Long>();    
+    long timeWindowOfSample = Long.MAX_VALUE;
+    long now = System.currentTimeMillis();
+
+
+    out.println("<ul>");
+    
+    synchronized(lastPosts) {
+      if(!lastPosts.isEmpty())
+        timeWindowOfSample = now -  lastPosts.peek().receivedTs;
+      
+      for(PostStats stats: lastPosts) {
+        out.print("<li>");
+        
+        out.print(stats.dataSize + " bytes from " + stats.src + " at timestamp " + stats.receivedTs);
+        out.println(" which was " +  ((now - stats.receivedTs)/ 1000) + " seconds ago");
+        Long oldBytes = bytesFromHost.get(stats.src);
+        long newBytes = stats.dataSize;
+        if(oldBytes != null)
+          newBytes += oldBytes;
+        bytesFromHost.put(stats.src, newBytes);
+        out.println("<ol>");
+        for(int i =0; i < stats.count; ++i)
+          out.println("<li> "+ stats.lengths[i] + " bytes of type " +
+              stats.types[i] + ".  Adaptor name ="+ stats.names[i] +" </li>");
+        out.println("</ol></li>");
+      }
+    }
+    out.println("</ul>");
+    out.println("<ul>");
+    for(Map.Entry<String, Long> h: bytesFromHost.entrySet()) {
+      out.print("<li>rate from " + h.getKey() + " was " + (1000 * h.getValue() / timeWindowOfSample));
+      out.println(" bytes/second in last " + timeWindowOfSample/1000 + " seconds.</li>");
+    }
+    
+
+    out.println("</ul>");    
+    out.println("total of " + bytesFromHost.size() + " unique hosts seen");
+
+    out.println("<p>current time is " + System.currentTimeMillis() + " </p>");
+  }
+
+  public void doneWithPost() {
+    synchronized(lastPosts) {
+      if(lastPosts.size() > CHUNKS_TO_KEEP)
+        lastPosts.remove();
+      lastPosts.add(curPost);
+    }
+  }
+  
+}
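
For concreteness, the per-host rate printed by printPage() is 1000 * bytes / windowMs: a host that contributed 5,000,000 bytes over a 60,000 ms sample window is reported as 1000 * 5000000 / 60000 ≈ 83,333 bytes/second.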

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.connector;
+
+/**
+ * This class is responsible for setting up a long living process that repeatedly calls the 
+ * <code>send</code> function of a Sender.
+ */
+
+public interface Connector
+{
+	static final int proxyTimestampField = 0;
+	/**
+	 * 
+	 */
+	static final int proxyURIField = 1;
+	static final int proxyRetryField = 2;
+	
+	static final int adaptorTimestampField = 3;
+	static final int adaptorURIField = 4;
+
+	static final int logTimestampField = 5;
+	static final int logSourceField = 6;
+	static final int logApplicationField = 7;
+	static final int logEventField = 8;
+
+	
+	public void start();
+  public void shutdown();
+}
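
A minimal sketch of the Connector contract: an implementation owns a long-lived send loop and tears it down in shutdown(). (ConsoleOutConnector and HttpConnector in this commit are the real implementations; the skeleton below is illustrative only.)

    import org.apache.hadoop.chukwa.datacollection.connector.Connector;

    public class LoopingConnector implements Connector, Runnable {
      private volatile boolean stopped = false;

      public void start()    { new Thread(this, "connector loop").start(); }
      public void shutdown() { stopped = true; }

      public void run() {
        while (!stopped) {
          // collect ready chunks and hand them to a sender here
        }
      }
    }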

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.connector.http;
+
+/**
+ * This class is responsible for setting up an {@link HttpConnectorClient} with a list of collectors
+ * and then repeatedly calling its send function, which encapsulates the work of setting up the
+ * connection with the appropriate collector and then collecting and sending the {@link Chunk}s 
+ * from the global {@link ChunkQueue} which were added by {@link Adaptor}s. We want to separate
+ * the long-living (i.e. looping) behavior from the ConnectorClient because we also want to be able
+ * to use the HttpConnectorClient for its add and send API in arbitrary applications that want to send
+ * chunks without a {@link LocalAgent} daemon.
+ * 
+ * <p>
+ * On error, tries the list of available collectors, pauses for a minute, and then repeats.
+ * </p>
+ * <p>Will wait forever for collectors to come up.</p>
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+import org.apache.hadoop.chukwa.datacollection.sender.*;
+import org.apache.log4j.Logger;
+
+
+public class HttpConnector implements Connector, Runnable  {
+  
+	static Logger log = Logger.getLogger(HttpConnector.class);
+
+  static Timer statTimer = null;
+  static volatile int chunkCount = 0;
+  static final int MAX_EVENTS_PER_POST = 1000;
+  static final int MIN_POST_INTERVAL= 4 * 1000;
+  static ChunkQueue chunkQueue;
+  
+  ChukwaAgent agent;
+  String argDestination = null;
+  
+  private boolean stopMe = false;
+
+  static{
+    statTimer = new Timer();
+    chunkQueue = DataFactory.getInstance().getEventQueue();
+    statTimer.schedule(new TimerTask() {
+      public void run() {
+        int count = chunkCount;
+        chunkCount = 0;           
+        log.info("# http chunks ACK'ed since last report: " + count );
+      }
+    }, 100,60*1000);
+  }
+  
+	public HttpConnector(ChukwaAgent agent)	{
+		this.agent = agent;
+	}
+
+	 public HttpConnector(ChukwaAgent agent, String destination) {
+	    this.agent = agent;
+	    this.argDestination = destination;
+
+      log.info("Setting HTTP Connector URL manually using arg passed to Agent: " + destination);
+	  }
+	
+	public void start() 	{
+		(new Thread(this, "HTTP post thread")).start();
+	}
+	
+	public void shutdown(){
+	  stopMe = true;
+	}
+	
+	public void run(){
+	 	log.info("HttpConnector started at time:" + System.currentTimeMillis());
+
+	 	Iterator<String> destinations = null;
+	  
+	 	// build a list of our destinations from collectors
+	 	try{
+	    destinations = DataFactory.getInstance().getCollectors();
+	  } catch (IOException e){
+	    log.error("Failed to retreive list of collectors from conf/collectors file", e);
+	  }
+	  
+    ChukwaSender connectorClient = new ChukwaHttpSender();
+	  if (argDestination != null) {
+
+	    ArrayList<String> tmp = new ArrayList<String>();
+	    tmp.add(argDestination);
+      connectorClient.setCollectors(tmp.iterator());
+      log.info("using collector specified at agent runtime: " + argDestination);
+    } else if (destinations != null && destinations.hasNext()) {
+      connectorClient.setCollectors(destinations);
+      log.info("using collectors from collectors file");
+	  } else {
+	    log.error("No collectors specified, exiting (and taking agent with us).");
+	    agent.shutdown(true);//error is unrecoverable, so stop hard.
+	    return;
+	  }
+	  
+	  try {
+	    long lastPost = System.currentTimeMillis();
+  	  while(!stopMe) {
+  	    List<Chunk> newQueue = new ArrayList<Chunk>();
+  	    try {
+  	      //get all ready chunks from the chunkQueue to be sent
+  	      chunkQueue.collect(newQueue,MAX_EVENTS_PER_POST); //FIXME: should really do this by size
+  	     
+  	    } catch(InterruptedException e) {
+  	      System.out.println("thread interrupted during addChunks(ChunkQueue)");
+  	      Thread.currentThread().interrupt();
+  	      break;
+  	    }
+  	    int toSend = newQueue.size();
+  	    List<ChukwaHttpSender.CommitListEntry> results = connectorClient.send(newQueue);
+  	    log.info("sent " +toSend + " chunks, got back " + results.size() + " acks");
+  	    //checkpoint the chunks which were committed
+  	    for(ChukwaHttpSender.CommitListEntry cle : results) {
+          agent.reportCommit(cle.adaptor, cle.uuid);
+          chunkCount++;
+        }
+  	    long now = System.currentTimeMillis();
+  	    if( now - lastPost < MIN_POST_INTERVAL )
+  	      Thread.sleep(MIN_POST_INTERVAL - (now - lastPost));  //sleep out the rest of the interval so chunks can accumulate
+        lastPost = now;
+  	  } //end of try forever loop
+  	  log.info("received stop() command so exiting run() loop to shutdown connector");
+  	} catch(InterruptedException e) {
+	  //do nothing, let thread die.
+  	}catch(java.io.IOException e) {
+  	  log.error("connector failed; shutting down agent");
+  	  agent.shutdown(true);
+    }
+	}
+}
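
With MIN_POST_INTERVAL at 4000 ms, an iteration that finishes 1500 ms after the previous post sleeps the remaining 2500 ms before posting again, so a busy agent settles at roughly one post every four seconds. Wiring a connector up follows the constructors above; the collector URL here is hypothetical:

    ChukwaAgent agent = new ChukwaAgent();  // throws AlreadyRunningException
    Connector conn = new HttpConnector(agent, "http://collector.example.com:9999/");
    conn.start();  // spawns the "HTTP post thread" that runs the loop above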

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.controller;
+
+
+import java.net.*;
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+
+/**
+ * A convenience library for applications to communicate with the {@link ChukwaAgent}. Can be used
+ * to register and unregister new {@link Adaptor}s. Also contains functions for applications to
+ * use for handling log rotation.  
+ */
+public class ChukwaAgentController {
+  
+  public class AddAdaptorTask extends TimerTask {
+    String adaptorName;
+    String type;
+    String params;
+    long offset;
+    long numRetries;
+    long retryInterval;
+    
+    AddAdaptorTask(String adaptorName, String type, String params,
+        long offset, long numRetries, long retryInterval){
+      this.adaptorName = adaptorName;
+      this.type = type;
+      this.params = params;
+      this.offset = offset;
+      this.numRetries = numRetries;
+      this.retryInterval = retryInterval;
+    }
+    @Override
+    public void run() {
+      add(adaptorName, type, params, offset, numRetries, retryInterval);
+    }
+  }
+
+  //our default adaptors, provided here for convenience
+  public static final String CharFileTailUTF8 = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8";
+  public static final String CharFileTailUTF8NewLineEscaped = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
+  
+    
+  static String DEFAULT_FILE_TAILER = CharFileTailUTF8NewLineEscaped;
+  static int DEFAULT_PORT = 9093;
+  static String DEFAULT_HOST = "localhost";
+  static int numArgs = 0;
+  
+  class Adaptor{
+    public long id = -1;    
+    final public String name;
+    final public String params;
+    final public String appType;
+    public long offset; 
+
+    
+    Adaptor(String adaptorName, String appType, String params, long offset){
+      this.name = adaptorName;
+      this.appType = appType;
+      this.params = params;
+      this.offset = offset;
+    }
+    
+    Adaptor(long id, String adaptorName, String appType, String params, long offset){
+      this.id = id;
+      this.name = adaptorName;
+      this.appType = appType;
+      this.params = params;
+      this.offset = offset;
+    }
+    
+    /**
+     * Registers this {@link Adaptor} with the agent running at the specified hostname and portno  
+     * @return The id number of the this {@link Adaptor}, assigned by the agent upon successful registration
+     * @throws IOException
+     */
+    long register() throws IOException{
+      Socket s = new Socket(hostname, portno);
+      PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
+      bw.println("ADD " + name + " " + appType + " " + params + " " + offset);
+      bw.flush();
+      BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
+      String resp = br.readLine();
+      if(resp != null){
+        String[] fields = resp.split(" ");
+        if(fields[0].equals("OK")){
+          try{
+            id = Long.parseLong(fields[fields.length -1]);
+          }
+          catch (NumberFormatException e){}
+        }
+      }
+      s.close();
+      return id;
+    }
+    
+    void unregister() throws IOException{
+      Socket s = new Socket(hostname, portno);
+      PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
+      bw.println("SHUTDOWN " + id);
+      bw.flush();
+
+      BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
+      String resp = br.readLine();
+      if( resp == null || !resp.startsWith("OK"))
+      {
+        //error.  What do we do?
+      } else if (resp.startsWith("OK")){
+        String[] respSplit = resp.split(" ");
+        String newOffset = respSplit[respSplit.length-1];
+        try {
+          offset = Long.parseLong(newOffset);
+        }catch (NumberFormatException nfe){
+          System.err.println("adaptor didn't shutdown gracefully.\n" + nfe);
+        }
+      }
+      
+      s.close();
+    }
+    
+    public String toString(){
+      String[] namePieces = name.split("\\.");
+      String shortName = namePieces[namePieces.length-1];
+      return id + " " + shortName + " " + appType + " " + params + " " + offset; 
+    }
+  }
+  
+  Map<Long, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<Long,Adaptor>();
+  Map<Long, ChukwaAgentController.Adaptor> pausedAdaptors;
+  String hostname;
+  int portno;
+  private Timer addFileTimer = new Timer();
+  
+  public ChukwaAgentController(){
+    portno = DEFAULT_PORT;
+    hostname = DEFAULT_HOST;
+    pausedAdaptors = new HashMap<Long,Adaptor>();
+    
+    syncWithAgent();
+  }
+  
+  public ChukwaAgentController(String hostname, int portno)
+  {
+    this.hostname = hostname;
+    this.portno = portno;
+    pausedAdaptors = new HashMap<Long,Adaptor>();
+
+    syncWithAgent();
+  }
+
+  private boolean syncWithAgent() {
+    //set up adaptors by using list here
+    try{
+      runningAdaptors = list();
+      return true;
+    }catch(IOException e){
+      System.err.println("Error initializing ChukwaClient with list of " +
+          "currently registered adaptors, clearing our local list of adaptors");
+      //e.printStackTrace();
+      //if we can't connect to the LocalAgent, reset/clear our local view of the Adaptors.
+      runningAdaptors = new HashMap<Long,ChukwaAgentController.Adaptor>();
+      return false;
+    }
+  }
+  
+  /**
+   * Registers a new adaptor. Makes no guarantee about success. On failure,
+   * we print a message to stderr and ignore it silently so that an application
+   * doesn't crash if its attempt to register an adaptor fails. This call does
+   * not retry the connection; for that, use the overloaded version of this method, which
+   * accepts a time interval and number of retries.
+   * @return the id number of the adaptor, generated by the agent
+   */
+   public long add(String adaptorName, String type, String params, long offset){
+     return add(adaptorName, type, params, offset, 20, 15* 1000);//retry for five minutes, every fifteen seconds
+   }
+   
+   /**
+    * Registers a new adaptor. Makes no guarantee about success. On failure,
+    * to connect to server, will retry <code>numRetries</code> times, every
+    * <code>retryInterval</code> milliseconds.
+    * @return the id number of the adaptor, generated by the agent
+    */
+   public long add(String adaptorName, String type, String params, long offset, long numRetries, long retryInterval){
+     ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(adaptorName, type, params, offset);
+     long adaptorID = -1;
+     if (numRetries >= 0){
+       try{
+         adaptorID = adaptor.register();
+
+         if (adaptorID > 0){
+           runningAdaptors.put(adaptorID,adaptor);
+         }
+         else{
+           System.err.println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
+         }
+       }catch(IOException ioe){
+         System.out.println("AgentClient failed to contact the agent (" + hostname + ":" + portno + ")");
+         System.out.println("Scheduling a agent connection retry for adaptor add() in another " +
+             retryInterval + " milliseconds, " + numRetries + " retries remaining");
+         
+         addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params, offset, numRetries-1, retryInterval), retryInterval);
+       }
+     }else{
+       System.err.println("Giving up on connecting to the local agent");
+     }
+     return adaptorID;
+   } 
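+
+   // A minimal usage sketch (not from the original source): the adaptor class
+   // name, host/port, and file path below are illustrative placeholders,
+   // assuming a file-tailing adaptor class is available under the name shown.
+   //
+   //   ChukwaAgentController client = new ChukwaAgentController("localhost", 9093);
+   //   long id = client.add(
+   //       "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor",
+   //       "myAppType", "0 /tmp/app.log", 0L);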
+
+   public synchronized ChukwaAgentController.Adaptor remove(long adaptorID) throws IOException
+   {
+     syncWithAgent();
+     ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
+     if (a != null){ //guard against an unknown adaptorID
+       a.unregister();
+     }
+     return a;
+   }
+   
+   public void remove(String className, String appType, String filename) throws IOException
+   {
+     syncWithAgent();
+     // search for an adaptor with this class name, app type, and file name.
+     // collect the matching ids first, so we don't modify runningAdaptors
+     // while iterating over it, then unregister each match
+     List<Long> matches = new ArrayList<Long>();
+     for (Adaptor a : runningAdaptors.values()){
+       if (a.name.equals(className) && a.params.equals(filename) && a.appType.equals(appType)){
+         matches.add(a.id);
+       }
+     }
+     for (long id : matches){
+       remove(id);
+     }
+   }
+   
+   
+   public void removeAll(){
+     syncWithAgent();
+     Long[] keyset = runningAdaptors.keySet().toArray(new Long[]{});
+
+     for (long id : keyset){
+       try {
+         remove(id);
+         System.out.println("Successfully removed adaptor " + id);
+       }catch(IOException ioe){
+         System.err.println("Error removing an adaptor in removeAll()");
+         ioe.printStackTrace();
+       }
+     }
+   }
+   
+   Map<Long,ChukwaAgentController.Adaptor> list() throws IOException
+   {  
+     Socket s = new Socket(hostname, portno);
+     PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
+   
+     bw.println("LIST");
+     bw.flush();
+     BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
+     String ln;
+     Map<Long,Adaptor> listResult = new HashMap<Long,Adaptor>();
+     while((ln = br.readLine())!= null)
+     {
+       if (ln.equals("")){
+         break;
+       }else{
+         String[] parts = ln.split("\\s+");
+         if (parts.length >= 4){ //should have id, className, appType, params, offset
+           long id = Long.parseLong(parts[0].substring(0,parts[0].length()-1)); //chop off the right-paren
+           long offset = Long.parseLong(parts[parts.length-1]);
+           String tmpParams = parts[3];
+           for (int i = 4; i<parts.length-1; i++){
+             tmpParams += " " + parts[i];
+           }
+           listResult.put(id, new Adaptor(id,parts[1],parts[2],tmpParams, offset));
+         }
+       }
+     }
+     s.close();
+     return listResult;
+   }
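+
+   // For reference: list() assumes the agent answers LIST with one adaptor per
+   // line, terminated by a blank line, in the form
+   //   <id>) <className> <appType> <params...> <offset>
+   // where the id carries a trailing right-paren that the parser chops off.
+   // A hypothetical example line:
+   //   1) org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor myAppType 0 /tmp/app.log 3045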
+   
+   //************************************************************************
+   // The following functions are convenience functions, defining an easy
+   // to use API for application developers to integrate chukwa into their app
+   //************************************************************************
+   
+   /**
+    * Registers a new "LineFileTailUTF8" adaptor and starts it at offset 0. Checks
+    * whether the file is already being watched; if so, doesn't register another
+    * adaptor with the agent. If you have run the tail adaptor on this file before
+    * and rotated or emptied the file, you should use
+    * {@link ChukwaAgentController#pauseFile(String, String)} and
+    * {@link ChukwaAgentController#resumeFile(String, String)}, which store the
+    * adaptor's metadata and re-use it to pick up where the old adaptor left off.
+    * @param appType the datatype associated with the file to pass through
+    * @param filename name of the file for the tail adaptor to start monitoring
+    * @param numRetries number of times to retry connecting to the agent
+    * @param retryInterval milliseconds to wait between connection attempts
+    * @return the id number of the adaptor, generated by the agent
+    */
+  public long addFile(String appType, String filename, long numRetries, long retryInterval)
+  {
+    filename = new File(filename).getAbsolutePath();
+    //TODO: Maybe we want to check to see if the file exists here?
+    //      Probably not, because the caller might be talking to an agent on a different machine.
+    
+    //check to see if this file is being watched already, if yes don't set up another adaptor for it
+    boolean isDuplicate = false;
+    for (Adaptor a : runningAdaptors.values()){
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.appType.equals(appType) && a.params.endsWith(filename)){
+        isDuplicate = true;
+      }
+    }
+    if (!isDuplicate){
+      return add(DEFAULT_FILE_TAILER, appType, 0L + " " + filename,0L, numRetries, retryInterval);
+    }
+    else{
+      System.out.println("An adaptor for filename \"" + filename + "\", type \""
+          + appType + "\", exists already, addFile() command aborted");
+      return -1;
+    }
+  }
+  
+  public long addFile(String appType, String filename){
+    return addFile(appType, filename, 0, 0);
+  }
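+
+  // A usage sketch for the convenience API above, assuming an agent is
+  // listening on the default host/port; the appType and file path are
+  // hypothetical placeholders:
+  //
+  //   ChukwaAgentController client = new ChukwaAgentController();
+  //   long id = client.addFile("myAppType", "/var/log/myapp.log");
+  //   if (id == -1)
+  //     System.err.println("file already watched, or the add failed");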
+ 
+  /**
+   * Pauses all active adaptors of the default file tailing type that are tailing this file.
+   * This means we actually stop the adaptor and it goes away forever, but we store its
+   * state so that we can re-launch a new adaptor with the same state later.
+   * @param appType the datatype associated with the file
+   * @param filename the file being tailed
+   * @return collection of the adaptorID numbers of the adaptors that were paused
+   * @throws IOException
+   */
+  public Collection<Long> pauseFile(String appType, String filename) throws IOException{
+    syncWithAgent();
+    //store the unique streamid of the file we are pausing.
+    //search the list of adaptors for this filename
+    //store the current offset for it
+    List<Long> results = new ArrayList<Long>();
+    //iterate over a copy, since remove() below mutates runningAdaptors
+    for (Adaptor a : new ArrayList<Adaptor>(runningAdaptors.values())){
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
+        pausedAdaptors.put(a.id,a); //add it to our list of paused adaptors
+        remove(a.id); //tell the agent to remove/unregister it
+        results.add(a.id);
+      }
+    }
+    return results;
+  }
+  
+  public boolean isFilePaused(String appType, String filename){
+    for (Adaptor a : pausedAdaptors.values()){
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  /**
+   * Resume all adaptors for this filename that have been paused
+   * @param appType the appType
+   * @param filename filename by which to look up adaptors which are paused (and tailing this file)
+   * @return a collection of the new adaptor ID numbers which have resumed where the old adaptors left off
+   * @throws IOException
+   */
+  public Collection<Long> resumeFile(String appType, String filename) throws IOException{
+    syncWithAgent();
+    //search for a record of this paused file
+    List<Long> results = new ArrayList<Long>();
+    //iterate over a copy, since we remove entries from pausedAdaptors as we go
+    for (Adaptor a : new ArrayList<Adaptor>(pausedAdaptors.values())){
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
+        long newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " " + filename, a.offset);
+        pausedAdaptors.remove(a.id);
+        a.id = newID;
+        results.add(a.id);
+      }
+    }
+    return results;
+  }
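+
+  // A pause/resume sketch under the same assumptions as the addFile() example
+  // (IOException handling elided). The adaptor id changes across a
+  // pause/resume cycle, but the stored offset carries over:
+  //
+  //   client.pauseFile("myAppType", "/var/log/myapp.log");
+  //   //... rotate or truncate the file here ...
+  //   Collection<Long> newIds = client.resumeFile("myAppType", "/var/log/myapp.log");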
+  
+  
+  public void removeFile(String appType, String filename) throws IOException
+  {
+    syncWithAgent();
+    // search for FileTail adaptor with string of this file name
+    // get its id, tell it to unregister itself with the agent,
+    // then remove it from the list of adaptors
+    //iterate over a copy, since remove() below mutates runningAdaptors
+    for (Adaptor a : new ArrayList<Adaptor>(runningAdaptors.values())){
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
+        remove(a.id);
+      }
+    }
+  }
+  
+  //************************************************************************
+  // command line utilities
+  //************************************************************************
+  
+  public static void main(String[] args)
+  {
+    ChukwaAgentController c = getClient(args);
+    if (numArgs >= 3 && args[0].toLowerCase().equals("addfile")){
+      doAddFile(c, args[1], args[2]);
+    }
+    else if (numArgs >= 3 && args[0].toLowerCase().equals("removefile")){
+      doRemoveFile(c, args[1], args[2]);
+    }
+    else if(numArgs >= 1 && args[0].toLowerCase().equals("list")){
+      doList(c);
+    }
+    else if(numArgs >= 1 && args[0].equalsIgnoreCase("removeall")){
+      doRemoveAll(c);
+    }
+    else{
+      System.err.println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
+      System.err.println("       ChukwaClient removefile adaptorID [-h hostname] [-p portnumber]");
+      System.err.println("       ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
+      System.err.println("       ChukwaClient list [IP] [port]");
+      System.err.println("       ChukwaClient removeAll [IP] [port]");
+    }
+  }
+  
+  private static ChukwaAgentController getClient(String[] args){
+    int portno = 9093;
+    String hostname = "localhost";
+
+    numArgs = args.length;
+    
+    for (int i = 0; i < args.length; i++){
+      if(args[i].equals("-h") && args.length > i + 1){
+        hostname = args[i+1];
+        System.out.println ("Setting hostname to: " + hostname);
+        numArgs -= 2; //subtract for the flag and value
+      }
+      else if (args[i].equals("-p") && args.length > i + 1){
+        portno = Integer.parseInt(args[i+1]);
+        System.out.println ("Setting portno to: " + portno);
+        numArgs -= 2; //subtract for the flag, i.e. -p, and value
+      }
+    }
+    return new ChukwaAgentController(hostname, portno);
+  }
+  
+  private static long doAddFile(ChukwaAgentController c, String appType, String params){
+    System.out.println("Adding adaptor with filename: " + params);
+    long adaptorID = c.addFile(appType, params);
+    if (adaptorID != -1){
+      System.out.println("Successfully added adaptor, id is:" + adaptorID);
+    }else{
+      System.err.println("Agent reported failure to add adaptor, adaptor id returned was:" + adaptorID);
+    }
+    return adaptorID;
+  }
+  
+  private static void doRemoveFile(ChukwaAgentController c, String appType, String params){
+    try{
+      System.out.println("Removing adaptor with filename: " + params);
+      c.removeFile(appType,params);
+    }
+    catch(IOException e)
+    {
+      e.printStackTrace();
+    }
+  }
+  
+  private static void doList(ChukwaAgentController c){
+    try{
+      Iterator<Adaptor> adptrs = c.list().values().iterator();
+      while (adptrs.hasNext()){
+        System.out.println(adptrs.next().toString());
+      }
+    } catch(Exception e){
+      e.printStackTrace();
+    }
+  }
+  
+  private static void doRemoveAll(ChukwaAgentController c){
+    System.out.println("Removing all adaptors");
+    c.removeAll();
+  }
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.protocol;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+
+public interface Protocol
+{
+	public byte[] toByteArray(List<Chunk> chunks);
+	public List<Chunk> parseFrom(byte[] bytes);
+	
+	void writeTo(OutputStream output);
+	List<Chunk> parseFrom(InputStream input);
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.sender;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpException;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.HttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.log4j.Logger;
+
+/**
+ * Encapsulates all of the http setup and connection details needed for
+ * chunks to be delivered to a collector.
+ * <p>
+ * On error, tries the next collector on the list; once the list is exhausted,
+ * pauses for WAIT_FOR_COLLECTOR_REBOOT milliseconds and then repeats.
+ * </p>
+ * <p> Gives up, throwing IOException, after SENDER_RETRIES rounds of waiting. </p>
+ */
+public class ChukwaHttpSender implements ChukwaSender{
+  static final int MAX_RETRIES_PER_COLLECTOR = 4; //fast retries, in http client
+  static final int SENDER_RETRIES = 3; 
+  static final int WAIT_FOR_COLLECTOR_REBOOT = 20 * 1000; 
+    //FIXME: this should really correspond to the timer in RetryListOfCollectors
+  
+  static Logger log = Logger.getLogger(ChukwaHttpSender.class);
+  static HttpClient client = null;
+  static MultiThreadedHttpConnectionManager connectionManager = null;
+  static String currCollector = null;
+
+  
+  protected Iterator<String> collectors;
+  
+  static
+  {
+    connectionManager = 
+          new MultiThreadedHttpConnectionManager();
+    client = new HttpClient(connectionManager);
+    connectionManager.closeIdleConnections(1000);
+  }
+  
+  public static class CommitListEntry {
+    public Adaptor adaptor;
+    public long uuid;
+    
+    public CommitListEntry(Adaptor a, long uuid)  {
+      adaptor = a;
+      this.uuid = uuid;
+    }
+  }
+  
+//FIXME: probably we're better off with an EventListRequestEntity
+  static class BuffersRequestEntity implements RequestEntity {
+    List<DataOutputBuffer> buffers;
+    
+    public BuffersRequestEntity(List<DataOutputBuffer> buf) {
+      buffers=buf;
+    }
+
+    public long getContentLength()  {
+      long len=4;  //4 bytes for the chunk count, followed by the buffers themselves
+      for(DataOutputBuffer b: buffers)
+        len += b.getLength();
+      return len;
+    }
+
+    public String getContentType() {
+      return "application/octet-stream";
+    }
+
+    public boolean isRepeatable()  {
+      return true;
+    }
+
+    public void writeRequest(OutputStream out) throws IOException  {
+      DataOutputStream dos = new DataOutputStream(out);
+      dos.writeInt(buffers.size());
+      for(DataOutputBuffer b: buffers)
+        dos.write(b.getData(), 0, b.getLength());
+    }
+  }
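+
+  //For reference, a sketch (not part of this commit) of how a receiver could
+  //unpack the body produced by writeRequest() above: a 4-byte chunk count
+  //followed by the serialized chunks back to back. How a single Chunk is
+  //deserialized from the stream is assumed, not shown.
+  //
+  //  DataInputStream dis = new DataInputStream(requestBody);
+  //  int numChunks = dis.readInt();
+  //  for (int i = 0; i < numChunks; ++i) {
+  //    //... read one serialized chunk from dis ...
+  //  }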
+
+  public ChukwaHttpSender(){
+    //setup default collector
+    ArrayList<String> tmp = new ArrayList<String>();
+    this.collectors = tmp.iterator();
+    currCollector = "http://localhost:8080";
+    log.info("added a default collector (" + currCollector + ") in the ChukwaHttpSender constructor; " +
+        "the collector list's hasNext() is now: " + collectors.hasNext());
+  }
+  
+  /**
+   * Set up a single collector for this client to send {@link Chunk}s to
+   * @param collector the url of the collector
+   */
+  public void setCollectors(String collector){
+    currCollector = collector; //was an empty stub; set the current collector directly
+  }
+  
+  /**
+   * Set up a list of collectors for this client to send {@link Chunk}s to
+   * @param collectors an iterator over collector URLs
+   */
+  public void setCollectors(Iterator<String> collectors){
+    this.collectors = collectors; 
+    //setup a new destination from our list of collectors if one hasn't been set up
+    if (currCollector == null){
+      if (collectors.hasNext()){
+        currCollector = collectors.next();
+      }
+      else
+        log.error("No collectors to try in send(), not even trying to do doPost()");
+    }
+  }
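+
+  // Typical wiring, as a sketch: hand the sender a RetryListOfCollectors built
+  // from a list of collector URLs (the URLs below are placeholders):
+  //
+  //   List<String> urls = Arrays.asList("http://collector1:8080", "http://collector2:8080");
+  //   ChukwaHttpSender sender = new ChukwaHttpSender();
+  //   sender.setCollectors(new RetryListOfCollectors(urls, 15 * 1000));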
+  
+  
+  /**
+   * Grabs the chunks in toSend, serializes each into its own buffer, and posts
+   * them to the current collector, rolling over to other collectors on failure.
+   * @return list of CommitListEntry records for the chunks ACKed by the collector
+   */
+  public List<CommitListEntry> send(List<Chunk> toSend) throws InterruptedException, IOException{
+    List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
+    List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
+    
+    log.info("collected " + toSend.size() + " chunks");
+
+    //Serialize each chunk in turn into its own DataOutputBuffer and add that buffer to serializedEvents
+    for(Chunk c: toSend) {
+      DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
+      try {
+        c.write(b);
+      }catch(IOException err) {
+        log.error("serialization threw IOException", err);
+      }
+      serializedEvents.add(b);
+      //store a CLE for this chunk which we will use to ack this chunk to the caller of send()
+      //(e.g. the agent will use the list of CLE's for checkpointing)
+      commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID()));
+    }
+    toSend.clear();
+    
+    //collect all serialized chunks into a single buffer to send
+    RequestEntity postData = new BuffersRequestEntity(serializedEvents);
+
+
+    int retries = SENDER_RETRIES; 
+    while(currCollector != null)
+    {
+      //need to pick a destination here
+      PostMethod method = new PostMethod();
+      try   {
+        doPost(method, postData, currCollector);
+
+        retries = SENDER_RETRIES; //reset count on success
+        //if no exception was thrown from doPost, ACK that these chunks were sent
+        return commitResults;
+      } catch (Throwable e) {
+        log.error("Http post exception", e);
+        log.info("Checking list of collectors to see if another collector has been specified for rollover");
+        if (collectors.hasNext()){
+          currCollector = collectors.next();
+          log.info("Found a new collector to roll over to, retrying HTTP Post to collector " + currCollector);
+        } else {
+          if(retries > 0) {
+            log.warn("No more collectors to try rolling over to; waiting " + WAIT_FOR_COLLECTOR_REBOOT +
+                " ms (" + retries + "retries left)");
+            Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
+            retries --;
+          } else {
+            log.error("No more collectors to try rolling over to; aborting");
+            throw new IOException("no collectors");
+          }
+        }
+      }
+      finally  {
+        // be sure the connection is released back to the connection manager
+        method.releaseConnection();
+      }
+    } //end retry loop
+    return new ArrayList<CommitListEntry>();
+  }
+  
+  /**
+   * Handles the HTTP post. Throws HttpException on failure
+   */
+  @SuppressWarnings("deprecation")
+  private void doPost(PostMethod method, RequestEntity data, String dest)
+      throws IOException, HttpException
+  {
+    
+    HttpMethodParams pars = method.getParams();
+    pars.setParameter (HttpMethodParams.RETRY_HANDLER, (Object) new HttpMethodRetryHandler()
+    {
+      public boolean retryMethod(HttpMethod m, IOException e, int exec)
+      {
+        return !(e instanceof java.net.ConnectException) && (exec < MAX_RETRIES_PER_COLLECTOR);
+      }
+    });
+    method.setParams(pars);
+    method.setPath(dest);
+    
+     //send it across the network
+    method.setRequestEntity(data);
+    
+    log.info("HTTP post to " + dest+" length = "+ data.getContentLength());
+    // Send POST request
+    
+    client.setTimeout(8000);
+    int statusCode = client.executeMethod(method);
+      
+    if (statusCode != HttpStatus.SC_OK)  {
+      log.error("HTTP post response statusCode: " +statusCode + ", statusLine: " + method.getStatusLine());
+      //do something aggressive here
+      throw new HttpException("got back a failure from server");
+    }
+    //implicitly "else"
+    log.info("got success back from the remote collector; response length "+ method.getResponseContentLength());
+
+      //FIXME: should parse acks here
+    InputStream rstream = null;
+    
+    // Get the response body
+    byte[] resp_buf = method.getResponseBody();
+    rstream = new ByteArrayInputStream(resp_buf); 
+    BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
+    String line;
+    while ((line = br.readLine()) != null) {
+      System.out.println("response: " + line);
+    }
+  }
+  
+  public static void main(String[] argv) throws InterruptedException{
+    //HttpConnectorClient cc = new HttpConnectorClient();
+    //do something smarter than to hide record headaches, like force them to create and add records to a chunk
+    //cc.addChunk("test-source", "test-streamName", "test-application", "test-dataType", new byte[]{1,2,3,4,5});
+  }
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,25 @@
+package org.apache.hadoop.chukwa.datacollection.sender;
+
+/**
+ * Encapsulates all of the communication overhead needed for chunks to be delivered
+ * to a collector.
+ */
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
+
+public interface ChukwaSender {
+
+  /**
+   * 
+   * @param chunksToSend a list of chunks to commit
+   * @return the list of committed chunks
+   * @throws InterruptedException if interrupted while trying to send 
+   */
+  public List<CommitListEntry> send(List<Chunk> chunksToSend) throws InterruptedException, java.io.IOException;
+  
+  public void setCollectors(Iterator<String> collectors);
+  
+}
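+
+// A minimal sketch (not part of this commit) of an alternate implementation
+// for tests: ACK every chunk without any network I/O.
+//
+//   class NullSender implements ChukwaSender {
+//     public List<CommitListEntry> send(List<Chunk> chunks) {
+//       List<CommitListEntry> acks = new java.util.ArrayList<CommitListEntry>();
+//       for (Chunk c : chunks)
+//         acks.add(new CommitListEntry(c.getInitiator(), c.getSeqID()));
+//       return acks;
+//     }
+//     public void setCollectors(Iterator<String> collectors) {}
+//   }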

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.sender;
+
+import java.io.*;
+import java.net.URL;
+import java.util.*;
+
+/***
+ * An iterator returning a list of collectors to try.
+ * This class is nondeterministic, since it shuffles the list and puts collectors
+ * back on the list after some period.
+ * 
+ * No node will be polled more than once per maxRetryRateMs milliseconds. hasNext()
+ * will continue to return true if you have not polled recently.
+ *
+ */
+public class RetryListOfCollectors implements Iterator<String> {
+
+  int maxRetryRateMs;
+  List<String> collectors;
+  long lastLookAtFirstNode;
+  int nextCollector=0;
+  
+
+  public RetryListOfCollectors(File collectorFile, int maxRetryRateMs) throws IOException {
+    this.maxRetryRateMs = maxRetryRateMs;
+    lastLookAtFirstNode = 0;
+    collectors = new ArrayList<String>();
+    
+    try{
+      BufferedReader br  = new BufferedReader(new FileReader(collectorFile));
+      String line;
+      while((line = br.readLine()) != null) {
+        if(!line.contains("://")) //no protocol, assume http
+          collectors.add("http://"+line);
+        else
+          collectors.add(line);
+      }
+      br.close();
+    }catch(FileNotFoundException e){
+      System.err.println("Error in RetryListOfCollectors(): could not open the collectors file " +
+          collectorFile + ". Double-check that the CHUKWA_HOME environment variable is set, " +
+          "and that the file exists and is on the classpath.");
+    }catch(IOException e){
+      System.err.println("I/O error in the RetryListOfCollectors constructor, in readLine() on the specified collectors file");
+      throw e;
+    }
+    shuffleList();
+  }
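+
+  // The collectors file is assumed to contain one collector per line; lines
+  // with no protocol get "http://" prepended. A hypothetical conf/collectors
+  // file reading
+  //
+  //   collector1.example.com:8080
+  //   http://collector2.example.com:8080
+  //
+  // yields http://collector1.example.com:8080 and
+  // http://collector2.example.com:8080 in the rotation.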
+  
+  public RetryListOfCollectors(final List<String> collectors, int maxRetryRateMs) {
+    this.maxRetryRateMs = maxRetryRateMs;
+    lastLookAtFirstNode = 0;
+    this.collectors = new ArrayList<String>();
+    this.collectors.addAll(collectors);
+    shuffleList();
+  }
+  
+  //for now, use a simple O(n^2) algorithm.
+  //safe, because we only do this once, and on small lists
+  private void shuffleList() {
+    ArrayList<String> newList = new ArrayList<String>();
+    Random r = new java.util.Random();
+    while(!collectors.isEmpty()) {
+      int toRemove = r.nextInt(collectors.size());
+      String next = collectors.remove(toRemove);
+      newList.add(next);
+    }
+    collectors = newList;
+  }
+  
+  public boolean hasNext() {
+    return collectors.size() > 0 && 
+      ( (nextCollector  != 0)  || 
+         (System.currentTimeMillis() - lastLookAtFirstNode > maxRetryRateMs ));
+   }
+
+  public String next() {
+    if(hasNext())  {
+      int currCollector = nextCollector;
+      nextCollector = (nextCollector +1)% collectors.size();
+      if(currCollector == 0)
+        lastLookAtFirstNode = System.currentTimeMillis();
+      return collectors.get(currCollector);
+    }
+    else
+      return null;
+  }
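+
+  // A worked example of the pacing above: with three collectors A, B, C (after
+  // shuffling) and maxRetryRateMs = 20000, next() hands out A, B, C in order;
+  // once the cycle wraps back to index 0, hasNext() stays false until 20
+  // seconds have passed since A was last handed out, so no collector is polled
+  // more than once per maxRetryRateMs.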
+  
+  public String getRandomCollector(){
+    //note: the cast must apply to the whole product; casting Math.random() alone always yields 0
+    return collectors.get((int)(java.lang.Math.random() * collectors.size()));
+  }
+  
+  public void add(URL collector){
+    collectors.add(collector.toString());
+  }
+
+  public void remove()  {
+    throw new UnsupportedOperationException();
+    //FIXME: maybe just remove a collector from our list and then 
+    //FIXME: make sure next doesn't break (i.e. reset nextCollector if necessary)
+  }
+
+  /**
+   *  
+   * @return total number of collectors in list
+   */
+  int total() {
+    return collectors.size();
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.test;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.*;
+import org.apache.hadoop.chukwa.datacollection.agent.*;
+import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+
+import java.util.*;
+
+/**
+ * Output events to stdout.
+ * Intended for debugging use.
+ *
+ */
+public class ConsoleOutConnector extends Thread implements Connector {
+  
+  final ChukwaAgent agent;
+  volatile boolean shutdown;
+  final boolean silent;
+  
+
+  public ConsoleOutConnector(ChukwaAgent a) {
+    this(a, false);
+  }
+  
+  public ConsoleOutConnector(ChukwaAgent a, boolean silent)
+  {
+    agent = a;
+    this.silent = silent;
+  }
+  
+  public void run()
+  {
+    try{
+      System.out.println("console connector started");
+      ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
+      if(!silent)
+        System.out.println("-------------------");
+      
+      while(!shutdown)
+      {
+        List<Chunk> evts = new ArrayList<Chunk>();
+        eventQueue.collect(evts, 1);
+        
+        for(Chunk e: evts)
+        {
+          if(!silent) {
+            System.out.println("Console out connector got event at offset " + e.getSeqID());
+            System.out.println("data type was " + e.getDataType());
+            if(e.getData().length > 1000)
+              System.out.println("data length was " + e.getData().length+ ", not printing");
+            else
+              System.out.println(new String(e.getData()));
+          }
+          
+          agent.reportCommit(e.getInitiator(), e.getSeqID());
+         
+          if(!silent)
+            System.out.println("-------------------");
+        }
+      }
+    }
+    catch(InterruptedException e)
+    {} //thread is about to exit anyway
+  }
+
+  public void shutdown()
+  {
+    shutdown = true;
+    this.interrupt();
+  }
+
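+  // A usage sketch for debugging, assuming a ChukwaAgent is already running
+  // in-process (how agent is obtained is outside this class):
+  //
+  //   ConsoleOutConnector conn = new ConsoleOutConnector(agent);
+  //   conn.start();     //dumps incoming chunks to stdout
+  //   //... later ...
+  //   conn.shutdown();
+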
+}


