hadoop-common-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r723855 [5/23] - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/hadoop-packaging/ src/contrib/chukwa/lib...
Date: Fri, 05 Dec 2008 20:30:21 GMT
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Fri Dec  5 12:30:14 2008
@@ -23,6 +23,9 @@
 
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.conf.Configuration;
+import org.mortbay.log.Log;
 
 /**
  * A shared thread used by all FileTailingAdaptors. 
@@ -40,21 +43,32 @@
   /**
    * How often to tail each file.
    */
-  int SAMPLE_PERIOD_MS = 1000* 2; //FIXME: should be configurable
+  int DEFAULT_SAMPLE_PERIOD_MS = 1000* 2;
+  int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
+  private static Configuration conf = null;
   
   FileTailer() {
-     eq = DataFactory.getInstance().getEventQueue();
+	if (conf == null) {
+	  ChukwaAgent agent = ChukwaAgent.getAgent();
+	  if (agent != null) {
+		conf = agent.getConfiguration();
+    	if (conf != null) {
+    	  SAMPLE_PERIOD_MS= conf.getInt("chukwaAgent.adaptor.context.switch.time", DEFAULT_SAMPLE_PERIOD_MS);
+        }
+	  }
+	}
+    eq = DataFactory.getInstance().getEventQueue();
      
-       //iterations are much more common than adding a new adaptor
-     adaptors = new CopyOnWriteArrayList<FileTailingAdaptor>();
+    //iterations are much more common than adding a new adaptor
+    adaptors = new CopyOnWriteArrayList<FileTailingAdaptor>();
 
-     this.setDaemon(true);
-     start();//start the file-tailing thread
+    this.setDaemon(true);
+    start();//start the file-tailing thread
   }
    
   //called by FileTailingAdaptor, only
    void startWatchingFile(FileTailingAdaptor f) {
-       adaptors.add(f);
+     adaptors.add(f);
    }
 
    //called by FileTailingAdaptor, only
@@ -64,7 +78,7 @@
    
   public void run()  {
     try{
-      while(true) {
+      while(true) {    	  
         boolean shouldISleep = true;
         long startTime = System.currentTimeMillis();
         for(FileTailingAdaptor f: adaptors) {
@@ -72,13 +86,12 @@
           shouldISleep &= !hasMoreData;
         }
         long timeToReadFiles = System.currentTimeMillis() - startTime;
-        assert timeToReadFiles >= 0 : " time shouldn't go backwards";
-        if(timeToReadFiles < SAMPLE_PERIOD_MS && shouldISleep)
-          Thread.sleep(SAMPLE_PERIOD_MS - timeToReadFiles+1);
+        if(timeToReadFiles < SAMPLE_PERIOD_MS || shouldISleep) {
+          Thread.sleep(SAMPLE_PERIOD_MS);
+        }
       }
+    } catch(InterruptedException e) {
     }
-    catch(InterruptedException e)
-    {}
   }
   
   

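The hunk above replaces the hard-coded two-second tail period with a value read from the agent configuration. A minimal sketch of the lookup pattern it introduces, assuming a running agent; the helper class and method names below are illustrative, not part of the commit:

import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.conf.Configuration;

class SamplePeriodLookup {
  // Read chukwaAgent.adaptor.context.switch.time, falling back to the default
  // when no agent or no configuration is available (e.g. in unit tests).
  static int getSamplePeriodMs(int defaultMs) {
    ChukwaAgent agent = ChukwaAgent.getAgent();
    if (agent != null) {
      Configuration conf = agent.getConfiguration();
      if (conf != null) {
        return conf.getInt("chukwaAgent.adaptor.context.switch.time", defaultMs);
      }
    }
    return defaultMs;
  }
}
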
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Fri Dec  5 12:30:14 2008
@@ -22,9 +22,13 @@
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
 import java.io.*;
+import java.util.Timer;
 
 /**
  * An adaptor that repeatedly tails a specified file, sending the new bytes.
@@ -42,8 +46,12 @@
 	 * to the next. This way, we get quick response time for other files if one
 	 * file is growing rapidly.
 	 */
-	public static final int MAX_READ_SIZE = 128 * 1024;
-
+	public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024 ;
+	public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE ;
+	public static int MAX_RETRIES = 300;
+	protected static Configuration conf = null;
+	private int attempts = 0;
+	
 	File toWatch;
 	/**
 	 * next PHYSICAL offset to read
@@ -51,6 +59,8 @@
 	protected long fileReadOffset;
 	protected String type;
 	private ChunkReceiver dest;
+	protected RandomAccessFile reader = null;
+	protected long adaptorID;
 	
 	/**
 	 * The logical offset of the first byte of the file
@@ -64,21 +74,24 @@
 		log =Logger.getLogger(FileTailingAdaptor.class);
 	}
 
-	public void start(String type, String params, long bytes, ChunkReceiver dest) {
-	  //in this case params = filename 
+	public void start(long adaptorID, String type, String params, long bytes, ChunkReceiver dest) {
+	    //in this case params = filename 
 		log.info("started file tailer on file " + params);
-	  this.type = type;
-	  this.dest = dest;
+		this.adaptorID = adaptorID;
+	    this.type = type;
+	    this.dest = dest;
+	    this.attempts = 0;
 			  
-	  String[] words = params.split(" ");
-	  if(words.length > 1) {
-	    offsetOfFirstByte = Long.parseLong(words[0]);
-	    toWatch = new File(params.substring(words[0].length() + 1));
-	  }
-	  else
-	    toWatch = new File(params);
+	    String[] words = params.split(" ");
+	    if(words.length > 1) {
+	        offsetOfFirstByte = Long.parseLong(words[0]);
+	        toWatch = new File(params.substring(words[0].length() + 1));
+	    } else {
+	        toWatch = new File(params);
+	    }
 	  
-		this.fileReadOffset= bytes - offsetOfFirstByte;
+	    
+		this.fileReadOffset= bytes;
 		tailer.startWatchingFile(this);
 	}
 
@@ -88,25 +101,38 @@
 	 */
 	public long shutdown() throws AdaptorException {
 	  try{
-	    tailFile(tailer.eq); // get tail end of file.
-	  } catch(InterruptedException e) {
-	    Thread.currentThread().interrupt();
+	    if(toWatch.exists()) {
+	    	int retry=0;
+	    	tailer.stopWatchingFile(this);
+			TerminatorThread lastTail = new TerminatorThread(this,tailer.eq);
+			lastTail.setDaemon(true);
+			lastTail.start();
+			while(lastTail.isAlive() && retry < 60) {
+				try {
+					log.info("Retry:"+retry);
+				    Thread.currentThread().sleep(1000);
+				    retry++;
+				} catch(InterruptedException ex) {
+				}
+			}
+	    }
+	  } finally {
+	    return fileReadOffset + offsetOfFirstByte;
 	  }
-		hardStop();//need to do this afterwards, so that offset stays visible during tailFile().
-		return fileReadOffset + offsetOfFirstByte;
+
 	}
 	/**
 	 * Stop tailing the file, effective immediately.
 	 */
 	public void hardStop() throws AdaptorException {
-    tailer.stopWatchingFile(this);
+        tailer.stopWatchingFile(this);
 	}
 
   /**
    * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
    */
 	public String getCurrentStatus() {
-		return type + " " + offsetOfFirstByte+ " " + toWatch.getPath();
+		return type.trim() + " " + offsetOfFirstByte+ " " + toWatch.getPath() + " " + fileReadOffset;
 		// can make this more efficient using a StringBuilder
 	}
 
@@ -114,6 +140,10 @@
 		return "Tailer on " + toWatch;
 	}
 
+	public String getStreamName() {
+		return toWatch.getPath();
+	}
+	
 	/**
 	 * Looks at the tail of the associated file, adds some of it to event queue
 	 * This method is not thread safe. Returns true if there's more data in the
@@ -123,35 +153,119 @@
 	 */
 	public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException {
     boolean hasMoreData = false;
-    try {
-      if(!toWatch.exists())
-        return false;  //no more data
-      
-    	RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
-    	long len = reader.length();
-    	if (len > fileReadOffset) {
-    		reader.seek(fileReadOffset);
-    
-    		long bufSize = len - fileReadOffset;
-    		if (bufSize > MAX_READ_SIZE) {
-    			bufSize = MAX_READ_SIZE;
-    			hasMoreData = true;
-    		}
-    		byte[] buf = new byte[(int) bufSize];
-    		reader.read(buf);
-    		assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
-    				+ " pointer is "
-    				+ reader.getFilePointer()
-    				+ " but offset is " + fileReadOffset + bufSize;
-    
-    		int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf);
-    		fileReadOffset = fileReadOffset + bytesUsed;
-    	}
-    	reader.close();
-    } catch (IOException e) {
-    	log.warn("failure reading " + toWatch, e);
-    }
-    return hasMoreData;
+	    try {
+	        if(!toWatch.exists() && attempts>MAX_RETRIES) {
+	    	    log.warn("Adaptor|" + adaptorID +"| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired.  File removed from streaming.");
+       			ChukwaAgent agent = ChukwaAgent.getAgent();
+    			if (agent != null) {
+    				agent.stopAdaptor(adaptorID, false);
+    			} else {
+    				log.info("Agent is null, running in default mode");
+    			}
+    		    tailer.stopWatchingFile(this);
+	    	    return false;
+	        } else if(!toWatch.exists()) {
+	        	log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts+" of "+MAX_RETRIES);
+	        	attempts++;
+	            return false;  //no more data
+	        }
+	      	if (reader == null)
+	      	{
+	      		reader = new RandomAccessFile(toWatch, "r");
+	      		log.info("Adaptor|" + adaptorID + "|Opening the file for the first time|seek|" + fileReadOffset);
+	      	}
+	      	
+	      	long len = 0L;
+	    	try {
+		      	RandomAccessFile newReader = new RandomAccessFile(toWatch,"r");
+		    	len = reader.length();
+		    	long newLength = newReader.length();
+		      	if(newLength<len && fileReadOffset >= len) {
+		      		reader.close();
+		      		reader = newReader;
+		      		fileReadOffset=0L;
+		      		log.debug("Adaptor|" + adaptorID +"| File size mismatched, rotating: "+toWatch.getAbsolutePath());
+		      	} else {
+		      		try {
+		      		    newReader.close();
+		      		} catch(IOException e) {
+		      			// do nothing.
+		      		}
+		      	}
+	    	} catch(IOException e) {
+      			// do nothing, if file doesn't exist.	    		
+	    	}
+	    	if (len >= fileReadOffset) {
+	    		if(offsetOfFirstByte>fileReadOffset) {
+	    			// If the file rotated, the recorded offsetOfFirstByte is greater than file size,
+	    			// reset the first byte position to beginning of the file.
+	        		offsetOfFirstByte = 0L;    			
+	    			fileReadOffset=0;
+	    		}
+	    		
+	    		log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset );
+	    		reader.seek(fileReadOffset);
+	    
+	    		long bufSize = len - fileReadOffset;
+	    		
+	    		if (conf == null)
+	    		{
+	    			ChukwaAgent agent = ChukwaAgent.getAgent();
+	    			if (agent != null)
+	    			{
+	    				conf = agent.getConfiguration();
+	        			if (conf != null)
+	        			{
+	        				MAX_READ_SIZE= conf.getInt("chukwaAgent.fileTailingAdaptor.maxReadSize", DEFAULT_MAX_READ_SIZE);
+	        				log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
+	        			}	
+	        			else
+	        			{
+	        				log.info("Conf is null, running in default mode");
+	        			}
+	    			}
+	    			else
+	    			{
+	    				log.info("Agent is null, running in default mode");
+	    			}
+	    		}
+	    		
+	    		if (bufSize > MAX_READ_SIZE) {
+	    			bufSize = MAX_READ_SIZE;
+	    			hasMoreData = true;
+	    		}
+	    		byte[] buf = new byte[(int) bufSize];
+	    		
+	    		
+	    		long curOffset = fileReadOffset;
+	    		
+	    		reader.read(buf);
+	    		assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
+	    				+ " pointer is "
+	    				+ reader.getFilePointer()
+	    				+ " but offset is " + fileReadOffset + bufSize;
+	    
+	    		int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf);
+	    		fileReadOffset = fileReadOffset + bytesUsed;
+	    		
+	    		
+	    		log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"+ fileReadOffset);
+	    		
+	    		
+	    	} else {
+	    		// file has rotated and no detection
+	    		reader.close();
+	    		reader=null;
+	    		fileReadOffset = 0L;
+	    		offsetOfFirstByte = 0L;
+	    		hasMoreData = true;
+				log.warn("Adaptor|" + adaptorID +"| file has rotated and no detection - reset counters to 0L");	    	
+	    	}
+	    } catch (IOException e) {
+	    	log.warn("failure reading " + toWatch, e);
+	    }
+	    attempts=0;
+	    return hasMoreData;
 	}
 	
   /**

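The rewritten tailFile() above keeps one RandomAccessFile open across calls and detects log rotation by opening a second handle on the same path and comparing lengths. A simplified sketch of just that check, with illustrative field names; the real method additionally resets offsetOfFirstByte and gives up after MAX_RETRIES when the file is missing:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

class RotationCheckSketch {
  RandomAccessFile reader;   // handle held open across tailFile() calls
  long fileReadOffset;       // next physical offset to read

  void detectRotation(File toWatch) throws IOException {
    RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
    long oldLen = reader.length();     // length seen through the old handle
    long newLen = newReader.length();  // length of the file currently at this path
    if (newLen < oldLen && fileReadOffset >= oldLen) {
      // The path now points at a shorter file: treat it as a rotation and start over.
      reader.close();
      reader = newReader;
      fileReadOffset = 0L;
    } else {
      newReader.close();               // same file, keep the existing handle
    }
  }
}
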
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,38 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.IOException;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor;
+import org.apache.hadoop.chukwa.inputtools.plugin.metrics.ExecHelper;
+import org.apache.log4j.Logger;
+
+public class TerminatorThread extends Thread {
+	private static Logger log =Logger.getLogger(FileAdaptor.class);
+	private static FileTailingAdaptor adaptor = null;
+	private static ChunkReceiver eq = null;
+	
+	public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
+		this.adaptor = adaptor;
+		this.eq = eq;
+	}
+
+	public void run() {
+   	    log.info("Terminator thread started.");
+  	    try {
+  	    	while(adaptor.tailFile(eq)) {
+  	    		log.info("");
+  	    	}
+		} catch (InterruptedException e) {
+			log.info("Unable to send data to collector for 60 seconds, force shutdown.");
+		}
+        log.info("Terminator finished.");
+        try {
+        	adaptor.reader.close();
+        } catch (IOException ex) {
+        	
+        }
+	}
+}

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java Fri Dec  5 12:30:14 2008
@@ -19,6 +19,7 @@
 package org.apache.hadoop.chukwa.datacollection.agent;
 
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.log4j.Logger;
 
 /**
  * Produces new unconfigured adaptors, given the class name of the appender type
@@ -26,6 +27,7 @@
  */
 public class AdaptorFactory {
    
+  static Logger log = Logger.getLogger(ChukwaAgent.class);
     /**
      * Instantiate an adaptor that can be added by the {@link ChukwaAgent}
      * @param className the name of the {@link Adaptor} class to instantiate
@@ -39,13 +41,32 @@
       obj = Class.forName(className).newInstance();
       if (Adaptor.class.isInstance(obj)){
         return (Adaptor) obj;
+      } 
+      else        
+        return null;
+    } catch (Exception e1){
+      log.warn("Error instantiating new adaptor by class name, " +
+      		"attempting again, but with default chukwa package prepended, i.e. " +
+      		"org.apache.hadoop.chukwa.datacollection.adaptor." + className + ". " 
+      		+ e1);
+      try{
+        //if failed, try adding default class prefix
+        Object obj2 = Class.forName(
+            "org.apache.hadoop.chukwa.datacollection.adaptor." +
+            className).newInstance();
+        if (Adaptor.class.isInstance(obj2)){
+          log.debug("Succeeded in finding class by adding default chukwa " +
+              "namespace prefix to class name profided");
+          return (Adaptor) obj2;
+        } 
+        else        
+          return null;
+      } catch (Exception e2) {
+        System.out.println("Also error instantiating new adaptor by classname" +
+        		"with prefix added" + e2);
+        return null;
       }
-      else return null;
-    } catch (Exception e){
-      System.out.println("Error instantiating new adaptor by class" + e);
-      return null;
     }
-    
   }
   
 }

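With the fallback above, createAdaptor() now retries with the default adaptor package prepended when the bare class name cannot be loaded. A hypothetical call site showing the two equivalent spellings; placing it in the agent package mirrors how ChukwaAgent already calls the factory, since the method's visibility outside that package is not shown here:

package org.apache.hadoop.chukwa.datacollection.agent;

import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;

class CreateAdaptorExample {
  public static void main(String[] args) {
    // Both calls should resolve to the same adaptor class after this change.
    Adaptor byFullName = AdaptorFactory.createAdaptor(
        "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor");
    Adaptor byShortName = AdaptorFactory.createAdaptor("filetailer.FileTailingAdaptor");
    System.out.println(byFullName != null && byShortName != null);
  }
}
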
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Fri Dec  5 12:30:14 2008
@@ -56,11 +56,14 @@
       InputStream in = connection.getInputStream();
       BufferedReader br = new BufferedReader(new InputStreamReader(in));
       PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream()));
+      //out.println("You are connected to the chukwa agent on "+ InetAddress.getLocalHost().getCanonicalHostName());
+      //out.flush();
       String cmd = null;
       while((cmd = br.readLine()) != null)  {
         processCommand(cmd, out);
       }
-      log.info("control connection closed");
+      if (log.isDebugEnabled())
+  		{ log.debug("control connection closed");}
       }
       catch(SocketException e ) {
         if(e.getMessage().equals("Socket Closed"))
@@ -78,7 +81,8 @@
      */
     public void processCommand(String cmd, PrintStream out) throws IOException  {
       String[] words = cmd.split(" ");
-      log.info("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);
+      if (log.isDebugEnabled())
+  		{ log.debug("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);}
       
       if(words[0].equalsIgnoreCase("help"))  {
         out.println("you're talking to the Chukwa agent.  Commands available: ");
@@ -88,6 +92,7 @@
         out.println("list -- list running adaptors");
         out.println("close -- close this connection");
         out.println("stopagent -- stop the whole agent process");
+        out.println("reloadCollectors -- reload the list of collectors");
         out.println("help -- print this message");
         out.println("\t Command names are case-blind.");
       }
@@ -123,9 +128,15 @@
           out.println("OK adaptor "+ num+ " stopped");
         }
       }
-      else if(words[0].equalsIgnoreCase("list") )  {
+      else if(words[0].equalsIgnoreCase("reloadCollectors"))  {
+            agent.getConnector().reloadConfiguration();
+            out.println("OK reloadCollectors done");
+        }else if(words[0].equalsIgnoreCase("list") )  {
         Map<Long, Adaptor> adaptorsByNumber = agent.getAdaptorList();
-        System.out.println("number of adaptors: " + adaptorsByNumber.size());
+        
+        if (log.isDebugEnabled())
+    		{ log.debug("number of adaptors: " + adaptorsByNumber.size());}
+        
         synchronized(adaptorsByNumber)   {
           for(Map.Entry<Long, Adaptor> a: adaptorsByNumber.entrySet())  {
             try{
@@ -168,7 +179,7 @@
   }
   
   public String formatAdaptorStatus(Adaptor a)  throws AdaptorException  {
-    return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() + " " + agent.getOffset(a);
+    return a.getClass().getCanonicalName() + " " + a.getCurrentStatus();
   }
 
   /**
@@ -186,7 +197,8 @@
     {
       try {
         Socket connection = s.accept();
-        log.info("new connection from " + connection.getInetAddress());
+        if (log.isDebugEnabled())
+        	{ log.debug("new connection from " + connection.getInetAddress());}
         ListenThread l = new ListenThread(connection);
         l.setDaemon(true);
         l.start();

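The listener above gains a reloadCollectors command next to add, list, close, stopagent and help; it calls agent.getConnector().reloadConfiguration() and answers "OK reloadCollectors done". A hypothetical client for the control socket, where localhost, the port argument and the single-line reply are assumptions based on the command loop shown here:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

public class ReloadCollectorsClient {
  public static void main(String[] args) throws Exception {
    // args[0]: the agent control port (logged as "control socket started on port ...").
    Socket s = new Socket("localhost", Integer.parseInt(args[0]));
    PrintStream out = new PrintStream(s.getOutputStream(), true);
    BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
    out.println("reloadCollectors");    // command added in this commit
    System.out.println(in.readLine());  // expected: "OK reloadCollectors done"
    out.println("close");
    s.close();
  }
}
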
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Fri Dec  5 12:30:14 2008
@@ -23,6 +23,7 @@
 import org.apache.hadoop.chukwa.datacollection.connector.*;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
+import org.apache.hadoop.chukwa.util.PidFile;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
@@ -32,257 +33,331 @@
 import java.io.*;
 
 /**
- * The local agent daemon that runs on each machine.
- * This class is designed to be embeddable, for use in testing.
+ * The local agent daemon that runs on each machine. This class is designed to
+ * be embeddable, for use in testing.
  * 
  */
 public class ChukwaAgent
 {
-  boolean DO_CHECKPOINT_RESTORE = false;
-  boolean WRITE_CHECKPOINTS = false;
- 
-  static String tags = "";
+  boolean DO_CHECKPOINT_RESTORE = true;
+  //boolean WRITE_CHECKPOINTS = true;
+
   static Logger log = Logger.getLogger(ChukwaAgent.class);
+  static ChukwaAgent agent = null;
+  private static PidFile pFile = null;
+
+  public static ChukwaAgent getAgent()
+  {
+    return agent;
+  }
+
+  Configuration conf = null;
+  Connector connector = null;
 
-  //doesn't need an equals(), comparator, etc
-  private static class Offset {
-    public Offset(long l, long id)  {
+  // doesn't need an equals(), comparator, etc
+  private static class Offset
+  {
+    public Offset(long l, long id)
+    {
       offset = l;
       this.id = id;
     }
+
     private volatile long id;
     private volatile long offset;
   }
 
-  public static class AlreadyRunningException extends Exception {
+  public static class AlreadyRunningException extends Exception
+  {
 
     private static final long serialVersionUID = 1L;
 
-    public AlreadyRunningException() {
+    public AlreadyRunningException()
+    {
       super("Agent already running; aborting");
     }
   }
-  
-  
+
   private final Map<Adaptor, Offset> adaptorPositions;
 
-  //basically only used by the control socket thread.
+  // basically only used by the control socket thread.
   private final Map<Long, Adaptor> adaptorsByNumber;
 
-  File checkpointDir;   //lock this object to indicate checkpoint in progress
-  File initialAdaptors;
-  String CHECKPOINT_BASE_NAME;  //base filename for checkpoint files
-  int CHECKPOINT_INTERVAL_MS ;  //min interval at which to write checkpoints
-
+  private File checkpointDir; // lock this object to indicate checkpoint in
+  // progress
+  private File initialAdaptors;
+  private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files
+  private int CHECKPOINT_INTERVAL_MS; // min interval at which to write
+  // checkpoints
+  private static String tags = "";
 
   private Timer checkpointer;
-  private volatile boolean needNewCheckpoint = false; //set to true if any event has happened
-  //that should cause a new checkpoint to be written
-
-
-  private long lastAdaptorNumber= 0;   //ID number of the last adaptor to be started
-  private int checkpointNumber; //id number of next checkpoint.
-  //should be protected by grabbing lock on checkpointDir
-
+  private volatile boolean needNewCheckpoint = false; // set to true if any
+  // event has happened
+  // that should cause a new checkpoint to be written
+
+  private long lastAdaptorNumber = 0; // ID number of the last adaptor to be
+  // started
+  private int checkpointNumber; // id number of next checkpoint.
+  // should be protected by grabbing lock on checkpointDir
 
   private final AgentControlSocketListener controlSock;
 
   /**
    * @param args
-   * @throws AdaptorException 
+   * @throws AdaptorException
    */
-  public static void main(String[] args) throws AdaptorException {
+  public static void main(String[] args) throws AdaptorException
+  {
+
+    pFile = new PidFile("Agent");
+    Runtime.getRuntime().addShutdownHook(pFile);
 
-    try{
-      System.out.println("usage:  LocalAgent [-restore] [default collector URL]");
-      ChukwaAgent agent = new ChukwaAgent();
-      if(agent.anotherAgentIsRunning()) {
-        System.out.println("another agent is running (or port has been usurped).  Bailing out now");
-      }
-        
-      Connector connector;
-
-      int uriArgNumber= 0;
-      if(args.length > 0)  {
-        if(args[0].equals("-restore")) {
-          agent.DO_CHECKPOINT_RESTORE = true;
+    try
+    {
+      if (args.length > 0 && args[0].equals("-help")) {
+        System.out.println("usage:  LocalAgent [-noCheckPoint]" +
+            "[default collector URL]");
+        System.exit(0);
+      }
+      ChukwaAgent localAgent = new ChukwaAgent();
+
+      if (agent.anotherAgentIsRunning())
+      {
+        System.out
+            .println("another agent is running (or port has been usurped). " +
+            		"Bailing out now");
+        System.exit(-1);
+      }
+
+      int uriArgNumber = 0;
+      if (args.length > 0)
+      {
+        if (args[0].equalsIgnoreCase("-noCheckPoint"))
+        {
+          agent.DO_CHECKPOINT_RESTORE = false;
           uriArgNumber = 1;
         }
-        if(args[uriArgNumber].equals("local"))
-          connector = new ConsoleOutConnector(agent);
+        if (args[uriArgNumber].equals("local"))
+          agent.connector = new ConsoleOutConnector(agent);
         else
         {
-          if(!args[uriArgNumber].contains("://"))
+          if (!args[uriArgNumber].contains("://"))
             args[uriArgNumber] = "http://" + args[uriArgNumber];
-          connector = new HttpConnector(agent, args[uriArgNumber]);
+          agent.connector = new HttpConnector(agent, args[uriArgNumber]);
         }
-      }
-      else
-        connector = new HttpConnector(agent);
+      } else
+        agent.connector = new HttpConnector(agent);
 
-      connector.start();
+      agent.connector.start();
 
       log.info("local agent started on port " + agent.getControlSock().portno);
 
-    }	catch(AlreadyRunningException e){
-      log.error("agent started already on this machine with same portno ; bailing out");
-      System.out.println("agent started already on this machine with same portno ; bailing out");
-      System.exit(0); //better safe than sorry
-    } catch(Exception e) 	{
+    } catch (AlreadyRunningException e)
+    {
+      log
+          .error("agent started already on this machine with same portno;" +
+          		" bailing out");
+      System.out
+          .println("agent started already on this machine with same portno;" +
+          		" bailing out");
+      System.exit(0); // better safe than sorry
+    } catch (Exception e)
+    {
       e.printStackTrace();
     }
   }
-  private boolean anotherAgentIsRunning() {
+
+  private boolean anotherAgentIsRunning()
+  {
     return !controlSock.isBound();
   }
+
   /**
    * @return the number of running adaptors inside this local agent
    */
-  public int adaptorCount() {
-    return adaptorPositions.size();
+  public int adaptorCount()
+  {
+    return adaptorsByNumber.size();
   }
 
   public ChukwaAgent() throws AlreadyRunningException
   {
+    ChukwaAgent.agent = this;
+
     readConfig();
 
-    //almost always just reading this; so use a ConcurrentHM.
-    //since we wrapped the offset, it's not a structural mod.
-    adaptorPositions= new ConcurrentHashMap<Adaptor, Offset>();
+    // almost always just reading this; so use a ConcurrentHM.
+    // since we wrapped the offset, it's not a structural mod.
+    adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
     adaptorsByNumber = new HashMap<Long, Adaptor>();
-    checkpointNumber=0;
-    try{
-      if(DO_CHECKPOINT_RESTORE)
+    checkpointNumber = 0;
+    try
+    {
+      if (DO_CHECKPOINT_RESTORE)
         restoreFromCheckpoint();
-    } catch(IOException e)  {
+    } catch (IOException e)
+    {
       log.warn("failed to restart from checkpoint: ", e);
     }
-    
-    try {
-      if(initialAdaptors != null && initialAdaptors.exists())
+
+    try
+    {
+      if (initialAdaptors != null && initialAdaptors.exists())
         readAdaptorsFile(initialAdaptors);
-    } catch(IOException e) {
-      log.warn("couldn't read user-specified file "+ initialAdaptors.getAbsolutePath());
+    } catch (IOException e)
+    {
+      log.warn("couldn't read user-specified file "
+          + initialAdaptors.getAbsolutePath());
     }
-    
+
     controlSock = new AgentControlSocketListener(this);
-    try {
-      controlSock.tryToBind(); //do this synchronously; if it fails, we know another agent is running.
-      controlSock.start();  //this sets us up as a daemon
+    try
+    {
+      controlSock.tryToBind(); // do this synchronously; if it fails, we know
+      // another agent is running.
+      controlSock.start(); // this sets us up as a daemon
       log.info("control socket started on port " + controlSock.portno);
-      
-      if(CHECKPOINT_INTERVAL_MS > 0)  {
+
+      if (CHECKPOINT_INTERVAL_MS > 0)
+      {
         checkpointer = new Timer();
         checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
       }
-    } catch(IOException e) {
+    } catch (IOException e)
+    {
       log.info("failed to bind to socket; aborting agent launch", e);
       throw new AlreadyRunningException();
     }
 
-  
   }
 
-  //FIXME: should handle bad lines here
+  // FIXME: should handle bad lines here
   public long processCommand(String cmd)
   {
     String[] words = cmd.split(" ");
-    if(words[0].equalsIgnoreCase("add"))
+    if (words[0].equalsIgnoreCase("add"))
     {
-      //words should contain (space delimited):
-      //  0) command ("add")
-      //  1) AdaptorClassname
-      //  2) dataType (e.g. "hadoop_log")
-      //  3) params <optional> 
-      //           (e.g. for files, this is filename,
-      //            but can be arbitrarily many space
-      //            delimited agent specific params )
-      //  4) offset
+      // words should contain (space delimited):
+      // 0) command ("add")
+      // 1) AdaptorClassname
+      // 2) dataType (e.g. "hadoop_log")
+      // 3) params <optional>
+      // (e.g. for files, this is filename,
+      // but can be arbitrarily many space
+      // delimited agent specific params )
+      // 4) offset
 
       long offset;
-      try  {
-        offset = Long.parseLong(words[words.length-1]);
-      } catch(NumberFormatException e) {
+      try
+      {
+        offset = Long.parseLong(words[words.length - 1]);
+      } catch (NumberFormatException e)
+      {
         log.warn("malformed line " + cmd);
         return -1L;
       }
       String adaptorName = words[1];
 
       Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
-      if(adaptor == null) {
-        log.warn("don't recognize adaptor name " + adaptorName);
+      if (adaptor == null)
+      {
+        log.warn("Error creating adaptor from adaptor name " + adaptorName);
         return -1L;
       }
-      
 
       String dataType = words[2];
-      
+      String streamName = "";
       String params = "";
-      if(words.length > 4){ //no argument
-        int begParams = adaptorName.length()+dataType.length()+6;//length("ADD x type ") = length(x) + 5, i.e. count letters & spaces
-        params = cmd.substring(begParams, cmd.length() - words[words.length-1].length() -1);
+      if (words.length > 4)
+      { // no argument
+        int begParams = adaptorName.length() + dataType.length() + 6;
+        // length("ADD x type ") = length(x) + 5, i.e. count letters & spaces
+        params = cmd.substring(begParams, cmd.length()
+            - words[words.length - 1].length() - 1);
+        streamName = params.substring(params.indexOf(" ") + 1, params.length());
       }
       long adaptorID;
-      synchronized(adaptorsByNumber) {
-        adaptorID  = ++lastAdaptorNumber;
+      synchronized (adaptorsByNumber)
+      {
+        for (Map.Entry<Long, Adaptor> a : adaptorsByNumber.entrySet())
+        {
+          if (streamName.intern() == a.getValue().getStreamName().intern())
+          {
+            log.warn(params + " already exist, skipping.");
+            return -1;
+          }
+        }
+        adaptorID = ++lastAdaptorNumber;
         adaptorsByNumber.put(adaptorID, adaptor);
-        adaptorPositions.put(adaptor, new Offset(offset,adaptorID));
-      }
-      
-      try {
-        adaptor.start(dataType, params, offset, DataFactory.getInstance().getEventQueue());
-        log.info("started a new adaptor, id = " +adaptorID);
-        return adaptorID ;
-        
-      } catch(AdaptorException e) {
-        log.warn("failed to start adaptor", e);
-        //FIXME: don't we need to clean up the adaptor maps here?
+        adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
+        try
+        {
+          adaptor.start(adaptorID, dataType, params, offset, DataFactory
+              .getInstance().getEventQueue());
+          log.info("started a new adaptor, id = " + adaptorID);
+          return adaptorID;
+
+        } catch (Exception e)
+        {
+          log.warn("failed to start adaptor", e);
+          // FIXME: don't we need to clean up the adaptor maps here?
+        }
       }
-    }
-    else
+    } else
       log.warn("only 'add' command supported in config files");
 
     return -1;
   }
 
   /**
-   *  Tries to restore from a checkpoint file in checkpointDir.
-   *  There should usually only be one checkpoint present --
-   *  two checkpoints present implies a crash during
-   *  writing the higher-numbered one.
-   *  As a result, this method chooses the lowest-numbered file present.
-   *  
-   *  Lines in the checkpoint file are processed one at a time with processCommand();
-   *   
+   * Tries to restore from a checkpoint file in checkpointDir. There should
+   * usually only be one checkpoint present -- two checkpoints present implies a
+   * crash during writing the higher-numbered one. As a result, this method
+   * chooses the lowest-numbered file present.
+   * 
+   * Lines in the checkpoint file are processed one at a time with
+   * processCommand();
+   * 
    * @return true if the restore succeeded
    * @throws IOException
-   */ 
+   */
   public boolean restoreFromCheckpoint() throws IOException
   {
-    synchronized(checkpointDir)
+    synchronized (checkpointDir)
     {
-      String[] checkpointNames =  checkpointDir.list(new FilenameFilter()
+      String[] checkpointNames = checkpointDir.list(new FilenameFilter()
       {
-        public boolean accept(File dir, String name)  {
+        public boolean accept(File dir, String name)
+        {
           return name.startsWith(CHECKPOINT_BASE_NAME);
-        } 
+        }
       });
-      if(checkpointNames.length == 0)
+      
+      if (checkpointNames == null) {
+        log.error("Unable to list directories in checkpoint dir");
+        return false;
+      }
+      if (checkpointNames.length == 0)
       {
-        log.info("No checkpoints found in "+ checkpointDir);
+        log.info("No checkpoints found in " + checkpointDir);
         return false;
       }
 
-      if(checkpointNames.length > 2)
-        log.warn("expected at most two checkpoint files in " + checkpointDir +  "; saw " + checkpointNames.length);
-      else if(checkpointNames.length == 0)
+      if (checkpointNames.length > 2)
+        log.warn("expected at most two checkpoint files in " + checkpointDir
+            + "; saw " + checkpointNames.length);
+      else if (checkpointNames.length == 0)
         return false;
 
-      String lowestName=null;
-      int lowestIndex=Integer.MAX_VALUE;
-      for(String n: checkpointNames) {
-        int index = Integer.parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
-        if(index < lowestIndex)  {
+      String lowestName = null;
+      int lowestIndex = Integer.MAX_VALUE;
+      for (String n : checkpointNames)
+      {
+        int index = Integer
+            .parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
+        if (index < lowestIndex)
+        {
           lowestName = n;
           lowestIndex = index;
         }
@@ -294,203 +369,244 @@
     }
     return true;
   }
+
   private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
       IOException
   {
-    BufferedReader br = new BufferedReader( new InputStreamReader(new FileInputStream(checkpoint)));
-    String cmd=null;
-    while((cmd = br.readLine()) != null)
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new FileInputStream(checkpoint)));
+    String cmd = null;
+    while ((cmd = br.readLine()) != null)
       processCommand(cmd);
     br.close();
   }
 
   /**
    * Called periodically to write checkpoints
+   * 
    * @throws IOException
    */
-  public void writeCheckpoint() throws IOException
-  { 
+  public void writeCheckpoint() throws IOException {
     needNewCheckpoint = false;
-    synchronized(checkpointDir) {
+    synchronized (checkpointDir) {
       log.info("writing checkpoint " + checkpointNumber);
 
-      FileOutputStream fos = new FileOutputStream(
-          new File(checkpointDir, CHECKPOINT_BASE_NAME + checkpointNumber));
-      PrintWriter out = new PrintWriter( new BufferedWriter(
+      FileOutputStream fos = new FileOutputStream(new File(checkpointDir,
+          CHECKPOINT_BASE_NAME + checkpointNumber));
+      PrintWriter out = new PrintWriter(new BufferedWriter(
           new OutputStreamWriter(fos)));
 
-      for(Map.Entry<Adaptor, Offset> stat: adaptorPositions.entrySet()) {
-        try{
+      for (Map.Entry<Adaptor, Offset> stat : adaptorPositions.entrySet()) {
+        try {
           Adaptor a = stat.getKey();
           out.print("ADD " + a.getClass().getCanonicalName());
-          out.print(" ");
-          out.print(a.getType());
-          out.print(" " + a.getCurrentStatus() + " ");
-          out.println(stat.getValue().offset);
-        }  catch(AdaptorException e)  {
+          out.println(" " + a.getCurrentStatus());
+        } catch (AdaptorException e) {
           e.printStackTrace();
-        }//don't try to recover from bad adaptor yet
+        }// don't try to recover from bad adaptor yet
       }
 
       out.close();
-      File lastCheckpoint =  new File(checkpointDir, CHECKPOINT_BASE_NAME + (checkpointNumber-1));
-      log.debug("hopefully removing old checkpoint file " + lastCheckpoint.getAbsolutePath());
+      File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME
+          + (checkpointNumber - 1));
+      log.debug("hopefully removing old checkpoint file "
+          + lastCheckpoint.getAbsolutePath());
       lastCheckpoint.delete();
       checkpointNumber++;
     }
   }
 
-  public void reportCommit(Adaptor src, long uuid)
-  {
+  public void reportCommit(Adaptor src, long uuid) {
     needNewCheckpoint = true;
     Offset o = adaptorPositions.get(src);
-    if(o != null) {
-      synchronized(o) { //order writes to offset, in case commits are processed out of order
-        if( uuid > o.offset)
+    if (o != null)
+    {
+      synchronized (o)
+      { // order writes to offset, in case commits are processed out of order
+        if (uuid > o.offset)
           o.offset = uuid;
       }
-      
-      log.info("got commit up to " + uuid + " on " + src+ " = "+ o.id);
-    }
-    else {
-      log.warn("got commit up to " + uuid +  "  for adaptor " +src + 
-          " that doesn't appear to be running: " + adaptorsByNumber.size() + " total");
+
+      log.info("got commit up to " + uuid + " on " + src + " = " + o.id);
+    } else
+    {
+      log.warn("got commit up to " + uuid + "  for adaptor " + src
+          + " that doesn't appear to be running: " + adaptorsByNumber.size()
+          + " total");
     }
   }
 
-  class CheckpointTask extends TimerTask  {
-    public void run()  {
-      try{
-        if(needNewCheckpoint ) {
+  class CheckpointTask extends TimerTask {
+    public void run()
+    {
+      try
+      {
+        if (needNewCheckpoint)
+        {
           writeCheckpoint();
         }
-      } catch(IOException e)  {
+      } catch (IOException e)
+      {
         log.warn("failed to write checkpoint", e);
       }
     }
   }
-  
-//for use only by control socket.
-  Map<Long, Adaptor> getAdaptorList()  {
-    return adaptorsByNumber; 
+
+  // for use only by control socket.
+  public Map<Long, Adaptor> getAdaptorList() {
+    return adaptorsByNumber;
   }
+
   /**
-   * Stop the adaptor with given ID number.
-   * Takes a parameter to indicate whether the adaptor should
-   * force out all remaining data, or just exit abruptly.
+   * Stop the adaptor with given ID number. Takes a parameter to indicate
+   * whether the adaptor should force out all remaining data, or just exit
+   * abruptly.
    * 
-   * If the adaptor is written correctly, its offset won't change after returning
-   * from shutdown.
+   * If the adaptor is written correctly, its offset won't change after
+   * returning from shutdown.
    * 
-   * @param number the adaptor to stop
-   * @param gracefully if true, shutdown, if false, hardStop
+   * @param number
+   *          the adaptor to stop
+   * @param gracefully
+   *          if true, shutdown, if false, hardStop
    * @return the number of bytes synched at stop. -1 on error
    */
-  public long stopAdaptor(long number, boolean gracefully)  {
+  public long stopAdaptor(long number, boolean gracefully) {
     Adaptor toStop;
     long offset = -1;
-    
-      //at most one thread can get past this critical section with toStop != null
-      //so if multiple callers try to stop the same adaptor, all but one will fail
-    synchronized(adaptorsByNumber) {
+
+    // at most one thread can get past this critical section with toStop != null
+    // so if multiple callers try to stop the same adaptor, all but one will
+    // fail
+    synchronized (adaptorsByNumber) {
       toStop = adaptorsByNumber.remove(number);
     }
-    if(toStop == null) {
+    if (toStop == null) {
       log.warn("trying to stop adaptor " + number + " that isn't running");
       return offset;
     }
-    try {
-      if(gracefully ) {
-
-        long bytesSentByAdaptor = toStop.shutdown(); //this can block
-        long unstableBytes = bytesSentByAdaptor -adaptorPositions.get(toStop).offset;
-        while(unstableBytes > 0 ) {
-          log.info("waiting for adaptor " + number +  "  to terminate " +
-              unstableBytes + " bytes are still uncommitted");
-          Thread.sleep(2000);
-          unstableBytes = bytesSentByAdaptor -adaptorPositions.get(toStop).offset;
-        }
+    
+    try {    	      
+      if (gracefully) {
+   	    offset = toStop.shutdown(); 
       }
-      else
-        toStop.hardStop();
-      Offset off = adaptorPositions.remove(toStop);  //next checkpoint will have the remove
-      offset = off == null ? -1 : off.offset;
-      needNewCheckpoint = true;
-
-    } catch(AdaptorException e) {
+    } catch (AdaptorException e) {
       log.error("adaptor failed to stop cleanly", e);
-    } catch(InterruptedException e) {
-      log.error("can't wait for adaptor to finish writing", e);
+    } finally {
+    	  needNewCheckpoint = true;
     }
     return offset;
   }
-  
+
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  public Connector getConnector() {
+    return connector;
+  }
+
   protected void readConfig() {
-    Configuration conf = new Configuration();
+    conf = new Configuration();
+
     String chukwaHome = System.getenv("CHUKWA_HOME");
-    if (chukwaHome == null){
+    if (chukwaHome == null) {
       chukwaHome = ".";
     }
-    if(!chukwaHome.endsWith("/"))
-      chukwaHome = chukwaHome + "/";
-    
-    conf.addResource(new Path("conf/chukwa-agent-conf.xml"));
-    CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name", "chukwa_checkpoint_");
-    checkpointDir= new File(conf.get("chukwaAgent.checkpoint.dir", chukwaHome+ "/var/"));
-    CHECKPOINT_INTERVAL_MS= conf.getInt("chukwaAgent.checkpoint.interval", 5000);
-    DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled", false);
-    if(DO_CHECKPOINT_RESTORE) {
-      WRITE_CHECKPOINTS = true;
-      log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+
+    if (!chukwaHome.endsWith("/")) {
+      chukwaHome = chukwaHome + File.separator;
+    }
+    log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
+
+    String chukwaConf = System.getProperty("CHUKWA_CONF_DIR");
+    if (chukwaConf == null) {
+      chukwaConf = chukwaHome + "conf" + File.separator;
+    }
+    if (!chukwaHome.endsWith("/")) {
+      chukwaHome = chukwaHome + File.separator;
+    }
+    if (!chukwaConf.endsWith("/")) {
+        chukwaConf = chukwaConf + File.separator;    	
+    }
+    log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
+
+    conf.addResource(new Path(chukwaConf + "chukwa-agent-conf.xml"));
+    DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled",
+        true);
+    CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
+        "chukwa_checkpoint_");
+    checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", chukwaHome
+        + "/var/"));
+    CHECKPOINT_INTERVAL_MS = conf.getInt("chukwaAgent.checkpoint.interval",
+        5000);
+    if (!checkpointDir.exists())
+    {
+      checkpointDir.mkdirs();
     }
-  //  String initialAdaptorsStr = conf.get("initial_adaptors_file");
-    
     tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
-    
-    initialAdaptors = new File(chukwaHome + "conf/initial_adaptors");
+
+    log.info("Config - chukwaHome: [" + chukwaHome + "]");
+    log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
+    log.info("Config - checkpointDir: [" + checkpointDir + "]");
+    log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
+        + "]");
+    log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
+    log.info("Config - tags: [" + tags + "]");
+
+    if (DO_CHECKPOINT_RESTORE) {
+      needNewCheckpoint = true;
+      log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+    }
+
+    initialAdaptors = new File(chukwaConf + "initial_adaptors");
   }
-  
+
   public void shutdown() {
     shutdown(false);
   }
 
   /**
-   * Triggers agent shutdown.
-   * For now, this method doesn't shut down adaptors explicitly.  It probably should.
+   * Triggers agent shutdown. For now, this method doesn't shut down adaptors
+   * explicitly. It probably should.
    */
   public void shutdown(boolean exit) {
-    if(checkpointer != null)  
+    if (checkpointer != null)
       checkpointer.cancel();
-    
-    controlSock.shutdown(); //make sure we don't get new requests
-
+    controlSock.shutdown(); // make sure we don't get new requests
     try {
-      if(WRITE_CHECKPOINTS)
-        writeCheckpoint(); //write a last checkpoint here, before stopping adaptors
-    } catch(IOException e) { 
+      if (needNewCheckpoint)
+        writeCheckpoint(); // write a last checkpoint here, before stopping
+      // adaptors
+    } catch (IOException e) {
     }
-    
-    synchronized(adaptorsByNumber) {   //shut down each adaptor
-      for(Adaptor a: adaptorsByNumber.values()) {
-        try{
+
+    synchronized (adaptorsByNumber) { 
+      // shut down each adaptor
+      for (Adaptor a : adaptorsByNumber.values()) {
+        try {
           a.hardStop();
-        }catch(AdaptorException e) {
-          log.warn("failed to cleanly stop " + a,e);
+        } catch (AdaptorException e)
+        {
+          log.warn("failed to cleanly stop " + a, e);
         }
       }
     }
-    
-    if(exit)
+    if (exit)
       System.exit(0);
   }
-/**
- *   Returns the last offset at which a given adaptor was checkpointed
- * @param a the adaptor in question
- * @return that adaptor's last-checkpointed offset
- */
+
+  /**
+   * Returns the last offset at which a given adaptor was checkpointed
+   * 
+   * @param a
+   *          the adaptor in question
+   * @return that adaptor's last-checkpointed offset
+   */
   public long getOffset(Adaptor a) {
     return adaptorPositions.get(a).offset;
   }
+
   /**
    * Returns the control socket for this agent.
    */
@@ -499,7 +615,7 @@
   }
 
   public static String getTags() {
-	    return tags;
+    return tags;
   }
 
 }

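processCommand() above still accepts only "add" lines of the form add <AdaptorClassName> <dataType> [params...] <offset>, and now returns -1 when the stream named in params is already being tailed. A hypothetical invocation; the log path is invented purely for illustration:

import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;

class AddAdaptorExample {
  static long addNamenodeLog() {
    ChukwaAgent agent = ChukwaAgent.getAgent();  // assumes an agent is already running in-process
    // Returns the new adaptor id, or -1 on a parse error or duplicate stream.
    return agent.processCommand(
        "add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor"
        + " hadoop_log 0 /var/log/hadoop/namenode.log 0");
  }
}
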
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Fri Dec  5 12:30:14 2008
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.chukwa.datacollection.agent;
 
-import java.util.*;
-//import java.util.concurrent.*;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
 
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
@@ -33,13 +34,14 @@
  */
 public class MemLimitQueue implements ChunkQueue
 {
-
 	static Logger log = Logger.getLogger(WaitingQueue.class);
 	
 	private Queue<Chunk> queue = new LinkedList<Chunk>();
 	private long dataSize = 0;
 	private final long MAX_MEM_USAGE;
 
+
+	
   public MemLimitQueue(int limit) {
     MAX_MEM_USAGE = limit;
   }
@@ -47,15 +49,21 @@
 	/**
 	 * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
 	 */
-	public void add(Chunk event) throws InterruptedException
+	public void add(Chunk chunk) throws InterruptedException
 	{
-	  assert event != null: "can't enqueue null chunks";
+	  assert chunk != null: "can't enqueue null chunks";
     synchronized(this) {
-      while(event.getData().length  + dataSize > MAX_MEM_USAGE)
-        this.wait();
-      
-      dataSize += event.getData().length;
-      queue.add(event);
+      while(chunk.getData().length  + dataSize > MAX_MEM_USAGE)
+      {
+    	  try 
+    	  { 
+    		  this.wait();
+    		  log.info("MemLimitQueue is full [" + dataSize +"]");
+    	  }
+    	  catch(InterruptedException e) {}
+      }
+      dataSize += chunk.getData().length;
+      queue.add(chunk);
       this.notifyAll();
     }
 	 
@@ -64,7 +72,7 @@
 	/**
 	 * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List, int)
 	 */
-	public void collect(List<Chunk> events,int maxCount) throws InterruptedException
+	public void collect(List<Chunk> events,int maxSize) throws InterruptedException
 	{
 		synchronized(this) {
 		  //we can't just say queue.take() here, since we're holding a lock.
@@ -72,10 +80,13 @@
 		    this.wait();
 		  }
 		  
-		  int i = 0;
-		  while(!queue.isEmpty() && (i++ < maxCount)) { 
+		  
+		  int size = 0;
+		  while(!queue.isEmpty() && (size < maxSize)) { 
 		    Chunk e = this.queue.remove();
-		    dataSize -= e.getData().length;
+		    int chunkSize = e.getData().length;
+		    size += chunkSize;
+		    dataSize -= chunkSize;
 		    events.add(e);
 		  }
 		  this.notifyAll();

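Note that the second argument to collect() above now bounds the total bytes of chunk data returned rather than the number of chunks. A caller sketch, where the queue instance and the 8 KB budget are illustrative:

import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue;

class CollectExample {
  // Blocks until at least one chunk is queued, then drains up to roughly 8 KB of chunk data.
  static List<Chunk> drain(MemLimitQueue queue) throws InterruptedException {
    List<Chunk> chunks = new LinkedList<Chunk>();
    queue.collect(chunks, 8 * 1024);  // budget is bytes of chunk data, not a chunk count
    return chunks;
  }
}
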
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Fri Dec  5 12:30:14 2008
@@ -22,53 +22,77 @@
 import org.mortbay.jetty.nio.SelectChannelConnector;
 import org.mortbay.jetty.servlet.*;
 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
-import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.*;
+import org.apache.hadoop.chukwa.util.PidFile;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 
 public class CollectorStub {
   
-  
-  public static void main(String[] args)
-  {
-    
+  static int THREADS = 80;
+  private static PidFile pFile = null;
+  public static Server jettyServer = null;
+  public static void main(String[] args) {
+	
+	pFile=new PidFile("Collector");
+	Runtime.getRuntime().addShutdownHook(pFile); 	 	  
     try {
       System.out.println("usage:  CollectorStub [portno] [pretend]");
-      System.out.println("note: if no portno defined, defaults to value in chukwa-site.xml");
+      System.out.println("note: if no portno defined, " +
+      		"defaults to value in chukwa-site.xml");
  
       ChukwaConfiguration conf = new ChukwaConfiguration();
       int portNum = conf.getInt("chukwaCollector.http.port", 9999);
-
+      THREADS = conf.getInt("chukwaCollector.http.threads", 80);
+      
       if(args.length != 0)
         portNum = Integer.parseInt(args[0]);
+      
+        //pick a writer.
       if(args.length > 1) {
         if(args[1].equals("pretend"))
           ServletCollector.setWriter(new ConsoleWriter(true));
         else if(args[1].equals("pretend-quietly"))
           ServletCollector.setWriter(new ConsoleWriter(false));
+        else if(args[1].equals("-classname")) {
+          if(args.length < 3)
+            System.err.println("need to specify a writer class");
+          else {
+            Class<?> writerClass = Class.forName(args[2]);
+            if(writerClass != null &&
+                ChukwaWriter.class.isAssignableFrom(writerClass))
+              ServletCollector.setWriter(
+                  (ChukwaWriter) writerClass.newInstance());
+            else
+              System.err.println(args[2]+ "is not a ChukwaWriter");
+          }
+        }
         else
-          System.out.println("WARNING: don't know what to do with command line arg "+ args[1]);
+          System.out.println("WARNING: unknown command line arg "+ args[1]);
       }
       
+        //set up jetty connector
       SelectChannelConnector jettyConnector = new SelectChannelConnector();
-      jettyConnector.setLowResourcesConnections(20);
-      jettyConnector.setLowResourceMaxIdleTime(1000);
+      jettyConnector.setLowResourcesConnections(THREADS-10);
+      jettyConnector.setLowResourceMaxIdleTime(1500);
       jettyConnector.setPort(portNum);
-      Server server = new Server(portNum);
-      server.setConnectors(new Connector[]{ jettyConnector});
-      org.mortbay.thread.BoundedThreadPool pool = new  org.mortbay.thread.BoundedThreadPool();
-      pool.setMaxThreads(30);
-      server.setThreadPool(pool);
-      Context root = new Context(server,"/",Context.SESSIONS);
+        //set up jetty server
+      jettyServer = new Server(portNum);
+      
+      jettyServer.setConnectors(new Connector[]{ jettyConnector});
+      org.mortbay.thread.BoundedThreadPool pool = 
+        new org.mortbay.thread.BoundedThreadPool();
+      pool.setMaxThreads(THREADS);
+      jettyServer.setThreadPool(pool);
+        //and add the servlet to it
+      Context root = new Context(jettyServer,"/",Context.SESSIONS);
       root.addServlet(new ServletHolder(new ServletCollector()), "/*");
-      server.start();
-      server.setStopAtShutdown(false);
+      jettyServer.start();
+      jettyServer.setStopAtShutdown(false);
      
       System.out.println("started http collector on port number " + portNum);
 
-    }
-    catch(Exception e)
-    {
-      e.printStackTrace();
+    } catch(Exception e) {
+     e.printStackTrace();
       System.exit(0);
     }
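
[Illustrative sketch, not part of this patch] With the new "-classname" option, any class with a no-argument constructor that implements ChukwaWriter can be plugged in, e.g. "CollectorStub 9999 -classname org.example.CountingWriter" (the class name is hypothetical). A rough sketch of such a writer, assuming the interface exposes the init()/add(List)/close() methods exercised by ServletCollector further below; if ChukwaWriter declares additional methods, those would need stubs as well.

  import java.util.List;

  import org.apache.hadoop.chukwa.Chunk;
  import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
  import org.apache.hadoop.chukwa.datacollection.writer.WriterException;

  public class CountingWriter implements ChukwaWriter {
    private long total = 0;

    public void init() throws WriterException {
      // nothing to set up for this toy writer
    }

    public void add(List<Chunk> chunks) throws WriterException {
      total += chunks.size();           // count instead of persisting
    }

    public void close() throws WriterException {
      System.out.println("CountingWriter saw " + total + " chunks");
    }
  }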
 

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Fri Dec  5 12:30:14 2008
@@ -18,7 +18,13 @@
 
 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
 
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -27,26 +33,30 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
 import org.apache.log4j.Logger;
 
 public class ServletCollector extends HttpServlet
 {
 
-  static final boolean FANCY_DIAGNOSTICS = true;
+  static final boolean FANCY_DIAGNOSTICS = false;
 	static org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
 	 
   private static final long serialVersionUID = 6286162898591407111L;
   Logger log = Logger.getRootLogger();//.getLogger(ServletCollector.class);
-	  
   
-	public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws IOException
+	public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws WriterException
 	{
 	  writer = w;
 	  w.init();
 	}
-  
+	static long statTime = 0L;
+	static int numberHTTPConnection = 0;
+	static int numberchunks = 0;
+	
 	public void init(ServletConfig servletConf) throws ServletException
 	{
 	  
@@ -56,6 +66,20 @@
 			return;
 		}
 		
+		
+		Timer statTimer = new Timer();
+		statTimer.schedule(new TimerTask()
+		{
+			public void run() 
+			{
+				log.info("stats:ServletCollector,numberHTTPConnection:" + numberHTTPConnection
+						 + ",numberchunks:"+numberchunks);
+				statTime = System.currentTimeMillis();
+				numberHTTPConnection = 0;
+				numberchunks = 0;
+			}
+		}, (1000), (60*1000));
+		
 		try
 		{
 			// read the application->pipeline settings from a config file in the format:
@@ -81,56 +105,75 @@
 			if (writer == null)
 				writer =  new SeqFileWriter();
 
-		} catch (IOException e) {
+		} catch (WriterException e) {
 			throw new ServletException("Problem init-ing servlet", e);
 		}		
 	}
 
 	protected void accept(HttpServletRequest req, HttpServletResponse resp)
-			throws ServletException
+	throws ServletException
 	{
-	  ServletDiagnostics diagnosticPage = new ServletDiagnostics();
+		numberHTTPConnection ++;
+		ServletDiagnostics diagnosticPage = new ServletDiagnostics();
+		final long currentTime = System.currentTimeMillis();
 		try {
-	    
-		  final long currentTime = System.currentTimeMillis();
-		  log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
+
+			log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
 			java.io.InputStream in = req.getInputStream();
-						
+
 			ServletOutputStream l_out = resp.getOutputStream();
 			final DataInputStream di = new DataInputStream(in);
 			final int numEvents = di.readInt();
-		  //	log.info("saw " + numEvents+ " in request");
+			//	log.info("saw " + numEvents+ " in request");
+
+			if(FANCY_DIAGNOSTICS)
+			{ diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime); }
 
-      if(FANCY_DIAGNOSTICS)
-        diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
-			for (int i = 0; i < numEvents; i++){
-				// TODO: pass new data to all registered stream handler methods for this chunk's stream
+			List<Chunk> events = new LinkedList<Chunk>();
+			ChunkImpl logEvent = null;
+			StringBuilder sb = new StringBuilder();
+
+			for (int i = 0; i < numEvents; i++)
+			{
+				// TODO: pass new data to all registered stream handler 
+			  //       methods for this chunk's stream
 				// TODO: should really have some dynamic assignment of events to writers
 
-	      ChunkImpl logEvent =  ChunkImpl.read(di);
+				logEvent =  ChunkImpl.read(di);
+				sb.append("ok:");
+				sb.append(logEvent.getData().length);
+				sb.append(" bytes ending at offset ");
+				sb.append(logEvent.getSeqID()-1).append("\n");
 
-	      if(FANCY_DIAGNOSTICS)
-	        diagnosticPage.sawChunk(logEvent, i);
-	      
-				// write new data to data sync file
-				if(writer != null) {
-				  writer.add(logEvent);  //save() blocks until data is written
-				  //this is where we ACK this connection
-					l_out.print("ok:");
-					l_out.print(logEvent.getData().length);
-					l_out.print(" bytes ending at offset ");
-					l_out.println(logEvent.getSeqID()-1);
-				}
-				else
-					l_out.println("can't write: no writer");	
+				events.add(logEvent);
+
+				if(FANCY_DIAGNOSTICS)
+				{ diagnosticPage.sawChunk(logEvent, i); }
 			}
 
-      if(FANCY_DIAGNOSTICS)
-        diagnosticPage.doneWithPost();
-	    resp.setStatus(200);
-			
-		} catch (IOException e) 	{
-			log.warn("IO error", e);
+			// write new data to data sync file
+			if(writer != null) 
+			{
+				writer.add(events);
+				numberchunks += events.size();
+				//this is where we ACK this connection
+				l_out.print(sb.toString());
+			}
+			else
+			{
+				l_out.println("can't write: no writer");
+			}
+
+
+			if(FANCY_DIAGNOSTICS)
+			{ diagnosticPage.doneWithPost(); }
+
+			resp.setStatus(200);
+
+		} 
+		catch(Throwable e) 
+		{
+			log.warn("Exception talking to " +req.getRemoteHost() + " at " + currentTime , e);
 			throw new ServletException(e);
 		}
 	}
@@ -147,16 +190,30 @@
 	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
 			throws ServletException, IOException
 	{
-	  PrintStream out = new PrintStream(resp.getOutputStream());
-    resp.setStatus(200);
-	  out.println("<html><body><h2>Chukwa servlet running</h2>");
-	  if(FANCY_DIAGNOSTICS)
-	    ServletDiagnostics.printPage(out);
-	  out.println("</body></html>");
-//		accept(req,resp);
+	
+		PrintStream out = new PrintStream(resp.getOutputStream());
+		resp.setStatus(200);
+		
+	  String pingAtt = req.getParameter("ping");
+	  if (pingAtt!=null)
+	  {
+		  out.println("Date:" + ServletCollector.statTime);
+		  out.println("Now:" + System.currentTimeMillis());
+		  out.println("numberHTTPConnection:" + ServletCollector.numberHTTPConnection);
+		  out.println("numberchunks:" + ServletCollector.numberchunks);
+	  }
+	  else
+	  {
+		  out.println("<html><body><h2>Chukwa servlet running</h2>");
+		  if(FANCY_DIAGNOSTICS)
+		    ServletDiagnostics.printPage(out);
+		  out.println("</body></html>");
+	  }
+    
+	  
 	}
 
-  @Override	
+    @Override	
 	public String getServletInfo()
 	{
 		return "Chukwa Servlet Collector";
@@ -165,10 +222,14 @@
 	@Override
 	public void destroy()
 	{
-	  synchronized(writer)
-	  {
-	    writer.close();
-	  }
+	  try
+	{
+		writer.close();
+	} catch (WriterException e)
+	{
+		log.warn("Exception during close", e);
+		e.printStackTrace();
+	}
 	  super.destroy();
 	}
 }
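
[Illustrative sketch, not part of this patch] The GET handler above now doubles as a lightweight status probe: any request carrying a "ping" parameter returns the raw counters instead of the HTML diagnostics page (the counters are reset every 60 seconds by the stat timer). A hedged sketch of a monitoring client follows; host, port, and path are assumptions, and since CollectorStub mounts the servlet at "/*" any path works.

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import java.net.URL;

  public class CollectorPing {
    public static void main(String[] args) throws Exception {
      // Hypothetical collector location; adjust to the real host/port.
      URL status = new URL("http://localhost:9999/?ping=true");
      BufferedReader in = new BufferedReader(new InputStreamReader(status.openStream()));
      String line;
      while ((line = in.readLine()) != null)
        System.out.println(line);   // Date:, Now:, numberHTTPConnection:, numberchunks:
      in.close();
    }
  }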

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java Fri Dec  5 12:30:14 2008
@@ -31,6 +31,10 @@
 
   static Logger log=  Logger.getLogger(ServletDiagnostics.class);
   
+
+  static int CHUNKS_TO_KEEP = 50;
+  static int CHUNKS_TO_DISPLAY = 50;
+  
   private static class PostStats { //statistics about a chunk
     public PostStats(String src, int count, long receivedTs)
     {
@@ -75,7 +79,6 @@
 
   static LinkedList<PostStats> lastPosts;
   PostStats curPost;
-  static int CHUNKS_TO_KEEP = 300;
 
   
   public void sawPost(String source, int chunks, long receivedTs) {
@@ -96,28 +99,33 @@
     long timeWindowOfSample = Long.MAX_VALUE;
     long now = System.currentTimeMillis();
 
-
     out.println("<ul>");
     
     synchronized(lastPosts) {
+      int toSkip = lastPosts.size() - CHUNKS_TO_DISPLAY; 
+      
       if(!lastPosts.isEmpty())
         timeWindowOfSample = now -  lastPosts.peek().receivedTs;
       
       for(PostStats stats: lastPosts) {
-        out.print("<li>");
-        
-        out.print(stats.dataSize + " bytes from " + stats.src + " at timestamp " + stats.receivedTs);
-        out.println(" which was " +  ((now - stats.receivedTs)/ 1000) + " seconds ago");
         Long oldBytes = bytesFromHost.get(stats.src);
         long newBytes = stats.dataSize;
         if(oldBytes != null)
           newBytes += oldBytes;
         bytesFromHost.put(stats.src, newBytes);
-        out.println("<ol>");
-        for(int i =0; i < stats.count; ++i)
-          out.println("<li> "+ stats.lengths[i] + " bytes of type " +
-              stats.types[i] + ".  Adaptor name ="+ stats.names[i] +" </li>");
-        out.println("</ol></li>");
+        
+        if( -- toSkip < 0) { //done skipping
+          out.print("<li>");
+          
+          out.print(stats.dataSize + " bytes from " + stats.src + " at timestamp " + stats.receivedTs);
+          out.println(" which was " +  ((now - stats.receivedTs)/ 1000) + " seconds ago");
+  
+          out.println("<ol>");
+          for(int i =0; i < stats.count; ++i)
+            out.println("<li> "+ stats.lengths[i] + " bytes of type " +
+                stats.types[i] + ".  Adaptor name ="+ stats.names[i] +" </li>");
+          out.println("</ol></li>");
+        }
       }
     }
     out.println("</ul>");
@@ -137,7 +145,7 @@
   public void doneWithPost() {
     synchronized(lastPosts) {
       if(lastPosts.size() > CHUNKS_TO_KEEP)
-        lastPosts.remove();
+        lastPosts.removeFirst();
       lastPosts.add(curPost);
     }
   }
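
[Illustrative sketch, not part of this patch] The diagnostics page now retains at most CHUNKS_TO_KEEP posts and renders only the last CHUNKS_TO_DISPLAY of them via the toSkip countdown in printPage() above; the same pattern in isolation:

  import java.util.LinkedList;

  class TailOfListSketch {
    // Print only the last 'display' entries, mirroring the toSkip countdown
    // used in printPage() above.
    static void printTail(LinkedList<String> items, int display) {
      int toSkip = items.size() - display;
      for (String item : items)
        if (--toSkip < 0)           // done skipping, start printing
          System.out.println(item);
    }
  }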

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java Fri Dec  5 12:30:14 2008
@@ -42,5 +42,6 @@
 
 	
 	public void start();
-  public void shutdown();
+    public void shutdown();
+    public void reloadConfiguration();
 }
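
[Illustrative sketch, not part of this patch] Connector now exposes reloadConfiguration(); a sketch of how a caller (hypothetical here) might use it after conf/collectors changes, with HttpConnector picking the new list up on its next pass through the send loop (see the next file):

  import org.apache.hadoop.chukwa.datacollection.connector.Connector;

  class CollectorsFileWatcherSketch {
    // Hypothetical hook: called after conf/collectors has been rewritten.
    static void onCollectorsFileChanged(Connector connector) {
      connector.reloadConfiguration();   // takes effect on the connector's next send-loop iteration
    }
  }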

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Fri Dec  5 12:30:14 2008
@@ -56,15 +56,18 @@
 
   static Timer statTimer = null;
   static volatile int chunkCount = 0;
-  static final int MAX_EVENTS_PER_POST = 1000;
-  static final int MIN_POST_INTERVAL= 4 * 1000;
+  static final int MAX_SIZE_PER_POST = 2*1024*1024;
+  static final int MIN_POST_INTERVAL= 5 * 1000;
   static ChunkQueue chunkQueue;
   
   ChukwaAgent agent;
   String argDestination = null;
   
   private boolean stopMe = false;
-
+  private boolean reloadConfiguration = false;
+  private Iterator<String> collectors = null;
+  protected ChukwaSender connectorClient = null;
+  
   static{
     statTimer = new Timer();
     chunkQueue = DataFactory.getInstance().getEventQueue();
@@ -97,65 +100,104 @@
 	}
 	
 	public void run(){
-	 	log.info("HttpConnector started at time:" + System.currentTimeMillis());
+		log.info("HttpConnector started at time:" + System.currentTimeMillis());
+
+		Iterator<String> destinations = null;
+
+		// build a list of our destinations from collectors
+		try{
+			destinations = DataFactory.getInstance().getCollectorURLs();
+		} catch (IOException e){
+			log.error("Failed to retrieve list of collectors from " +
+					"conf/collectors file", e);
+		}
+
+		connectorClient = new ChukwaHttpSender();
+
+		if (argDestination != null) 
+		{
+			ArrayList<String> tmp = new ArrayList<String>();
+			tmp.add(argDestination);
+			collectors = tmp.iterator();
+			connectorClient.setCollectors(collectors);
+			log.info("using collector specified at agent runtime: " + argDestination);
+		} 
+		else if (destinations != null && destinations.hasNext()) 
+		{
+			collectors = destinations;
+			connectorClient.setCollectors(destinations);
+			log.info("using collectors from collectors file");
+		} 
+		else {
+			log.error("No collectors specified, exiting (and taking agent with us).");
+			agent.shutdown(true);//error is unrecoverable, so stop hard.
+			return;
+		}
+
+		try {
+			long lastPost = System.currentTimeMillis();
+			while(!stopMe) {
+				List<Chunk> newQueue = new ArrayList<Chunk>();
+				try {
+					//get all ready chunks from the chunkQueue to be sent
+					chunkQueue.collect(newQueue,MAX_SIZE_PER_POST); //collect is bounded by total bytes (MAX_SIZE_PER_POST)
+
+				} catch(InterruptedException e) {
+					System.out.println("thread interrupted during chunkQueue.collect()");
+					Thread.currentThread().interrupt();
+					break;
+				}
+				int toSend = newQueue.size();
+				List<ChukwaHttpSender.CommitListEntry> results = connectorClient.send(newQueue);
+				log.info("sent " +toSend + " chunks, got back " + results.size() + " acks");
+				//checkpoint the chunks which were committed
+				for(ChukwaHttpSender.CommitListEntry cle : results) {
+					agent.reportCommit(cle.adaptor, cle.uuid);
+					chunkCount++;
+				}
+
+				if (reloadConfiguration)
+				{
+					connectorClient.setCollectors(collectors);
+					log.info("Resetting collectors");
+					reloadConfiguration = false;
+				}
+
+				long now = System.currentTimeMillis();
+				if( now - lastPost < MIN_POST_INTERVAL )  
+					Thread.sleep(MIN_POST_INTERVAL - (now - lastPost));  //wait for stuff to accumulate
+				lastPost = now;
+			} //end of try forever loop
+			log.info("received stop() command so exiting run() loop to shutdown connector");
+		} catch(OutOfMemoryError e) {
+			log.warn("Bailing out",e);
+			System.exit(-1);
+		} catch(InterruptedException e) {
+			//interrupted: bail out and let the process exit.
+			log.warn("Bailing out",e);
+			System.exit(-1);
+		}catch(java.io.IOException e) {
+			log.error("connector failed; shutting down agent");
+			agent.shutdown(true);
+		}
+	}
 
-	 	Iterator<String> destinations = null;
-	  
+	@Override
+	public void reloadConfiguration()
+	{
+		reloadConfiguration = true;
+		Iterator<String> destinations = null;
+		  
 	 	// build a list of our destinations from collectors
 	 	try{
-	    destinations = DataFactory.getInstance().getCollectors();
+	    destinations = DataFactory.getInstance().getCollectorURLs();
 	  } catch (IOException e){
 	    log.error("Failed to retreive list of collectors from conf/collectors file", e);
 	  }
-	  
-    ChukwaSender connectorClient = new ChukwaHttpSender();
-	  if (argDestination != null) {
-
-	    ArrayList<String> tmp = new ArrayList<String>();
-	    tmp.add(argDestination);
-      connectorClient.setCollectors(tmp.iterator());
-      log.info("using collector specified at agent runtime: " + argDestination);
-    } else if (destinations != null && destinations.hasNext()) {
-      connectorClient.setCollectors(destinations);
-      log.info("using collectors from collectors file");
-	  } else {
-	    log.error("No collectors specified, exiting (and taking agent with us).");
-	    agent.shutdown(true);//error is unrecoverable, so stop hard.
-	    return;
+	  if (destinations != null && destinations.hasNext()) 
+	  {
+		  collectors = destinations;
 	  }
-	  
-	  try {
-	    long lastPost = System.currentTimeMillis();
-  	  while(!stopMe) {
-  	    List<Chunk> newQueue = new ArrayList<Chunk>();
-  	    try {
-  	      //get all ready chunks from the chunkQueue to be sent
-  	      chunkQueue.collect(newQueue,MAX_EVENTS_PER_POST); //FIXME: should really do this by size
-  	     
-  	    } catch(InterruptedException e) {
-  	      System.out.println("thread interrupted during addChunks(ChunkQueue)");
-  	      Thread.currentThread().interrupt();
-  	      break;
-  	    }
-  	    int toSend = newQueue.size();
-  	    List<ChukwaHttpSender.CommitListEntry> results = connectorClient.send(newQueue);
-  	    log.info("sent " +toSend + " chunks, got back " + results.size() + " acks");
-  	    //checkpoint the chunks which were committed
-  	    for(ChukwaHttpSender.CommitListEntry cle : results) {
-          agent.reportCommit(cle.adaptor, cle.uuid);
-          chunkCount++;
-        }
-  	    long now = System.currentTimeMillis();
-  	    if( now - lastPost < MIN_POST_INTERVAL )  
-  	      Thread.sleep(now - lastPost);  //wait for stuff to accumulate
-        lastPost = now;
-  	  } //end of try forever loop
-  	  log.info("received stop() command so exiting run() loop to shutdown connector");
-  	} catch(InterruptedException e) {
-	  //do nothing, let thread die.
-  	}catch(java.io.IOException e) {
-  	  log.error("connector failed; shutting down agent");
-  	  agent.shutdown(true);
-    }
+    
 	}
 }
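
[Illustrative sketch, not part of this patch] The send loop is meant to keep successive posts at least MIN_POST_INTERVAL apart; distilled into a standalone sketch (names local to the sketch), the pacing amounts to sleeping off whatever part of the interval has not yet elapsed since the previous post:

  class PostPacerSketch {
    static final int MIN_POST_INTERVAL = 5 * 1000;   // same value as the constant above
    long lastPost = System.currentTimeMillis();

    // Sleep off the remainder of the minimum interval since the previous post.
    void pace() throws InterruptedException {
      long elapsed = System.currentTimeMillis() - lastPost;
      if (elapsed < MIN_POST_INTERVAL)
        Thread.sleep(MIN_POST_INTERVAL - elapsed);
      lastPost = System.currentTimeMillis();
    }
  }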

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java Fri Dec  5 12:30:14 2008
@@ -19,11 +19,24 @@
 package org.apache.hadoop.chukwa.datacollection.controller;
 
 
-import java.net.*;
-import java.io.*;
-import java.util.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.log4j.Logger;
 
 /**
  * A convenience library for applications to communicate to the {@link ChukwaAgent}. Can be used
@@ -31,14 +44,16 @@
  * use for handling log rations.  
  */
 public class ChukwaAgentController {
-  
-  public class AddAdaptorTask extends TimerTask {
-    String adaptorName;
-    String type;
-    String params;
-    long offset;
-    long numRetries;
-    long retryInterval;
+	static Logger log = Logger.getLogger(ChukwaAgentController.class);
+  public class AddAdaptorTask extends TimerTask 
+  {
+	  	
+	    String adaptorName;
+	    String type;
+	    String params;
+	    long offset;
+	    long numRetries;
+	    long retryInterval;
     
     AddAdaptorTask(String adaptorName, String type, String params,
         long offset, long numRetries, long retryInterval){
@@ -50,8 +65,18 @@
       this.retryInterval = retryInterval;
     }
     @Override
-    public void run() {
-      add(adaptorName, type, params, offset, numRetries, retryInterval);
+    public void run() 
+    {
+    	try
+    	{
+    		log.info("Trying to resend the add command [" + adaptorName + "][" + offset + "][" + params +"] [" + numRetries+"]");
+    		add(adaptorName, type, params, offset, numRetries, retryInterval);
+    	}
+    	catch(Exception e)
+    	{
+    		log.warn("Exception in AddAdaptorTask.run", e);
+    		e.printStackTrace();
+    	} 
     }
   }
 
@@ -148,7 +173,7 @@
   Map<Long, ChukwaAgentController.Adaptor> pausedAdaptors;
   String hostname;
   int portno;
-  private Timer addFileTimer = new Timer();
+  
   
   public ChukwaAgentController(){
     portno = DEFAULT_PORT;
@@ -218,6 +243,7 @@
          System.out.println("Scheduling a agent connection retry for adaptor add() in another " +
              retryInterval + " milliseconds, " + numRetries + " retries remaining");
          
+         Timer addFileTimer = new Timer();
          addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params, offset, numRetries-1, retryInterval), retryInterval);
        }
      }else{

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Fri Dec  5 12:30:14 2008
@@ -26,8 +26,10 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpException;
@@ -39,6 +41,7 @@
 import org.apache.commons.httpclient.methods.RequestEntity;
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.log4j.Logger;
@@ -47,22 +50,23 @@
  * Encapsulates all of the http setup and connection details needed for
  * chunks to be delivered to a collector.
  * <p>
- * On error, tries the list of available collectors, pauses for a minute, and then repeats.
+ * On error, tries the list of available collectors, pauses for a minute, 
+ * and then repeats.
  * </p>
  * <p> Will wait forever for collectors to come up. </p>
  */
 public class ChukwaHttpSender implements ChukwaSender{
   static final int MAX_RETRIES_PER_COLLECTOR = 4; //fast retries, in http client
-  static final int SENDER_RETRIES = 3; 
+  static final int SENDER_RETRIES = 14440; 
   static final int WAIT_FOR_COLLECTOR_REBOOT = 20 * 1000; 
     //FIXME: this should really correspond to the timer in RetryListOfCollectors
-  
+  static final int BLACK_LIST_TIME = 300 * 1000;
   static Logger log = Logger.getLogger(ChukwaHttpSender.class);
   static HttpClient client = null;
   static MultiThreadedHttpConnectionManager connectionManager = null;
   static String currCollector = null;
 
-  
+  protected static ConcurrentHashMap<Long, String> blackList = null; 
   protected Iterator<String> collectors;
   
   static
@@ -115,11 +119,11 @@
   }
 
   public ChukwaHttpSender(){
-    //setup default collector
     ArrayList<String> tmp = new ArrayList<String>();
     this.collectors = tmp.iterator();
-    currCollector = "http://localhost:8080";
-    log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: " + collectors.hasNext());
+    ConcurrentHashMap<Long, String> tmpHash = new ConcurrentHashMap<Long, String>();
+    this.blackList = tmpHash;
+    log.info("setting collectors to an empty iterator");
 
   }
   
@@ -135,21 +139,22 @@
    * @param collectors
    */
   public void setCollectors(Iterator<String> collectors){
-    this.collectors = collectors; 
-    //setup a new destination from our list of collectors if one hasn't been set up
+    this.collectors = collectors;
+    this.blackList.clear();
+    //setup a new destination from our list of collectors if one isn't set up
     if (currCollector == null){
       if (collectors.hasNext()){
         currCollector = collectors.next();
       }
       else
-        log.error("No collectors to try in send(), not even trying to do doPost()");
+        log.error("No collectors to try in setCollectors()");
     }
   }
   
   
   /**
-   * grab all of the chunks currently in the chunkQueue, stores a copy of them locally, calculates
-   * their size, sets them up 
+   * grab all of the chunks currently in the chunkQueue, stores a copy of them 
+   * locally, calculates their size, sets them up 
    * @return array of chunk id's which were ACKed by collector
    */
   public List<CommitListEntry> send(List<Chunk> toSend) throws InterruptedException, IOException{
@@ -183,14 +188,29 @@
       //need to pick a destination here
       PostMethod method = new PostMethod();
       try   {
+    	if(blackList.size()!=0) {
+    		for(long time: blackList.keySet()) {
+    			long now = new Date().getTime();
+    			if(now-time > BLACK_LIST_TIME) {
+    	    		log.info(blackList.get(time)+" released from blacklist.");
+    				blackList.remove(time);
+    			} else if(currCollector.equals(blackList.get(time))) {
+    				currCollector = collectors.next();
+    			}
+    		}
+    	}
         doPost(method, postData, currCollector);
-
+        // Rotating POSTs across collectors does not work: agents and collectors end up spending
+        // time creating TCP connections but are unable to send any data.
+        // currCollector = collectors.next();        
         retries = SENDER_RETRIES; //reset count on success
         //if no exception was thrown from doPost, ACK that these chunks were sent
         return commitResults;
       } catch (Throwable e) {
         log.error("Http post exception", e);
         log.info("Checking list of collectors to see if another collector has been specified for rollover");
+        blackList.put(new Date().getTime(), currCollector);
+        log.info("Blacklisting collector: "+currCollector);
         if (collectors.hasNext()){
           currCollector = collectors.next();
           log.info("Found a new collector to roll over to, retrying HTTP Post to collector " + currCollector);
@@ -200,6 +220,9 @@
                 " ms (" + retries + "retries left)");
             Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
             retries --;
+            // shuffle the list of collectors if none of them are available.
+            this.collectors = DataFactory.getInstance().getCollectorURLs();
+            this.blackList.clear();
           } else {
             log.error("No more collectors to try rolling over to; aborting");
             throw new IOException("no collectors");
@@ -211,6 +234,11 @@
         method.releaseConnection();
       }
     } //end retry loop
+    if(currCollector==null) {
+    	// reset the collector list if we ran out of collectors to try.
+        this.collectors = DataFactory.getInstance().getCollectorURLs();
+        this.blackList.clear();    	
+    }
     return new ArrayList<CommitListEntry>();
   }
   
@@ -230,25 +258,30 @@
         return !(e instanceof java.net.ConnectException) && (exec < MAX_RETRIES_PER_COLLECTOR);
       }
     });
+    
+    pars.setParameter(HttpMethodParams.SO_TIMEOUT , new Integer(30000));
+    
+    
+    
     method.setParams(pars);
     method.setPath(dest);
     
      //send it across the network
     method.setRequestEntity(data);
     
-    log.info("HTTP post to " + dest+" length = "+ data.getContentLength());
+    log.info(">>>>>> HTTP post to " + dest+" length = "+ data.getContentLength());
     // Send POST request
     
-    client.setTimeout(8000);
+    //client.setTimeout(15*1000);
     int statusCode = client.executeMethod(method);
       
     if (statusCode != HttpStatus.SC_OK)  {
-      log.error("HTTP post response statusCode: " +statusCode + ", statusLine: " + method.getStatusLine());
+      log.error(">>>>>> HTTP post response statusCode: " +statusCode + ", statusLine: " + method.getStatusLine());
       //do something aggressive here
       throw new HttpException("got back a failure from server");
     }
     //implicitly "else"
-    log.info("got success back from the remote collector; response length "+ method.getResponseContentLength());
+    log.info(">>>>>> HTTP Got success back from the remote collector; response length "+ method.getResponseContentLength());
 
       //FIXME: should parse acks here
     InputStream rstream = null;
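
[Illustrative sketch, not part of this patch] Failed collectors are now recorded in blackList keyed by the failure timestamp and released after BLACK_LIST_TIME (five minutes); with SENDER_RETRIES raised to 14440 and a 20-second pause per retry, the sender keeps trying for up to roughly 80 hours before aborting. The expiry check in isolation, mirroring the loop inside send() above:

  import java.util.Date;
  import java.util.concurrent.ConcurrentHashMap;

  class BlackListSketch {
    static final int BLACK_LIST_TIME = 300 * 1000;   // 5 minutes, matching the constant above
    // failure timestamp -> collector URL, mirroring ChukwaHttpSender.blackList
    static ConcurrentHashMap<Long, String> blackList = new ConcurrentHashMap<Long, String>();

    // Release any collector that has been blacklisted longer than BLACK_LIST_TIME.
    static void expire() {
      long now = new Date().getTime();
      for (long time : blackList.keySet())
        if (now - time > BLACK_LIST_TIME)
          blackList.remove(time);   // removal while iterating is safe on ConcurrentHashMap
    }
  }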


